聊聊Flink必知必會(五)

来源:https://www.cnblogs.com/zhiyong-ITNote/archive/2023/11/17/17838824.html
-Advertisement-
Play Games

聊聊Flink的必知必會(三) 聊聊Flink必知必會(四) 從源碼中,根據關鍵的代碼,梳理一下Flink中的時間與視窗實現邏輯。 WindowedStream 對數據流執行keyBy()操作後,再調用window()方法,就會返回WindowedStream,表示分區後又加窗的數據流。如果數據流沒 ...


  1. 聊聊Flink的必知必會(三)
  2. 聊聊Flink必知必會(四)

從源碼中,根據關鍵的代碼,梳理一下Flink中的時間與視窗實現邏輯。

WindowedStream

對數據流執行keyBy()操作後,再調用window()方法,就會返回WindowedStream,表示分區後又加窗的數據流。如果數據流沒有經過分區,直接調用window()方法則會返回AllWindowedStream

如下:

// 構造函數
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.builder =
    new WindowOperatorBuilder<>(
    windowAssigner,
    windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
    input.getExecutionConfig(),
    input.getType(),
    input.getKeySelector(),
    input.getKeyType());
}
        
// KeyedStream類型,表示被加窗的輸入流。
private final KeyedStream<T, K> input;

// 用於構建WindowOperator,最終會生成windowAssigner,Evictor,Trigger
private final WindowOperatorBuilder<T, K, W> builder;

在這裡面還涉及到一些視窗的基本計算運算元,比如reduce,aggregate,apply,process,sum等等.

視窗相關模型的實現

Window

Window類是Flink中對視窗的抽象。它是一個抽象類,包含抽象方法maxTimestamp(),用於獲取屬於該視窗的最大時間戳。

TimeWindow類是其子類。包含了視窗的start,end,offset等時間概念欄位,這裡會計算視窗的起始時間:

// 構造函數
public TimeWindow(long start, long end) {
    this.start = start;
    this.end = end;
}

// timestamp:獲取視窗啟動時的第一個時間戳epoch毫秒
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    final long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
    }
}

WindowAssigner

WindowAssigner表示視窗分配器,用來把元素分配到零個或多個視窗(Window對象)中。它是一個抽象類,其中重要的抽象方法為assignWindows()方法,用來給元素分配視窗。

Flink有多種類型的視窗,如Tumbling Window、Sliding Window等。各種類型的視窗又分為基於事件時間或處理時間的視窗。WindowAssigner的實現類就對應著具體類型的視窗。

SlidingEventTimeWindows是WindowAssigner的另一個實現類,表示基於事件時間的Sliding Window。它有3個long類型的欄位size、slide和offset,分別表示視窗的大小、滑動的步長和視窗起始位置的偏移量。它對assignWindows()方法的實現如下:

@Override
public Collection<TimeWindow> assignWindows(
        Object element, long timestamp, WindowAssignerContext context) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
    if (timestamp > Long.MIN_VALUE) {
        if (staggerOffset == null) {
            staggerOffset =
                    windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
        }
        long start =
                TimeWindow.getWindowStartWithOffset(
                        timestamp, (globalOffset + staggerOffset) % size, size);
        // 返回構建好起止時間的TimeWindow
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                        + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                        + "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

設置視窗觸發器Trigger

@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    return EventTimeTrigger.create();
}

WindowAssigner與其主要實現類的關係如下:

1.png

這些類的含義分別如下

  • GlobalWindows:將所有元素分配進同一個視窗的全局視窗分配器。
  • SlidingEventTimeWindows:基於事件時間的滑動視窗分配器。
  • SlidingProcessingTimeWindows:基於處理時間的滑動視窗分配器。
  • TumblingEventTimeWindows:基於事件時間的滾動視窗分配器。
  • TumblingProcessingTimeWindows:基於處理時間的滾動視窗分配器。
  • EventTimeSessionWindows:基於事件時間的會話視窗分配器。
  • ProcessingTimeSessionWindows:基於處理時間的會話視窗分配器。

