Flink1.9整合Kafka

来源:https://www.cnblogs.com/tree1123/archive/2019/09/20/11556114.html
-Advertisement-
Play Games

本文基於Flink1.9版本簡述如何連接Kafka。 流式連接器 我們知道可以自己來開發Source 和 Sink ,但是一些比較基本的 Source 和 Sink 已經內置在 Flink 里。 預定義的source支持從文件、目錄、socket,以及 collections 和 iterators ...


file

本文基於Flink1.9版本簡述如何連接Kafka。

流式連接器

file

我們知道可以自己來開發Source 和 Sink ,但是一些比較基本的 Source 和 Sink 已經內置在 Flink 里。

預定義的source支持從文件、目錄、socket,以及 collections 和 iterators 中讀取數據。

預定義的sink支持把數據寫入文件、標準輸出(stdout)、標準錯誤輸出(stderr)和 socket。

連接器可以和多種多樣的第三方系統進行交互。目前支持以下系統:

  • Apache Kafka
  • Apache Cassandra(sink)
  • Amazon Kinesis Streams(source/sink)
  • Elasticsearch(sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ(source/sink)
  • Apache NiFi(source/sink)
  • Twitter Streaming API(source)

請記住,在使用一種連接器時,通常需要額外的第三方組件,比如:數據存儲伺服器或者消息隊列。

Apache Bahir 中定義了其他一些連接器

  • Apache ActiveMQ(source/sink)
  • Apache Flume(sink)
  • Redis(sink)
  • Akka (sink)
  • Netty (source)

使用connector並不是唯一可以使數據進入或者流出Flink的方式。一種常見的模式是從外部資料庫或者 Web 服務查詢數據得到初始數據流,然後通過 Map 或者 FlatMap 對初始數據流進行豐富和增強,這裡要使用Flink的非同步IO。

而向外部存儲推送大量數據時會導致 I/O 瓶頸問題出現。在這種場景下,如果對數據的讀操作遠少於寫操作,可以讓外部應用從 Flink 拉取所需的數據,需要用到Flink的可查詢狀態介面。

本文重點介紹Apache Kafka Connector

Kafka連接器

此連接器提供對Apache Kafka提供的事件流的訪問。

Flink提供特殊的Kafka連接器,用於從/向Kafka主題讀取和寫入數據。Flink Kafka Consumer集成了Flink的檢查點機制,可提供一次性處理語義。為實現這一目標,Flink並不完全依賴Kafka 的消費者組的偏移量,而是在內部跟蹤和檢查這些偏移。

下表為不同版本的kafka與Flink Kafka Consumer的對應關係。

Maven Dependency Supported since Consumer and Producer Class name Kafka version
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x
flink-connector-kafka_2.11 1.7.0 FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0

而從最新的Flink1.9.0版本開始,使用Kafka 2.2.0客戶端。

下麵簡述使用步驟。

導入maven依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

安裝Kafka:

可以參照 Kafka入門寶典(詳細截圖版)

相容性:

從Flink 1.7開始,它不跟蹤特定的Kafka主要版本。相反,它在Flink發佈時跟蹤最新版本的Kafka。如果您的Kafka代理版本是1.0.0或更高版本,則應使用此Kafka連接器。如果使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應使用與代理版本對應的連接器。

升級Connect要註意Flink升級作業,同時

  • 在整個過程中使用Flink 1.9或更新版本。
  • 不要同時升級Flink和運營商。

  • 確保您作業中使用的Kafka Consumer和/或Kafka Producer分配了唯一標識符(uid)。

  • 使用stop with savepoint功能獲取保存點(例如,使用stop --withSavepoint)。

用法:

引入依賴後,實例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。

Kafka Consumer

先分步驟介紹構建過程,文末附Flink1.9連接Kafka完整代碼。

Kafka consumer 根據版本分別叫做FlinkKafkaConsumer08 FlinkKafkaConsumer09等等
Kafka >= 1.0.0 的版本就叫FlinkKafkaConsumer。

構建FlinkKafkaConsumer

java示例代碼如下:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
    .print()

必須有的:

1.topic名稱

2.用於反序列化Kafka數據的DeserializationSchema / KafkaDeserializationSchema

3.配置參數:“bootstrap.servers” “group.id” (kafka0.8還需要 “zookeeper.connect”)

配置消費起始位置

java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

//指定位置
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);

scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

//指定位置
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

