【圖解源碼】Zookeeper3.7源碼剖析,Session的管理機制,Leader選舉投票規則,集群數據同步流程

来源:https://www.cnblogs.com/jiagooushi/archive/2022/06/24/16408609.html
-Advertisement-
Play Games

Zookeeper3.7源碼剖析 能力目標 掌握Zookeeper中Session的管理機制 能基於Client進行Debug測試Session創建/刷新操作 能搭建Zookeeper集群源碼配置 掌握集群環境下Leader選舉啟動過程 能說出Zookeeper選舉過程中的概念 能說出Zookeep ...


Zookeeper3.7源碼剖析

能力目標

  • 掌握Zookeeper中Session的管理機制
  • 能基於Client進行Debug測試Session創建/刷新操作
  • 能搭建Zookeeper集群源碼配置
  • 掌握集群環境下Leader選舉啟動過程
  • 能說出Zookeeper選舉過程中的概念
  • 能說出Zookeeper選舉投票規則
  • 能畫出Zookeeper集群數據同步流程

1 Session源碼分析

客戶端創建Socket連接後,會嘗試連接,如果連接成功成功會調用到primeConnection方法用來發送ConnectRequest連接請求,這裡便是設置session會話 ,關於客戶端創建會話我們就不在這裡做講解了,我們直接講解服務端Session會話處理流程。

1.1 服務端Session屬性分析

Zookeeper服務端會話操作如下圖:

file

在服務端通過SessionTrackerImplExpiryQueue來保存Session會話信息。

SessionTrackerImpl有以下屬性:

1:sessionsById 用來存儲ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl} 2:sessionExpiryQueue ExpiryQueue<SessionImpl>失效隊列
3:sessionsWithTimeout ConcurrentMap<Long, Integer>存儲的是{sessionId: sessionTimeout} 
4:nextSessionId 下一個sessionId

ExpiryQueue失效隊列有以下屬性:

1:elemMap ConcurrentHashMap<E, Long> 存儲的是{SessionImpl: newExpiryTime} Session實例對象,失效時間。
2:expiryMap ConcurrentHashMap<Long, Set<E>>存儲的是{time: set<SessionImp>} 失效時間,當前失效時間的Session對象集合。
3:nextExpirationTime 下一次失效時間 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 當前系統時間毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=當前系統時間毫秒值+expirationInterval(失效間隔)。
4:expirationInterval 失效間隔,預設是10s,可以通過sessionlessCnxnTimeout修改。即是通過配置文件的tickTime修改。

1.2 Session創建

我們接著上一章的案例繼續分析,假如客戶端發起請求後,後端如何識別是第一次創建請求?在之前的案例源碼NIOServerCnxn.readPayload()中有所體現,NIOServerCnxn.readPayload()部分關鍵源碼如下:

file

此時如果initialized=false,表示第一次連接 需要創建Session(createSession),此處調用readConnectRequest()後,在readConnectRequest()方法中會將initialized設置為true,只有在處理完連接請求之後才會把initialized設置為true,才可以處理客戶端其他命令。

file

上面方法還調用了processConnectRequest處理連接請求, processConnectRequest 第一次從請求中獲取的sessionId=0,此時會把創建Session作為一個業務,會調用createSession()方法,processConnectRequest 方法部分關鍵代碼如下:

file

創建會話調用createSession(),該方法會首先創建一個sessionId,並把該sessionId作為會話ID創建一個創建session會話的請求,並將該請求交給業務鏈作為一個業務處理,createSession()源碼如下:

file

上面方法用到的sessionTracker.createSession(timeout)做了2個操作分別是創建sessionId和配置sessionId的跟蹤信息,方法源碼如下:
file

會話信息的跟蹤其實就是將會話信息添加到隊列中,任何地方可以根據會話ID找到會話信息,trackSession方法實現了Session創建、Session隊列存儲、Session過期隊列存儲,trackSession方法源碼如下:

file

PrepRequestProcessorrun方法中調用pRequest2Txn,關鍵代碼如下:

file

file

SyncRequestProcessor對txn(創建session的操作)進行持久化,在FinalRequestProcessor會對Session進行提交,其實就是把Session的ID和Timeout存到sessionsWithTimeout中去。

由於FinalRequestProcessor中調用鏈路太複雜,我們把調用鏈路寫出來,大家可以按照這個順序跟蹤:

1:FinalRequestProcessor.applyRequest()
		方法代碼:ProcessTxnResult rc = zks.processTxn(request);
		
