【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

来源:https://www.cnblogs.com/collen7788/archive/2020/07/20/13343227.html
-Advertisement-
Play Games

在使用eventTime的時候如何處理亂序數據?我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網路延遲等原因,導致亂序的產生,特別是使用kafka的話,多個分 ...


在使用eventTime的時候如何處理亂序數據?我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網路延遲等原因,導致亂序的產生,特別是使用kafka的話,多個分區的數據無法保證有序。所以在進行window計算的時候,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。Watermark是用於處理亂序事件的,用於衡量Event Time進展的機制。watermark可以翻譯為水位線。

一、Watermark的核心原理

Watermark的核心本質可以理解成一個延遲觸發機制。
在 Flink 的視窗處理過程中,如果確定全部數據到達,就可以對 Window 的所有數據做 視窗計算操作(如彙總、分組等),如果數據沒有全部到達,則繼續等待該視窗中的數據全 部到達才開始處理。這種情況下就需要用到水位線(WaterMarks)機制,它能夠衡量數據處 理進度(表達數據到達的完整性),保證事件數據(全部)到達 Flink 系統,或者在亂序及 延遲到達時,也能夠像預期一樣計算出正確並且連續的結果。當任何 Event 進入到 Flink 系統時,會根據當前最大事件時間產生 Watermarks 時間戳。

那麼 Flink 是怎麼計算 Watermak 的值呢?

Watermark =進入Flink 的最大的事件時間(mxtEventTime)-指定的延遲時間(t)

那麼有 Watermark 的 Window 是怎麼觸發視窗函數的呢?
如果有視窗的停止時間等於或者小於 maxEventTime - t(當時的warkmark),那麼這個視窗被觸發執行。

其核心處理流程如下圖所示。

二、Watermark的三種使用情況

1、本來有序的Stream中的 Watermark

如果數據元素的事件時間是有序的,Watermark 時間戳會隨著數據元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因為既然是有序的時間,就不需要設置延遲了,那麼t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 Watermark 時間大於 Windows 結束時間就會觸發對 Windows 的數據計算,以此類推, 下一個 Window 也是一樣。這種情況其實是亂序數據的一種特殊情況。

2、亂序事件中的Watermark

現實情況下數據元素往往並不是按照其產生順序接入到 Flink 系統中進行處理,而頻繁 出現亂序或遲到的情況,這種情況就需要使用 Watermarks 來應對。比如下圖,設置延遲時間t為2。

3、並行數據流中的Watermark

在多並行度的情況下,Watermark 會有一個對齊機制,這個對齊機制會取所有 Channel 中最小的 Watermark。

三、設置Watermark的核心代碼

1、首先,正確設置事件處理的時間語義,一般都是採用Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);	

2、其次,指定生成Watermark的機制,包括:延時處理的時間和EventTime對應的欄位。如下:

註意:不管是數據是否有序,都可以使用上面的代碼。有序的數據只是無序數據的一種特殊情況。

四、Watermark編程案例

測試數據:基站的手機通話數據,如下:

需求:按基站,每5秒統計通話時間最長的記錄。

  • StationLog用於封裝基站數據
package watermark;

//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
	private String stationID;   //基站ID
	private String from;		//呼叫放
	private String to;			//被叫方
	private long duration;		//通話的持續時間
	private long callTime;		//通話的呼叫時間
	public StationLog(String stationID, String from, 
			          String to, long duration, 
			          long callTime) {
		this.stationID = stationID;
		this.from = from;
		this.to = to;
		this.duration = duration;
		this.callTime = callTime;
	}
	public String getStationID() {
		return stationID;
	}
	public void setStationID(String stationID) {
		this.stationID = stationID;
	}
	public long getCallTime() {
		return callTime;
	}
	public void setCallTime(long callTime) {
		this.callTime = callTime;
	}
	public String getFrom() {
		return from;
	}
	public void setFrom(String from) {
		this.from = from;
	}

	public String getTo() {
		return to;
	}
	public void setTo(String to) {
		this.to = to;
	}
	public long getDuration() {
		return duration;
	}
	public void setDuration(long duration) {
		this.duration = duration;
	}
}
  • 代碼實現:WaterMarkDemo用於完成計算(註意:為了方便咱們測試設置任務的並行度為1)   
package watermark;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