Trigger

Trigger表示視窗觸發器。它是一個抽象類,主要定義了下麵3個方法用於確定視窗何時觸發計算:

// 每個元素到來時觸發
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 處理時間的定時器觸發時
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 事件時間的定時器觸發時調用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

這3個方法的返回結果為TriggerResult對象。TriggerResult是一個枚舉類,包含兩個boolean類型的欄位fire和purge,分別表示視窗是否觸發計算和視窗內的元素是否需要清空。

CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);

TriggerResult(boolean fire, boolean purge) {
    this.purge = purge;
    this.fire = fire;
}

視窗觸發器的實現由用戶根據業務需求自定義。Flink預設基於事件時間的觸發器為EventTimeTrigger,其三個方法處理如下

@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // 如果水印已經超過視窗,則立即觸發
        return TriggerResult.FIRE;
    } else {
        // 註冊事件時間定時器
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}

/*
 * 處理時間,視窗不觸發計算也不清空內部元素。
 */
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
        throws Exception {
    return TriggerResult.CONTINUE;
}

Trigger與其主要實現類的繼承關係

2.png

這些類的含義如下

  • CountTrigger:元素數達到設置的個數時觸發計算的觸發器。
  • DeltaTrigger:基於DeltaFunction和設置的閾值觸發計算的觸發器。
  • EventTimeTrigger:基於事件時間的觸發器。
  • ProcessingTimeTrigger:基於處理時間的觸發器。
  • PurgingTrigger:可包裝其他觸發器的清空觸發器。
  • ContinuousEventTimeTrigger:基於事件時間並按照一定的時間間隔連續觸發計算的觸發器。
  • ContinuousProcessingTimeTrigger:基於處理時間並按照一定的時間間隔連續觸發計算的觸發器。

windowOperator

WindowedStream的構造函數中,會生成WindowOperatorBuilder,該類可以返回WindowOperator,這兩個類負責視窗分配器、視窗觸發器和視窗剔除器這些組件在運行時的協同工作。

對於WindowOperator,除了視窗分配器和視窗觸發器的相關欄位,可以先瞭解下麵兩個欄位。

// StateDescriptor類型,表示視窗狀態描述符。
private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

// 表示視窗的狀態,視窗內的元素都在其中維護。
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;

視窗中的元素並沒有保存在Window對象中,而是維護在windowState中。windowStateDescriptor則是創建windowState所需用到的描述符。

當有元素到來時,會調用WindowOperator的processElement()方法:

public void processElement(StreamRecord<IN> element) throws Exception {
    // 分配視窗
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
            ...
        if (windowAssigner instanceof MergingWindowAssigner) { // Session Window的情況
            ...
        } else {
            for (W window: elementWindows) { // 非Session Window的情況
                ...
                // 將Window對象設置為namespace並添加元素到windowState中
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
                triggerContext.key = key;
                triggerContext.window = window;
                // 獲取TriggerResult,確定接下來是否需要觸發計算或清空視窗
                TriggerResult triggerResult = triggerContext.onElement(element);
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    // 觸發計算
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    // 清空視窗
                    windowState.clear();
                }
                ...
            }
        }
    ...
}

在處理時間或事件時間的定時器觸發時,會調用WindowOperator的onProcessingTime()方法或onEventTime()方法,其中的邏輯與onElement()方法的大同小異。

Watermarks

水位線(watermark)是選用事件時間來進行數據處理時特有的概念。它的本質就是時間戳,從上游流向下游,表示系統認為數據中的事件時間在該時間戳之前的數據都已到達。

Flink中,Watermark類表示水位。

/** Creates a new watermark with the given timestamp in milliseconds. */
public Watermark(long timestamp) {
    this.timestamp = timestamp;
}

watermark的生成有兩種方式,這裡不贅述,主要講述下基於配置的策略生成watermark的方式。如下的代碼是比較常見的配置:

// 分配事件時間與水印
.assignTimestampsAndWatermarks(
        // forBoundedOutOfOrderness 會根據事件的時間戳和允許的最大亂序時間生成水印。
        // Duration 設置了最大亂序時間為1秒。這意味著 Flink 將允許在這1秒的時間範圍內的事件不按照事件時間的順序到達,這個時間段內的事件會被認為是"有序的"。
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        // 設置事件時間分配器,從Event對象中提取時間戳作為事件時間
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

在Flink內部,會根據配置的策略調用BoundedOutOfOrdernessWatermarks生成watermark。該類的代碼如下:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每條數據都會更新最大值
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發送 watermark 邏輯
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

onEvent決定每次事件都會取得最大的事件時間更新;onPeriodicEmit則是周期性的更新並傳遞到下游。

AbstractStreamOperator

WatermarkGenerator介面的調用是在AbstractStreamOperator抽象類的子類TimestampsAndWatermarksOperator中。其生命周期open函數與每個數據到來的處理函數processElement,如下:

@Override
public void open() throws Exception {
    super.open();

    timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
    watermarkGenerator =
            emitProgressiveWatermarks
                    ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                    : new NoWatermarksGenerator<>();

    wmOutput = new WatermarkEmitter(output);

    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    if (watermarkInterval > 0 && emitProgressiveWatermarks) {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}

@Override
public void processElement(final StreamRecord<T> element) throws Exception {
    final T event = element.getValue();
    final long previousTimestamp =
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
    // 從分配器中提取事件時間戳
    final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

    element.setTimestamp(newTimestamp);
    output.collect(element);
    // 調用水印生成器
    watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}

從方法的入參可以看出來 flink 運算元間的數據流動是 StreamRecord 對象。它對數據的處理邏輯是什麼都不做直接向下游發送,然後調用 onEvent 記錄最大時間戳,也就是說:flink 是先發送數據再生成 watermark,watermark 永遠在生成它的數據之後。

總結

上面的一系列相關代碼,只是冰山一角,暫時只是把關鍵涉及到的部分捋了一下。最後畫個圖,展示其大致思路。

3.png

參考:

Flink Watermark 源碼解析


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 遇到的問題:將長度為40的數組數據賦值<el-table></el-table>,我發現loading沒有效果,後面發現是頁面卡住了,loading直接沒有出現。 經過查詢資料,發現<el-table>會有卡頓的問題,看到有的博主推薦使用一款叫umy-ui的插件,我就試了試,發現卡頓的問題解決了。 ...
  • 由於我司的業務特性,需要 APP 能夠支持即時在無網路的場景下,也能夠正常使用 APP 的功能 那麼,為了讓一個用 web 前端實現的 APP 能夠在無網路的場景下,也能夠正常運行程式,這其中的離線方案就需要實現幾個關鍵點: 代碼的離線、更新 數據的下載、上傳、更新 本篇就想來講一講,我們在離線應用 ...
  • 微服務架構可以更快地推出新產品,幫助產品更輕鬆地擴展,並更好地響應客戶需求。憑藉多種現代數據模型、在任何情況下的容錯性、用於隔離的多租戶功能以及在多個環境中部署的靈活性,Redis Enterprise 使開發人員和運營商能夠針對微服務架構優化他們的數據層。 ...
  • Java解析上傳的zip文件--包含Excel解析與圖片上傳 前言:今天遇到一個需求:上傳一個zip格式的壓縮文件,該zip中包含人員信息的excel以及excel中每行對應的人的圖片,現在需要將該zip壓縮包中所有內容解析導入到資料庫中,包括圖片,並將圖片與excel內容對應。 代碼演示: /** ...
  • 十一、指針和引用(一) 1、指針 1)思考 ​ 在電腦程式中,有一條鐵律那就是萬物皆內粗,而我們知道,記憶體就是一個個小格,存放著高電平或者低電平,也就是0或者1,我們要表達的一切都是通過這種二進位的方式放到記憶體中,當我們讀取、寫入,其實局勢在對應的記憶體空間執行讀或者寫操作 ​ 我們今天就研究研究, ...
  • 反面單例 代碼 import java.util.ArrayList; import java.util.List; /** * @since : 2023/11/17 **/ public class StupidSingleton { private static final StupidSin ...
  • C++ 指針學習筆記 引入 指針是什麼 指針是一個變數,其值為另一個變數的地址。 指針聲明的一般形式為: type *ptr_name; type 是指針的基類型,ptr_name 是指針的名稱,* 用來指定一個變數是指針 對於一個指針,需要明確四個方面的內容:指針的類型、指針所指向的類型、指針的值 ...
  • Hi i,m JinXiang ⭐ 前言 ⭐ 本篇文章主要介紹單元測試工具Junit使用以及部分理論知識 🍉歡迎點贊 👍 收藏 ⭐留言評論 📝私信必回喲😁 🍉博主收將持續更新學習記錄獲,友友們有任何問題可以在評論區留言 什麼是Junit單元測試? JUnit 是一個 Java 編程語言的單 ...
一周排行
    -Advertisement-
    Play Games
  • 1、預覽地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 隨著網路的發展,企業對於信息系統數據的保密工作愈發重視,不同身份、角色對於數據的訪問許可權都應該大相徑庭。 列如 1、不同登錄人員對一個數據列表的可見度是不一樣的,如數據列、數據行、數據按鈕 ...
  • 前言 上一篇文章寫瞭如何使用RabbitMQ做個簡單的發送郵件項目,然後評論也是比較多,也是準備去學習一下如何確保RabbitMQ的消息可靠性,但是由於時間原因,先來說說設計模式中的簡單工廠模式吧! 在瞭解簡單工廠模式之前,我們要知道C#是一款面向對象的高級程式語言。它有3大特性,封裝、繼承、多態。 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 介紹 Nodify是一個WPF基於節點的編輯器控制項,其中包含一系列節點、連接和連接器組件,旨在簡化構建基於節點的工具的過程 ...
  • 創建一個webapi項目做測試使用。 創建新控制器,搭建一個基礎框架,包括獲取當天日期、wiki的請求地址等 創建一個Http請求幫助類以及方法,用於獲取指定URL的信息 使用http請求訪問指定url,先運行一下,看看返回的內容。內容如圖右邊所示,實際上是一個Json數據。我們主要解析 大事記 部 ...
  • 最近在不少自媒體上看到有關.NET與C#的資訊與評價,感覺大家對.NET與C#還是不太瞭解,尤其是對2016年6月發佈的跨平臺.NET Core 1.0,更是知之甚少。在考慮一番之後,還是決定寫點東西總結一下,也回顧一下.NET的發展歷史。 首先,你沒看錯,.NET是跨平臺的,可以在Windows、 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 添加節點(nodes) 通過上一篇我們已經創建好了編輯器實例現在我們為編輯器添加一個節點 添加model和viewmode ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...
  • 類型檢查和轉換:當你需要檢查對象是否為特定類型,並且希望在同一時間內將其轉換為那個類型時,模式匹配提供了一種更簡潔的方式來完成這一任務,避免了使用傳統的as和is操作符後還需要進行額外的null檢查。 複雜條件邏輯:在處理複雜的條件邏輯時,特別是涉及到多個條件和類型的情況下,使用模式匹配可以使代碼更 ...
  • 在日常開發中,我們經常需要和文件打交道,特別是桌面開發,有時候就會需要載入大批量的文件,而且可能還會存在部分文件缺失的情況,那麼如何才能快速的判斷文件是否存在呢?如果處理不當的,且文件數量比較多的時候,可能會造成卡頓等情況,進而影響程式的使用體驗。今天就以一個簡單的小例子,簡述兩種不同的判斷文件是否... ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...