大數據Hadoop之——Kafka 圖形化工具 EFAK(EFAK環境部署)

来源:https://www.cnblogs.com/liugp/archive/2022/05/26/16307589.html
-Advertisement-
Play Games

一、概述 EFAK(Eagle For Apache Kafka,以前稱為 Kafka Eagle)是一款由國內公司開源的Kafka集群監控系統,可以用來監視kafka集群的broker狀態、Topic信息、IO、記憶體、consumer線程、偏移量等信息,併進行可視化圖表展示。獨特的KQL還可以通過 ...


目錄

一、概述

EFAK(Eagle For Apache Kafka,以前稱為 Kafka Eagle)是一款由國內公司開源的Kafka集群監控系統,可以用來監視kafka集群的broker狀態、Topic信息、IO、記憶體、consumer線程、偏移量等信息,併進行可視化圖表展示。獨特的KQL還可以通過SQL線上查詢kafka中的數據。

源碼: https://github.com/smartloli/kafka-eagle/
下載: http://download.kafka-eagle.org/
官方文檔:https://www.kafka-eagle.org/articles/docs/documentation.html

二、EFAK架構

EFAK分散式模式部署,這裡以5個節點為例子(1個Master和4個Slave),各個節點的角色如下如所示:

三、EFAK數據採集原理

對於 Kafka,我們可以收集以下數據

  • Kafka broker常用機器載入信息:記憶體、cpu、IP、版本等。
  • 服務監控數據:TPS、QPS、RT等
  • 應用程式監控:組、消費者、生產者、主題等。

因為EFAK是kafka的監控系統,所以前提是需先安裝Kafka和Zookeeper。

四、安裝Kafka

kafka官網文檔:https://kafka.apache.org/documentation/

1)Kafka下載

$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.1/kafka_2.13-3.1.1.tgz
$ tar -xf kafka_2.13-3.1.1.tgz -C /opt/bigdata/hadoop/server/

2)配置環境變數

$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

3)創建logs目錄

$ mkdir $KAFKA_HOME/logs

4)修改kafka配置

$ cd $KAFKA_HOME
# 查看現有配置,去掉空行和註釋
$ cat config/server.properties |grep -v '^$\|^#'
$ cat > $KAFKA_HOME/config/server.properties <<EOF
#broker的全局唯一編號,不能重覆
broker.id=0

#刪除topic功能使能
delete.topic.enable=true
#處理網路請求的線程數量
num.network.threads=3
#用來處理磁碟IO的現成數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka數據的存儲位置
log.dirs=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper連接超時時間
zookeeper.connection.timeout.ms=60000
EOF

5)修改zookeeper配置

# 創建zookeeper data和logs目錄
$ mkdir $KAFKA_HOME/zookeeper_data $KAFKA_HOME/zookeeper_logs 
$ vi $KAFKA_HOME/config/zookeeper.properties
# 配置主要修改如下:
#數據目錄
dataDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_data
#日誌目錄
dataLogDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_logs
#心跳間隔時間,zookeeper中使用的基本時間單位,毫秒值。每隔2秒發送一個心跳,session時間tickTime*2
tickTime=2000
#leader與客戶端連接超時時間。表示5個心跳間隔
initLimit=5
#Leader與Follower之間的超時時間,表示2個心跳間隔
syncLimit=2
#客戶端連接埠,預設埠2181
clientPort=12181
# zookeeper集群配置項,server.1,server.2,server.3是zk集群節點;hadoop-node1,hadoop-node2,hadoop-node3是主機名稱;2888是主從通信埠;3888用來選舉leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888

6)配置Zookeeper myid

# 在hadoop-node1配置如下:
$ echo 1 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $KAFKA_HOME/zookeeper_data/myid

7)開啟Kafka JMX監控

# 在kafka-server-start.sh文件中添加export JMX_PORT="9988",埠自定義就行
$ vi $KAFKA_HOME/bin/kafka-server-start.sh

重啟kafka

$ $KAFKA_HOME/bin/kafka-server-stop.sh ; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

【問題】如遇以下報錯:

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)

【解決】在server.properties找到log.dirs配置的路徑。將該路徑下的meta.properties文件刪除,或者編輯meta.properties文件修改裡面的cluster.id即可。