2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)
		方法代碼:processTxnForSessionEvents(request, hdr, request.getTxn());

上面調用鏈路中processTxnForSessionEvents(request, hdr, request.getTxn());方法代碼如下:

file

上面方法主要處理了OpCode.createSession並且將sessionId、TimeOut提交到sessionsWithTimeout中,而提交到sessionsWithTimeout的方法SessionTrackerImpl.commitSession()代碼如下:

file

1.3 Session刷新

服務端無論接受什麼請求命令(增刪或ping等請求)都會更新Session的過期時間 。我們做增刪或者ping命令的時候,都會經過RequestThrottlerRequestThrottler的run方法中調用zks.submitRequestNow(),而zks.submitRequestNow(request)中調用了touch(si.cnxn);,該方法源碼如下:

file

touchSession()方法更新sessionExpiryQueue失效隊列中的失效時間,源碼如下:

file

update()方法會在當前時間的基礎上增加timeout,並更新失效時間為newExpiryTime,關鍵源碼如下:

file

1.4 Session過期

SessionTrackerImpl是一個線程類,繼承了ZooKeeperCriticalThread,我們可以看它的run方法,它首先獲取了下一個會話過期時間,並休眠等待會話過期時間到期,然後獲取過期的客戶端會話集合併迴圈關閉,源碼如下:

file

上面方法中調用了sessionExpiryQueue.poll(),該方法代碼主要是獲取過期時間對應的客戶端會話集合,源碼如下:

file

上面的setSessionClosing()方法其實是把Session會話的isClosing狀態設置為了true,方法源碼如下:

file

而讓客戶端失效的方法expirer.expire(s);其實也是一個業務操作,主要調用了ZooKeeperServer.expire()方法,而該方法獲取SessionId後,又創建了一個OpCode.closeSession的請求,並交給業務鏈處理,我們查看ZooKeeperServer.expire()方法源碼如下:

file

PrepRequestProcessor.pRequest2Txn()方法中OpCode.closeSession操作里最後部分代理明確將會話Session的isClosing設置為了true,源碼如下:

file

業務鏈處理對象FinalRequestProcessor.processRequest()方法調用了ZooKeeperServer.processTxn(),並且在processTxn()方法中執行了processTxnForSessionEvents,而processTxnForSessionEvents()方法正好移除了會話信息,方法源碼如下:

file

移除會話的方法SessionTrackerImpl.removeSession()會移除會話ID以及過期會話對象,源碼如下:

file

1.5 Zookeeper會話測試

為了讓Zookeeper的會話理解更深刻,我們對會話流程做一個測試,首先測試會話創建,再測試會話刷新。

1)會話創建測試

我們打開NIOServerCnxn.readPayload()方法,跟蹤首次創建會話,調試情況如下:
file

此時會建立遠程連接並創建SessionID,我們調試到NIOServerCnxn.readConnectRequest()方法,此時建立鏈接,並且得到的sessionId=0。

file

當sessionId=0時,會執行Session創建,Session創建會調用SessionTrackerImpl.createSession()方法實現會話創建,並將會話存入跟蹤隊列,DEBUG測試如下:

file

會話創建代碼如下:

file

跟蹤測試後,控制台輸出如下信息:

AcceptThread----------鏈接服務的IP:127.0.0.1
1:會話未連接,準備首次連接會話.....
2:建立遠程連接......
2:第1次連接的sessionId=0
使用SessionTrackerImpl創建會話,並將會話加入跟蹤隊列中
3:sessionId=0,此時創建sessionId=72061099907219458

2)會話刷新測試

我們執行get /zookeeper指令,然後首先跟蹤到RequestThrottler.run()方法,執行如下:

file

執行程式到達ZooKeeperServer.touch(),即將開始準備刷新會話了,我們測試效果如下:

file

調用SessionTrackerImpl.touchSession()的時候會先判斷會話是否為空、會話是否已經關閉,如果都沒有,才執行刷新會話操作,DEBUG跟蹤如下:

file

刷新會話其實就是會話時間增加,增加會話時間DEBUG跟蹤如下:

file

測試後效果如下:

a.當前請求並未過期,不需要刪除,準備刷新會話
b.準備調用SessionTrackerImpl.touchSession()刷新會話
c.會話不為空,會話也未關閉,準備調用updateSessionExpiry()刷新會話
d.剩餘過期時間:54572178,增加過期時間:30000,刷新會話後過期時間:54604000

2 Zookeeper集群啟動流程