val stream = env.addSource(myConsumer)
檢查點

啟用Flink的檢查點後,Flink Kafka Consumer將使用主題中的記錄,並以一致的方式定期檢查其所有Kafka偏移以及其他操作的狀態。如果作業失敗,Flink會將流式程式恢復到最新檢查點的狀態,並從存儲在檢查點中的偏移量開始重新使用Kafka的記錄。

如果禁用了檢查點,則Flink Kafka Consumer依賴於內部使用的Kafka客戶端的自動定期偏移提交功能。

如果啟用了檢查點,則Flink Kafka Consumer將在檢查點完成時提交存儲在檢查點狀態中的偏移量。

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
分區發現

Flink Kafka Consumer支持發現動態創建的Kafka分區,並使用一次性保證消費它們。

還可以使用正則:

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...
時間戳和水印

在許多情況下,記錄的時間戳(顯式或隱式)嵌入記錄本身。另外,用戶可能想要周期性地或以不規則的方式發出水印。

我們可以定義好Timestamp Extractors / Watermark Emitters,通過以下方式將其傳遞給您的消費者:

java

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env
    .addSource(myConsumer)
    .print();

scala

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
    .addSource(myConsumer)
    .print()

Kafka Producer

Kafka Producer 根據版本分別叫做FlinkProducer011 FlinkKafkaProducer010等等
Kafka >= 1.0.0 的版本就叫FlinkKafkaProducer 。

構建FlinkKafkaConsumer

java

DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);

scala

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)

需要指定broker list , topic,序列化類。

自定義分區:預設情況下,將使用FlinkFixedPartitioner將每個Flink Kafka Producer並行子任務映射到單個Kafka分區。

可以實現FlinkKafkaPartitioner類自定義分區。

Flink1.9消費Kafka完整代碼:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        //構建FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        //指定偏移量
        myConsumer.setStartFromEarliest();


        DataStream<String> stream = env
                .addSource(myConsumer);

        env.enableCheckpointing(5000);
        stream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }

項目地址:https://github.com/tree1123/flink_demo_1.9

更多Flink知識,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Mybatis的定義 MyBatis 是一款優秀的持久層框架,它支持定製化 SQL、存儲過程以及高級映射。MyBatis 避免了幾乎所有的 JDBC 代碼和手動設置參數以及獲取結果集。MyBatis 可 以使用簡單的 XML 或註解來配置和映射原生信息,將介面和 Java 的 POJOs(Plain ...
  • [TOC] 第十六章、初識資料庫 一、資料庫 二、資料庫的組成 三、資料庫的分類 四、卸載資料庫 五、安裝資料庫 六、連接資料庫 七、用戶信息查看 八、資料庫的基本操作 九、表的基本操作 十、記錄的基本操作 1)查看某個資料庫中的某個表的所有記錄,如果在對應資料庫中,可以直接查找表 mysql : ...
  • ElasticSearch第三篇,關於搜索、過濾、排序簡單講解。 ...
  • centos7 內部集成了mariadb,而安裝mysql的話會和mariadb的文件衝突,所以需要先卸載掉mariadb。 ...
  • 連接資料庫有2種方式:在本機安裝Oracle資料庫或者是安裝一個oracle簡易客戶端當然,簡易客戶端跟oracle資料庫比較少了一些功能連接方式:1)簡易連接sqlplus scott/[email protected]:1521/study註意最後的study是服務名,別搞錯了這種ora-12514... ...
  • SQL Server 數據類型(文章來源:鬆軟科技www.sysoft.net.cn) Character 字元串: Unicode 字元串: Binary 類型: Number 類型: 固定精度和比例的數字。允許從 -10^38 +1 到 10^38 -1 之間的數字。 p 參數指示可以存儲的最大 ...
  • SQL DROP INDEX 語句 我們可以使用 DROP INDEX 命令刪除表格中的索引。 用於 Microsoft SQLJet (以及 Microsoft Access) 的語法: 用於 MS SQL Server 的語法: 用於 IBM DB2 和 Oracle 語法: 用於 MySQL ...
  • 一 SELECT語句關鍵字的定義順序 二 SELECT語句關鍵字的執行順序 三 準備表和數據 \1. 新建一個測試資料庫TestDB; 2.創建測試表table1和table2; 3.插入測試數據; 準備工作做完以後,table1和table2看起來應該像下麵這樣 四 準備SQL邏輯查詢測試語句 + ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...