8)將kafka目錄推送到其它節點

$ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3節點上設置環境變數
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

# 修改advertised.listeners,值改成對應的hostname或者ip

【溫馨提示】修改hadoop-node2上server.properties文件的broker.id,設置為1和2,只要不重覆就行,advertised.listeners地址改成對應機器IP。

9)啟動服務

啟動zookeeper集群之後再啟動kafka集群

$ cd $KAFKA_HOME
# -daemon後臺啟動
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 預設埠2181,可以在配置自定義,這裡修改為12181埠
$ netstat -tnlp|grep 12181

啟動Kafka

$ cd $KAFKA_HOME
# 預設埠9092,這裡修改成了19092,可以修改listeners和advertised.listeners
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ netstat -tnlp|grep 9092
$ jps

五、安裝EFAK

1)下載EFAK

$ cd /opt/bigdata/hadoop/software
$ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
$ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/
$ tar -xf 

2)創建資料庫

$ mysql -uroot -p 
123456

create database ke;

2)設置環境變數

$ vi /etc/profile
export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin

$ source /etc/profile

3)配置

這裡設置hadoop-node1為master節點,其它兩個節點為slave節點,修改參數

$ vi $KE_HOME/conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181

######################################
# kafka jmx 地址,預設Apache發佈的Kafka基本是這個預設值,
# 對於一些公有雲Kafka廠商,它們會修改這個值,
# 比如會將jmxrmi修改為kafka或者是其它的值,
# 若是選擇的公有雲廠商的Kafka,可以根據實際的值來設置該屬性
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16

# EFAK webui port -- WebConsole port access address
efak.webui.port=8048

######################################
# EFAK enable distributed,啟用分散式部署
######################################
efak.distributed.enable=true

# 設置節點類型slave or master
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=master
# deploy efak server address
efak.worknode.master.host=hadoop-node1
efak.worknode.port=8085

# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka

# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=true

# EFAK keeps data for 30 days by default
efak.metrics.retain=15

# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000

# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin


# 關閉自帶的sqlite資料庫,使用外部的mysql資料庫
# Default use sqlite to store data
# efak.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# efak.username=root
# efak.password=smartloli

# 配置外部資料庫
# (Optional) set mysql address
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

4)調整啟動參數

EFAK預設啟動記憶體大小為2G,考慮到伺服器情況可以將其調小

## 在 efak 安裝目錄執行
$ vi $KE_HOME/bin/ke.sh
## 將 KE_JAVA_OPTS 最大最小容量調小,例如:
export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

6)修改 works配置

預設是localhost

$  cat >$KE_HOME/conf/works<<EOF
hadoop-node2
hadoop-node3
EOF

7)將EFAK推送其它節點

# hadoop-node1推送
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3配置環境變數修改節點類型為slave

8)啟動

上面配置文件配置的是分散式部署,當然也可以單機跑,但是不建議單機跑

# 在master節點上執行
$ cd $KE_HOME/bin
$ chmod +x ke.sh 
# 單機版啟動
$ ke.sh start
# 集群方式啟動
$ ke.sh cluster start
$ ke.sh cluster restart

web UI:http://192.168.0.113:8048/
賬號密碼:admin/123456


六、簡單使用

1)Kafka CLI簡單使用

【增】添加topic

# 創建topic,1副本,1分區,設置數據過期時間72小時(-1表示不過期),單位ms,72*3600*1000=259200000
$ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

【查】

# 查看topic列表
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list
# 查看topic列表詳情
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe
# 指定topic
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002
# 查看消費者組
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe  --group test002

【改】這裡主要是修改最常用的三個參數:分區、副本,過期時間

# 修改分區,擴分區,不能減少分區
$ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2
# 修改過期時間,下麵兩行都可以
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000

# 修改副本數,將副本數修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002

【刪】

$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092

【生成者】

$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

【消費者】

# 從頭開始消費
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning
# 指定從分區的某個位置開始消費,這裡只指定了一個分區,可以多寫幾行或者遍歷對應的所有分區
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100

【消費組】

$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002

【查看數據積壓】

$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002