我們先搭建Zookeeper集群,再來分析選舉演算法。

2.1 Zookeeper集群配置

file

如上圖:

1:創建zoo1.cfg、zoo2.cfg、zoo3.cfg
2:創建zkdata1、zkdata2、zkdata3
3:創建3個myid,值分別為1、2、3

配置3個啟動類,如下圖:
file

2.2 集群啟動流程分析

file

如上圖,上圖是Zookeeper單機/集群啟動流程,每個細節所做的事情都在上圖有說明,我們接下來按照流程圖對源碼進行分析。

程式啟動,運行流程啟動集群模式,如下圖:

file

quorumPeer.start()啟動服務,如下代碼:

file

quorumPeer.start()方法代碼如下:

file

quorumPeer.start()方法啟動的主要步驟:

1:loadDataBase()載入數據。
2:startServerCnxnFactory 用來開啟acceptThread、SelectorThread和workerPool線程池。
3:開啟Leader選舉startLeaderElection。
4:開啟JVM監控線程startJvmPauseMonitor。
5:調用父類super.start();進行Leader選舉。

startLeaderElection()開啟Leader選舉方法做了2件事,首先創建初始化選票選自己,接著創建選舉投票方式,源碼如下:

file

createElectionAlgorithm()創建選舉演算法只有第3種,其他2種均已廢棄,方法源碼如下:

file

這個方法創建了以下三個對象:

①、創建QuorumCnxManager對象

②、QuorumCnxManager.Listener

③、FastLeaderElection

3 Zookeeper集群Leader選舉

3.1 Paxos演算法介紹

Zookeeper選舉主要依賴於FastLeaderElection演算法,其他演算法均已淘汰,但FastLeaderElection演算法又是典型的Paxos演算法,所以我們要先學習下Paxos演算法,這樣更有助於掌握FastLeaderElection演算法。

1)Paxos介紹

分散式事務中常見的事務模型有2PC和3PC,無論是2PC提交還是3PC提交都無法徹底解決分散式的一致性問題以及無法解決太過保守及容錯性不好。Google Chubby的作者Mike Burrows說過,世上只有一種一致性演算法,那就是Paxos,所有其他一致性演算法都是Paxos演算法的不完整版。Paxos演算法是公認的晦澀,很難講清楚,但是工程上也很難實現,所以有很多Paxos演算法的工程實現,如Chubby, Raft,ZAB,微信的PhxPaxos等。這一篇會介紹這個公認為難於理解但是行之有效的Paxos演算法。Paxos演算法是萊斯利·蘭伯特(Leslie Lamport)1990年提出的一種基於消息傳遞的一致性演算法,它曾就此發表了《The Part-Time Parliament》,《Paxos Made Simple》,由於採用故事的方式來解釋此演算法,感覺還是很難理解。

2)Paxos演算法背景
Paxos演算法是基於消息傳遞且具有高度容錯特性的一致性演算法,是目前公認的解決分散式一致性問題最有效的演算法之一,其解決的問題就是在分散式系統中如何就某個值(決議)達成一致。
面試的時候:不要把這個Paxos演算法達到的目的和分散式事務聯繫起來,而是針對Zookeeper這樣的master-slave集群對某個決議達成一致,也就是副本之間寫或者leader選舉達成一致。我覺得這個演算法和狹義的分散式事務不是一樣的。
在常見的分散式系統中,總會發生諸如機器宕機或網路異常(包括消息的延遲、丟失、重覆、亂序,還有網路分區)(也就是會發生異常的分散式系統)等情況。Paxos演算法需要解決的問題就是如何在一個可能發生上述異常的分散式系統中,快速且正確地在集群內部對某個數據的值達成一致。也可以理解成分散式系統中達成狀態的一致性。

3)Paxos演算法理解

Paxos 演算法是分散式一致性演算法用來解決一個分散式系統如何就某個值(決議)達成一致的問題。一個典型的場景是,在一個分散式資料庫系統中,如果各節點的初始狀態一致,每個節點都執行相同的操作序列,那麼他們最後能得到一個一致的狀態。為保證每個節點執行相同的命令序列,需要在每一條指令上執行一個”一致性演算法”以保證每個節點看到的指令一致。
分散式系統中一般是通過多副本來保證可靠性,而多個副本之間會存在數據不一致的情況。所以必須有一個一致性演算法來保證數據的一致,描述如下:
  假如在分散式系統中初始是各個節點的數據是一致的,每個節點都順序執行系列操作,然後每個節點最終的數據還是一致的。
  Paxos演算法就是解決這種分散式場景中的一致性問題。對於一般的開發人員來說,只需要知道paxos是一個分散式選舉演算法即可。多個節點之間存在兩種通訊模型:共用記憶體(Shared memory)、消息傳遞(Messages passing),Paxos是基於消息傳遞的通訊模型的。