//每隔五秒,將過去是10秒內,通話時間最長的通話日誌輸出。
public class WaterMarkDemo {
	public static void main(String[] args) throws Exception {
		//得到Flink流式處理的運行環境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(1);
		//設置周期性的產生水位線的時間間隔。當數據流很大的時候,如果每個事件都產生水位線,會影響性能。
		env.getConfig().setAutoWatermarkInterval(100);//預設100毫秒
		
		//得到輸入流
		DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
		stream.flatMap(new FlatMapFunction<String, StationLog>() {

			public void flatMap(String data, Collector<StationLog> output) throws Exception {
				String[] words = data.split(",");
				//                           基站ID            from    to        通話時長                                                    callTime
				output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
			}
		}).filter(new FilterFunction<StationLog>() {
			
			@Override
			public boolean filter(StationLog value) throws Exception {
				return value.getDuration() > 0?true:false;
			}
		}).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
				.withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
					@Override
					public long extractTimestamp(StationLog element, long recordTimestamp) {
						return element.getCallTime(); //指定EventTime對應的欄位
					}
				})
		).keyBy(new KeySelector<StationLog, String>(){
			@Override
			public String getKey(StationLog value) throws Exception {
				return value.getStationID();  //按照基站分組
			}}
		).timeWindow(Time.seconds(5)) //設置時間視窗
		.reduce(new MyReduceFunction(),new MyProcessWindows()).print();

		env.execute();
	}
}
//用於如何處理視窗中的數據,即:找到視窗內通話時間最長的記錄。
class MyReduceFunction implements ReduceFunction<StationLog> {
	@Override
	public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
		// 找到通話時間最長的通話記錄
		return value1.getDuration() >= value2.getDuration() ? value1 : value2;
	}
}
//視窗處理完成後,輸出的結果是什麼
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
	@Override
	public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
			Iterable<StationLog> elements, Collector<String> out) throws Exception {
		StationLog maxLog = elements.iterator().next();

		StringBuffer sb = new StringBuffer();
		sb.append("視窗範圍是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
		sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
		  .append("呼叫時間:").append(maxLog.getCallTime()).append("\t")
		  .append("主叫號碼:").append(maxLog.getFrom()).append("\t")
		  .append("被叫號碼:")	.append(maxLog.getTo()).append("\t")
		  .append("通話時長:").append(maxLog.getDuration()).append("\n");
		out.collect(sb.toString());
	}
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • swap交換記憶體主要是指當物理記憶體不夠用時,系統會啟用硬碟的一部分空間來充當伺服器記憶體,而預設情況下swap記憶體會有一些設置標準,它與物理記憶體的大小也是有關係的,具體標準如下: Ram大小 Swap大小 激活Swap後合計大小 256MB 256MB 512MB 512MB 512MB 1GB 1G ...
  • 一 跨域概述 1.1 同源策略 同源策略是一個安全策略。同源,指的是協議,功能變數名稱,埠相同。瀏覽器處於安全方面的考慮,只允許本功能變數名稱下的介面交互,不同源的客戶端腳本,在沒有明確授權的情況下,不能讀寫對方的資源。 同源策略主要是基於如下可能的安全隱患: 用戶訪問www.mybank.com,登錄併進行網銀 ...
  • 本文更新於2020-04-05,使用MySQL 5.7,操作系統為Deepin 15.4。 使用符號連接分佈IO 利用操作系統的符號連接,將不同的資料庫、表、索引指向不同的物理磁碟,從而達到分佈磁碟IO的目的。 禁止操作系統更新文件的atime屬性 對於讀寫頻繁的資料庫文件來說,記錄文件的訪問時間一 ...
  • Linux(Centos 7) 安裝配置 redis 1.下載reids ( 官網:redis.io,中文網:www.redis.cn) 我下載的是5.0.8版本的 第二步:安裝 解壓(到opt目錄) tar -zxvf redis-5.0.8.tar.gz -C /opt 2.檢查環境(安裝red ...
  • 項目里客戶端突然報錯,原因是SQL Server中某個Function返回值有問題,拿來代碼看還是比較簡單的Function,雖然寫法很不好,但是select dbo.fn_xxxfunction(0)返回值是空就有點奇怪。 IF OBJECT_ID('fn_xxxfunction', 'FN') ...
  • 接上一篇《PG-跨庫操作-dblink》;講下postgres_fdw的使用;postgres_fdw工作原理詳細介紹可以去看下《PostgreSQL指南》第4章; 對FDW特性;還支持在PostgreSQL異構資料庫的同步、遷移的場景。FDW隨著Postgres版本而升級、優化,對分散式架構也是支 ...
  • 墨天輪資料庫周刊第33期發佈啦,每周1次推送本周資料庫相關熱門資訊、精選文章、乾貨文檔。 ...
  • 問題故障:Mysql資料庫意外崩潰,一直無法啟動資料庫。報錯日誌: 啟動報錯:service mysqld restartERROR! MySQL server PID file could not be found!Starting MySQL. ERROR! The server quit wi ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...