2)EFAK常用命令

$KE_HOME/bin/ke.sh啟動腳本中包含以下命令:

命令 描述
ke.sh start 啟動 EFAK 伺服器。
ke.sh status 查看 EFAK 運行狀態。
ke.sh stop 停止 EFAK 伺服器。
ke.sh restart 重新啟動 EFAK 伺服器。
ke.sh stats 查看 linux 操作系統中的 EFAK 句柄數。
ke.sh find [ClassName] 在 jar 中找到類名的位置。
ke.sh gc 查看 EFAK 進程 gc。
ke.sh version 查看 EFAK 版本。
ke.sh jdk 查看 EFAK 安裝的 jdk 詳細信息。
ke.sh sdate 查看 EFAK 啟動日期。
ke.sh cluster start 查看 EFAK 集群分散式啟動。
ke.sh cluster status 查看 EFAK 集群分散式狀態。
ke.sh cluster stop 查看 EFAK 集群分散式停止。
ke.sh cluster restart 查看 EFAK 集群分散式重啟。

EFAK環境部署,和kafka的一些簡單操作就到這裡了,後續會分享KSQL和其它更詳細的頁面化操作,請小伙伴耐心等待~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在基於SqlSugar的開發框架中,我們設計了一些系統服務層的基類,在基類中會有很多涉及到相關的數據處理操作的,如果需要跟蹤具體是那個用戶進行操作的,那麼就需要獲得當前用戶的身份信息,包括在Web API的控制器中也是一樣,需要獲得對應的用戶身份信息,才能進行相關的身份鑒別和處理操作。本篇隨筆介紹基... ...
  • 在前面隨筆,我們介紹過這個基於SqlSugar的開發框架,我們區分Interface、Modal、Service三個目錄來放置不同的內容,其中Modal是SqlSugar的映射實體,Interface是定義訪問介面,Service是提供具體的數據操作實現。在Service層中,往往除了本身的一些增刪... ...
  • 許可權術語 Subject:用戶,用戶組 Action:對Object的操作,如增刪改查等 Object:許可權作用的對象,也可以理解為資源 Effect:規則的作用,如允許,拒絕 Condition:生效條件 Permission:允許(拒絕)用戶(用戶組)在條件允許下對對象(資源)的動作 Role: ...
  • 1.什麼是quota 簡單的說就是限制用戶對磁碟空間的使用量。 因為Linux是多用戶多任務的操作系統,許多人共用磁碟空間,為了合理的分配磁碟空間,於是就有了quota的出現。 2.quota的用途 顯示磁碟使用情況和配額 3.quota的一般作用對象 (1)針對WWW server (2)針對ma ...
  • 1、引言 最近在查一個bug,查到最後發現是數組越界導致的。數組只有30個位元組,代碼卻向這個數組填充了35個數據,這個bug還是偶現的,查到它確實廢了一番功夫。我就突然想到:C語言為什麼不檢查數組下標呢???先來個demo驗證下 #include<stdio.h> #include<stdlib.h ...
  • 一 、通過雲開發平臺快速創建初始化應用 1.創建相關應用模版請參考鏈接:基於 vue.js 的 SSR 技術—Nuxt.js // 註意在後面提示中,上移下移,按空格選中 Element 2.完成創建後就可以在github中查看到新增的Nuxt倉庫 二 、 本地編寫 流程圖、拓撲圖項目 1.將應用模 ...
  • 思路: 1、執行df -h 找到 帶mnt的行。將結果存入一個文件中。 system("df -h |grep mnt >./extendevinfo.txt"); 也可以直接popen用管道打開,感覺效率可能會更高一些。 2、解析文件中最後/mnt/XXX部分即為掛載路徑。(具體看自己內核掛載路徑 ...
  • 為什麼要使用Nuxt.js Nuxt 基於一個強大的模塊化架構。你可以從 50 多個模塊中進行選擇,讓你的開發變得更快、更簡單。對 PWA 的支持、添加谷歌分析到你的網頁或生成網站地圖,這些功能都無需重新發明輪子來獲得。 Nuxt.js 預設會優化你的應用程式。我們儘可能地利用 Vue.js 和 N ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...