file

4)Paxos相關概念

在Paxos演算法中,有三種角色:

  • Proposer
  • Acceptor
  • Learners

在具體的實現中,一個進程可能同時充當多種角色。比如一個進程可能既是Proposer又是Acceptor又是Learner。Proposer負責提出提案,Acceptor負責對提案作出裁決(accept與否),learner負責學習提案結果。
還有一個很重要的概念叫提案(Proposal)。最終要達成一致的value就在提案里。只要Proposer發的提案被Acceptor接受(半數以上的Acceptor同意才行),Proposer就認為該提案里的value被選定了。Acceptor告訴Learner哪個value被選定,Learner就認為那個value被選定。只要Acceptor接受了某個提案,Acceptor就認為該提案里的value被選定了。
為了避免單點故障,會有一個Acceptor集合,Proposer向Acceptor集合發送提案,Acceptor集合中的每個成員都有可能同意該提案且每個Acceptor只能批准一個提案,只有當一半以上的成員同意了一個提案,就認為該提案被選定了。

file

3.2 QuorumPeer工作流程

file

QuorumCnxManager:每台伺服器在啟動的過程中,會啟動一個QuorumPeer,負責各台伺服器之間的底層Leader選舉過程中的網路通信對應的類就是QuorumCnxManager

Zookeeper對於每個節點QuorumPeer的設計相當的靈活,QuorumPeer主要包括四個組件:客戶端請求接收器(ServerCnxnFactory)、數據引擎(ZKDatabase)、選舉器(Election)、核心功能組件(Leader/Follower/Observer)。

1:ServerCnxnFactory負責維護與客戶端的連接(接收客戶端的請求併發送相應的響應);(1001行)
2:ZKDatabase負責存儲/載入/查找數據(基於目錄樹結構的KV+操作日誌+客戶端Session);(129行)
3:Election負責選舉集群的一個Leader節點;(998行)
4:Leader/Follower/Observer確認是QuorumPeer節點應該完成的核心職責;(1270行)

QuorumPeer工作流程比較複雜,如下圖:

file

QuorumPeer工作流程:

1:初始化配置
2:載入當前存在的數據
3:啟動網路通信組件
4:啟動控制台
5:開啟選舉協調者,並執行選舉(這個過程是會持續,並不是一次操作就結束了)

3.3 QuorumCnxManager源碼分析

QuorumCnxManager內部維護了一系列的隊列,用來保存接收到的、待發送的消息以及消息的發送器,除接收隊列以外,其他隊列都按照SID分組形成隊列集合,如一個集群中除了自身還有3台機器,那麼就會為這3台機器分別創建一個發送隊列,互不幹擾。

file

QuorumCnxManager.Listener :為了能夠相互投票,Zookeeper集群中的所有機器都需要建立起網路連接。QuorumCnxManager在啟動時會創建一個ServerSocket來監聽Leader選舉的通信埠。開啟監聽後,Zookeeper能夠不斷地接收到來自其他伺服器地創建連接請求,在接收到其他伺服器地TCP連接請求時,會進行處理。為了避免兩台機器之間重覆地創建TCP連接,Zookeeper只允許SID大的伺服器主動和其他機器建立連接,否則斷開連接。在接收到創建連接請求後,伺服器通過對比自己和遠程伺服器的SID值來判斷是否接收連接請求,如果當前伺服器發現自己的SID更大,那麼會斷開當前連接,然後自己主動和遠程伺服器將連接(自己作為“客戶端”)。一旦連接建立,就會根據遠程伺服器的SID來創建相應的消息發送器SendWorker和消息發送器RecvWorker,並啟動。

QuorumCnxManager.Listener監聽啟動可以查看QuorumCnxManager.Listenerrun方法,源代碼如下,可以斷點調試看到此時監聽的正是我們所說的投票埠:

file

上面是監聽器,各個服務之間進行通信我們需要開啟ListenerHandler線程,在QuorumCnxManager.Listener.ListenerHandler的run方法中有一個方法acceptConnections() 調用,該方法就是用於接受每次選舉投票的信息,如果只有一個節點或者沒有投票信息的時候,此時方法會阻塞,一旦執行選舉,程式會往下執行,我們可以先啟動1台服務,再啟動第2台、第3台,此時會收到有客戶端參與投票鏈接,程式會往下執行,源碼如下:

file

我們啟動2台服務,效果如下:

file

上面雖然能證明投票訪問了當前監聽的埠,但怎麼知道是哪台服務呢?我們可以沿著receiveConnection()源碼繼續研究,源碼如下:

file

receiveConnection()方法只是獲取了數據流,並沒做特殊處理,並且調用了handleConnection()方法,該方法源碼如下:

file

通過網路連接獲取數據sid,獲取sid表示是哪一臺連過來的,我們可以列印輸出sid,測試輸出如下數據:

參與投票的MyID=2
參與投票的MyID=3

3.4 FastLeaderElection演算法源碼分析

file

Zookeeper集群中,主要分為三者角色,而每一個節點同時只能扮演一種角色,這三種角色分別是:

(1)Leader 接受所有Follower的提案請求並統一協調發起提案的投票,負責與所有的Follower進行內部的數據交換(同步);

(2)Follower 直接為客戶端提供服務並參與提案的投票,同時與Leader進行數據交換(同步);

(3)Observer 直接為客戶端服務但並不參與提案的投票,同時也與Leader進行數據交換(同步);

FastLeaderElection 選舉演算法是標準的 Fast Paxos 演算法實現,可解決 LeaderElection 選舉演算法收斂速度慢的問題。

創建FastLeaderElection 只需要new FastLeaderElection()即可,如下代碼:

file

創建FastLeaderElection會調用starter()方法,該方法會創建sendqueuerecvqueue隊列、Messenger對象,其中Messenger對象的作用非常關鍵,方法源碼如下:

file

創建Messenger的時候,會創建WorkerSender並封裝成wsThread線程,創建WorkerReceiver並封裝成wrThread線程,看名字就很容易理解,wsThread用於發送數據,wrThread用於接收數據,Messenger創建源碼如下:

file

創建完FastLeaderElection後接著會調用它的start()方法啟動選舉演算法,代碼如下:

file

啟動選舉演算法會調用start()方法,start()方法如下:

public void start() {
    this.messenger.start();
}

上面會執行messager.start(),也就是如下方法,也就意味著wsThreadwrThread線程都將啟動,源碼如下:

void start() {
	this.wsThread.start();
	this.wrThread.start();
}

wsThreadWorkerSender封裝而來,此時會調用WorkerSenderrun方法,run方法會調用process()方法,源碼如下:

file

process方法調用了managertoSend方法,此時是把對應的sid作為了消息發送出去,這裡其實是發送投票信息,源碼如下:

void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
    manager.toSend(m.sid, requestBuffer);
}

投票可以投自己,也可以投別人,如果是選票選自己,只需要把投票信息添加到recvQueue中即可,源碼如下:
file

WorkerReceiver.run方法中會從recvQueue中獲取Message,並把發送給其他服務的投票封裝到sendqueue隊列中,交給WorkerSender發送處理,源碼如下:

file

3.5 Zookeeper選舉投票剖析

選舉是個很複雜的過程,要考慮很多場景,而且選舉過程中有很多概念需要理解。

3.5.1 選舉概念

1)ZK服務狀態:

public enum ServerState {
    //代表沒有當前集群中沒有Leader,此時是投票選舉狀態
    LOOKING,  
    //代表已經是伴隨者狀態
    FOLLOWING,
    //代表已經是領導者狀態
    LEADING,
    //代表已經是觀察者狀態(觀察者不參與投票過程)
    OBSERVING
}

2)服務角色:

//Learner 是隨從服務和觀察者的統稱
public enum LearnerType {
    //隨從者角色
    PARTICIPANT,
    //觀察者角色
    OBSERVER
}

3)投票消息廣播:

public static class Notification {
    int version;
    
    //被推薦leader的ID
     long leader;
    
      //被推薦leader的zxid
      long zxid;
     
     //投票輪次
     long electionEpoch;
     
     //當前投票者的服務狀態 (LOOKING)
     QuorumPeer.ServerState state;
     //當前投票者的ID
     long sid;
     //QuorumVerifier作為集群驗證器,主要完成判斷一組server在
     //已給定的配置的server列表中,是否能夠構成集群
     QuorumVerifier qv;
     
     //被推薦leader的投票輪次
     long peerEpoch;
    
}

4)選票模型:

public class Vote {
    //投票版本號,作為一個標識 
    private final int version;
    //當前服務的ID
    private final long id;
    //當前服務事務ID
    private final long zxid;
    //當前服務投票的輪次
    private final long electionEpoch;
    //被推舉伺服器的投票輪次
    private final long peerEpoch;
    //當前伺服器所處的狀態
    private final ServerState state;

}

5)消息發送對象:

public static class ToSend {
    //支持的消息類型
    enum mType {
        crequest, //請求
        challenge, //確認
        notification,//通知
        ack //確認回執
    }
   
    ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch, byte[] configData) {

        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }

    /*
     * Proposed leader in the case of notification
     * 被投票推舉為leader的服務ID 
     */ long leader;

    /*
     * id contains the tag for acks, and zxid for notifications
     * 
     */ long zxid;

    /*
     * Epoch
     * 投票輪次
     */ long electionEpoch;

    /*
     * Current state;
     * 服務狀態
     */ QuorumPeer.ServerState state;

    /*
     * Address of recipient
     * 消息接收方服務ID
     */ long sid;

    /*
     * Used to send a QuorumVerifier (configuration info)
     */ byte[] configData = dummyData;

    /*
     * Leader epoch
     */ long peerEpoch;

}

3.5.2 選舉過程

QuorumPeer本身是個線程,在集群啟動的時候會執行quorumPeer.start();,此時會調用它重寫的start()方法,最後會調用父類的start()方法,所以該線程會啟動執行,因此會執行它的run方法,而run方法正是選舉流程的入口,我們看run方法關鍵源碼如下:

file

所有節點初始狀態都為LOOKING,會進入到選舉流程,選舉流程首先要獲取演算法,獲取演算法的方法是makeLEStrategy(),該方法返回的是FastLeaderElection實例,核心選舉流程是FastLeaderElection中的lookForLeader()方法。

/****
 * 獲取選舉演算法
 */
@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {
    return electionAlg;
}

lookForLeader()是選舉過程的關鍵流程,源碼分析如下:

file

上面多個地方都用到了過半數以上的方法hasAllQuorums()該方法用到了QuorumMaj類,代碼如下:

file

QuorumMaj構造函數中體現了過半數以上的操作,代碼如下:

file

3.5.3 投票規則

我們來看一下選票PK的方法totalOrderPredicate(),該方法其實就是Leader選舉規則,規則有如下三個:

1:比較 epoche(zxid高32bit),如果其他節點的epoche比自己的大,選舉 epoch大的節點(理由:epoch 表示年代,epoch越大表示數據越新)代碼:(newEpoch > curEpoch);

2:比較 zxid, 如果epoche相同,就比較兩個節點的zxid的大小,選舉 zxid大的節點(理由:zxid 表示節點所提交事務最大的id,zxid越大代表該節點的數據越完整)代碼:(newEpoch == curEpoch) && (newZxid > curZxid);

3:比較 serviceId,如果 epoch和zxid都相等,就比較服務的serverId,選舉 serviceId大的節點(理由: serviceId 表示機器性能,他是在配置zookeeper集群時確定的,所以我們配置zookeeper集群的時候可以把服務性能更高的集群的serverId設置大些,讓性能好的機器擔任leader角色)代碼 :(newEpoch == curEpoch) && ((newZxid == curZxid) && (newId > curId))。

源碼如下:

file

4 Zookeeper集群數據同步

所有事務操作都將由leader執行,並且會把數據同步到其他節點,比如follower、observer,我們可以分析leader和follower的操作行為即可分析出數據同步流程。

4.1 Zookeeper同步流程說明

file

整體流程:

1:當角色確立之後,leader調用leader.lead();方法運行,創建一個接收連接的LearnerCnxAcceptor線程,在LearnerCnxAcceptor線程內部又建立一個阻塞的LearnerCnxAcceptorHandler線程等待Learner端的連接。Learner端以follower為例,follower調用follower.followLeader();方法首先查找leader的Socket服務端,然後建立連接。當follower建立連接後,leader端會建立一個LearnerHandler線程相對應,用來處理follower與leader的數據包傳輸。 

2:follower端封裝當前zk伺服器的Zxid和Leader.FOLLOWERINFO的LearnerInfo數據包發送給leader

3:leader端這時處於getEpochToPropose方法的阻塞時期,需要得到Learner端超過一半的伺服器發送Epoch

4:getEpochToPropose解阻塞之後,LearnerHandler線程會把超過一半的Epoch與leader比較得到最新的newLeaderZxid,並封裝成Leader.LEADERINFO包發送給Learner端

5:Learner端得到最新的Epoch,會更新當前伺服器的Epoch。並把當前伺服器所處的lastLoggedZxid位置封裝成Leader.ACKEPOCH發送給leader

6:此時leader端處於waitForEpochAck方法的阻塞時期,需要得到Learner端超過一半的伺服器發送EpochACK

7:當waitForEpochAck阻塞之後便可以在LearnerHandler線程內決定用那種方式進行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端將會被刪除多餘的部分。如果小於leader端的,將會以不同方式進行同步 

8:leader端發送Leader.NEWLEADER數據包給Learner端(6、7步驟都是另開一個線程來發送這些數據包)

9:Learner端同步之後,會在一個while迴圈內處理各種leader端發送數據包,包括兩階段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步數據後會處理Leader.NEWLEADER數據包,然後發送Leader.ACK給leader端 

10:此時leader端處於waitForNewLeaderAck阻塞等待超過一半節點發送ACK。

我們回到QuorumPeer.run()方法,根據確認的不同角色執行不同操作展開分析。

4.2 Zookeeper Follower同步流程

Follower主要連接Leader實現數據同步,我們看看Follower做的事,我們仍然沿著QuorumPeer.run()展開學習,關鍵代碼如下:

file

創建Follower的方法比較簡單,代碼如下:

file

我們看一下整個Follower在數據同步中做的所有操作follower.followLeader();,源碼如下圖:

file

上面源碼中的follower.followLeader()方法主要做瞭如下幾件事:

1:尋找Leader
2:和Leader創建鏈接
3:向Leader註冊Follower,會將當前Follower節點信息發送給Leader節點
4:和Leader同步歷史數據
5:讀取Leader發送的數據包
6:同步Leader數據包

我們對follower.followLeader()調用的其他方法進行剖析,其中findLeader()是尋找當前Leader節點的,源代碼如下:

file

followLeader()中調用了registerWithLeader(Leader.FOLLOWERINFO);該方法是向Leader註冊Follower,會將當前Follower節點信息發送給Leader節點,Follower節點信息發給Leader是必須的,是Leader同步數據個基礎,源碼如下:

file

followLeader()中最後讀取數據包執行同步的方法中調用了readPacket(qp);,這個方法就是讀取Leader的數據包的封裝,源碼如下:

file

4.3 Zookeeper Leader同步流程

我們查看QuorumPeer.run()方法的LEADING部分,可以看到先創建了Leader對象,並設置了Leader,然後調用了leader.lead()leader.lead()是執行的核心業務流程,源碼如下:

file

leader.lead()方法是Leader執行的核心業務流程,源碼如下:

file

leader.lead()方法會執行如下幾個操作:

1:從快照和事務日誌中載入數據
2:創建一個線程,接收Follower/Observer的連接
3:等待超過一半的(Follower和Observer)連接,再繼續往下執行程式
4:等待超過一半的(Follower和Observer)獲取了新的epoch,並且返回了Leader.ACKEPOCH,再繼續往下執行程式
5:等待超過一半的(Follower和Observer)進行數據同步成功,並且返回了Leader.ACK,再繼續往下執行程式
6:數據同步完成,開啟zkServer,並且同時開啟請求調用鏈接收請求執行
7:進行一個死迴圈,每次休眠self.tickTime / 2,和對所有的(Observer/Follower)發起心跳檢測
8:集群中沒有過半Follower在集群中,調用shutdown關閉一些對象,重新選舉

lead()方法中會創建LearnerCnxAcceptor,該對象是一個線程,主要用於接收followers的連接,這裡加了CountDownLatch根據配置的同步的地址的數量(例如:server.2=127.0.0.1:12881:13881 配置同步的埠是12881只有一個),LearnerCnxAcceptor的run方法源碼如下:

file

LearnerCnxAcceptor的run方法中創建了LearnerCnxAcceptorHandler對象,在接收到鏈接後,就會調用LearnerCnxAcceptorHandler,而LearnerCnxAcceptorHandler是一個線程,它的run方法中調用了acceptConnections()方法,源碼如下:

file

acceptConnections()方法會在這裡阻塞接收followers的連接,當有連接過來會生成一個socket對象。然後根據當前socket生成一個LearnerHandler線程 ,每個Learner者都會開啟一個LearnerHandler線程,方法源碼如下:

file

LearnerHandler.run 這裡就是讀取或寫數據包與Learner交換數據包。如果沒有數據包讀取,則會阻塞當前方法ia.readRecord(qp, "packet");,源碼如下:

file

我們再回到leader.lead()方法,其中調用了getEpochToPropose()方法,該方法是判斷connectingFollowers發給leader端的Epoch是否過半,如果過半則會解阻塞,不過半會一直阻塞著,直到Follower把自己的Epoch數據包發送過來並符合過半機制,源碼如下:

file

lead()方法中,當發送的Epoch過半之後,把當前zxid設置到zk,並等待EpochAck,關鍵源碼如下:

file

waitForEpochAck()方法也會等待超過一半的(Follower和Observer)獲取了新的epoch,並且返回了Leader.ACKEPOCH,才會解除阻塞,否則會一直阻塞。等待EpochAck解阻塞後,把得到最新的epoch更新到當前服務,設置當前leader節點的zab狀態是SYNCHRONIZATION,方法源碼如下:
file

lead()方法中還需要等待超過一半的(Follower和Observer)進行數據同步成功,並且返回了Leader.ACK,程式才會解除阻塞,如下代碼:

file

上面所有流程都走完之後,就證明數據已經同步成功了,會執行startZkServer();

4.4 LearnerHandler數據同步操作

LearnerHandler線程是對應於Learner連接Leader端後,建立的一個與Learner端交換數據的線程。每一個Learner端都會創建一個 LearnerHandler線程。

我們詳細講解LearnerHandler.run()方法。

file

readRecord讀取數據包 不斷從learner節點讀數據,如果沒讀到將會阻塞readRecord

file

如果數據包類型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO將會返回,因為咱們這裡本身就是Leader節點,讀數據肯定是讀非Leader節點數據。

file

獲取learnerInfoData來獲取sid和版本信息。

file

獲取followerInfo和lastAcceptedEpoch,信息如下:

file

file

把Leader.NEWLEADER數據包放入到queuedPackets,並向其他節點發送,源碼如下:

file

本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 04簡單迴圈 1. 用一行代碼求和 類型: 簡單迴圈 描述‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‪‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‭‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬ 輸入一個正整數 ...
  • 一、實驗目的 在信息時代高速發展的現在,“互聯網+”的使用日趨zhanzhang過互聯網學習知識,傳遞思想,溝通交流,在眾多數據和用戶的碰 撞中,互聯網經濟應運而生。學會利用網路收集信息是最基本的要求,接下來,我將以“行業網站”——站長之 家為例,通過Python爬取各個網站的信息(主要為名稱、Al ...
  • Go 語言入門練手項目系列 01 基於命令行的圖書的增刪查改 02 文件管理 持續更新中... > 本文來自博客園,作者:Arway,轉載請註明原文鏈接:https://www.cnblogs.com/cenjw/p/gobeginner-proj-bookstore-cli.html 介紹 這是一 ...
  • 1.路徑處理 1.找模塊:sys.path import sys print(sys.path) - 1.理解 - 1.是python去查找包或模塊 - 2.項目開始根目錄,python內置的目錄 - 3.雖然說python的安裝目錄下也可以存放我們寫的模塊,但是不建議(太多了,不大好找) - 4. ...
  • 本篇內容將在上一篇已有的內容基礎上,進一步的聊一下項目中使用JPA的一些高階複雜場景的實踐指導,覆蓋了主要核心的JPA使用場景,可以讓你在需求開發的時候對JPA的使用更加的游刃有餘。 ...
  • 前言 Steam是由美國電子游戲商Valve於2003年9月12日推出的數字發行平臺,被認為是電腦游戲界最大的數位發行平臺之一,Steam平臺是全球最大的綜合性數字發行平臺之一。玩家可以在該平臺購買、下載、討論、上傳和分享游戲和軟體。 而每周的steam會開啟了一輪特惠,可以讓游戲打折,而玩家就會 ...
  • Hi,大家好,我是Mic 一個工作5年的粉絲找到我。 他說: “Mic老師,你要是能回答出這個問題,我就佩服你” 我當場就懵了,現在打賭都這麼隨意了嗎? 我問他問題是什麼,他說“Kafka如何避免重覆消費的問題!” 下麵看看普通人和高手的回答! 普通人: Kafka怎麼避免重覆消費就是我們可以通過 ...
  • 前言 今天給大家分享一下我自己寫的筆記,純純的都是乾貨,關於字好像也能看。這是我學python整理出來的一些資料,希望對大家 有用。想要更多的資料那就的給一個關註了… python學習交流Q群:903971231### #導入Counter from collections import Count ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...