<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] **本文思維導圖** ![](https://img.kancloud.cn/a4/35/a4353f231ccac53f1200c0488ff661fa_1221x391.png) ### 前言 為了高可用和數據安全起見,zk集群一般都是由幾個節點構成(由n/2+1,投票機制決定,肯定是奇數個節點)。多節點證明它們之間肯定會有數據的通信,同時,為了能夠使zk集群對外是透明的,一個整體對外提供服務,那么客戶端訪問zk服務器的數據肯定是要數據同步,也即**數據一致性**。 zk集群是Leader/Follower模式來保證數據同步的。整個集群同一時刻只能有一個Leader,其他都是Follower或Observer。Leader是通過選舉選出來的,這里涉及到ZAB協議(原子消息廣播協議)。 ### 1.ZAB協議 #### 1.1 概念理解 為了更好理解下文,先說ZAB協議,它是選舉過程和數據寫入過程的基石。ZAB的核心是定義會改變zk服務器數據狀態的事務請求的處理方式。 ZAB的理解:所有事務請求是由一個全局唯一的服務器來協調處理,這個的服務器就是Leader服務器, 其它服務器都是Follower服務器或Observer服務器。Leader服務器負責將一個客戶端的請求轉換成那個一個**事務Proposal?(提議)**,將該Proposal分發給集群中所有的Follower服務器。然后Leader服務器需要等待所有Follower服務器的應答,當Leader服務器收到超過**半數**的Follower服務器進行了明確的應答后,Leader會再次向所有的Follower服務器分發Commit消息,要求其將前一個Proposal進行提交。 注意**事務提議**這個詞,就類似 **人大代表大會提議** ,提議就代表會有應答,之間有通信。因此在zk的ZAB協議為了可靠性和可用性,會有**投票**,**應答**等操作來保證整個zk集群的正常運行。 總的來說就是,涉及到客戶端對zk集群數據改變的行為都先由Leader統一響應,然后再把請求轉換為事務轉發給其他所有的Follower,Follower應答并處理事務,最后再反饋。如果客戶端只是讀請求,那么zk集群所有的節點都可以響應這個請求。 #### 1.2 ZAB協議三個階段 * 1.發現(選舉Leader過程) * 2.同步(選出Leader后,Follower和Observer需進行數據同步) * 3.廣播(同步之后,集群對外工作響應請求,并進行消息廣播,實現數據在集群節點的副本存儲) 下面會逐點分析,但是在這之前先來了解了解zookeeper服務器的知識吧。 ### 2.Zookeeper服務器 #### 2.1 zk服務器角色 * Leader * 事務請求的唯一調度和處理者,保證集群事務處理的順序序性 * 集群內部各服務器的調度者 * Follower * 處理客戶端非事務請求,轉發事務請求給Leader服務器 * 參與事務請求Proposal的投票 * 參與Leader的選舉投票 * Observer * 處理客戶端非事務請求,轉發事務請求給Leader服務器 * 不參加任何形式的投票,包括選舉和事務投票(超過半數確認) * Observer的存在是為了提高zk集群對外提供讀性能的能力 整個zk集群的角色作用如下圖: ![](https://img.kancloud.cn/e6/03/e60315001bd982f5548ded330358c8a7_1343x633.png) #### 2.2 zk服務器狀態 * LOOKING * 尋找Leader狀態 * 當服務器處于這種狀態時,表示當前沒有Leader,需要進入選舉流程 * FOLLOWING * 從機狀態,表明當前服務器角色是Follower * OBSERVING * 觀察者狀態,表明當前服務器角色是Observer * LEADING * 領導者狀態,表明當前服務器角色是Leader * ServerState 類維護服務器四種狀態。 ![](https://img.kancloud.cn/46/e9/46e957e896938c8baa920a4eaf934377_660x114.png) zk服務器的狀態是隨著機器的變化而變化的。比如Leader宕機了,服務器狀態就變為LOOKING,通過選舉后,某機器成為Leader,服務器狀態就轉換為LEADING。其他情況類似。 #### 2.3 zk服務器通信 集群嘛,節點之間肯定是要通信的。zokeeper通信有兩個特點: * 1.使用的通信協議是**TCP協議**。在集群中到底是怎么連接的呢?還記得在配置zookeeper時要創建一個data目錄并在其他創建一個myid文件并寫入唯一的數字嗎?zk服務器的TCP連接方向就是依賴這個myid文件里面的數字大小排列。數小的向數大的發起TCP連接。比如有3個節點,myid文件內容分別為1,2,3。zk集群的tcp連接順序是1向2發起TCP連接,2向3發起TCP連接。如果有n個節點,那么tcp連接順序也以此類推。這樣整個zk集群就會連接起來。 * 2.zk服務器是多端口的。例如配置如下: ~~~jsx tickTime=2000 dataDir=/home/liangjf/app/zookeeper/data dataLogDir=/home/liangjf/app/zookeeper/log clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.1.1:2888:3888 server.2=192.168.1.2:2888:3888 server.3=192.168.1.3:2888:3888 ~~~ * 第1個端口是通信和數據同步端口,默認是2888 * 第2個端口是投票端口,默認是3888 ### 3.選舉機制 #### 3.1 選舉算法 從zookeeper開始發布以來,選舉的算法也慢慢優化。現在為了可靠性和高可用,從3.4.0版本開始zookeeper只支持基于**Tcp**的**FastLeaderElection**選舉協議。 * LeaderElection * Udp協議 * AuthFastLeaderElection * udp * **FastLeaderElection** * Udp * **Tcp** **FastLeaderElection選舉協議**使用TCP實現**Leader投票選舉算法**。它使用了類對象`quorumcnxmanager`管理連接。該算法是基于推送的,可以通過調節參數來改變選舉的過程。第一,`finalizewait`決定等到決定Leader的時間。這是Leader選舉算法的一部分。 `final static int finalizeWait = 200;`(選舉Leader過程的進程時間) `final static int maxNotificationInterval = 60000;`(通知檢查選中Leader的時間間隔) `final static int IGNOREVALUE = -1;` 這里先不詳細分析,下面**3.5 選舉算法源碼分析及舉栗子**才分析。 #### 3.2 何時觸發選舉 選舉Leader不是隨時選舉的,畢竟選舉有產生大量的通信,造成網絡IO的消耗。因此下面情況才會出現選舉: * 集群啟動 * 服務器處于尋找Leader狀態 * 當服務器處于LOOKING狀態時,表示當前沒有Leader,需要進入選舉流程 * 崩潰恢復 * Leader宕機 * 網絡原因導致過半節點與Leader心跳中斷 #### 3.3 如何成為Leader * 數據新舊程度 * 只有擁有最新數據的節點才能有機會成為Leader * 通過zxid的大小來表示數據的新,zxid越大代表數據越新 * myid * 集群啟動時,會在data目錄下配置myid文件,里面的數字代表當前zk服務器節點的編號 * 當zk服務器節點數據一樣新時, myid中數字越大的就會被選舉成ОLeader * 當集群中已經有Leader時,新加入的節點不會影響原來的集群 * 投票數量 * 只有得到集群中多半的投票,才能成為Leader * 多半即:n/2+1,其中n為集群中的節點數量 #### 3.4 重要的zxid 由3.3知道zxid是判斷能否成為Leader的條件之一,它代表服務器的數據版本的新舊程度。 zxid由兩部分構成:主進程周期epoch和事務單調遞增的計數器。zxid是一個64位的數,高32位代表**主進程周期epoch**,低32位代表**事務單調遞增的計數器**。 **主進程周期epoch**也叫epoch,是選舉的輪次,每選舉一次就遞增1。**事務單調遞增的計數器**在每次選舉完成之后就會從0開始。 如果是比較數據新舊的話,直接比較就可以了。因為如果是主進程周期越大,即高32位越大,那么低32位就不用再看了。如果主進程周期一致,低32位越大,整個zxid就越大。所以直接比較整個64位就可以了,不必高32位于高32位對比,低32位與低32位比較。 #### 3.5 選舉算法源碼分析及舉栗子 ##### 3.5.1 舉栗子 zookeeper選舉有兩種情況: * 1.集群首次啟動 * 2.集群在工作時Leader宕機 選主原則如下(在選舉時,對比次序是從上往下) * 1.`New epoch is higher` * 主周期更大,代所有一切是最新,就成為leader * 2.`New epoch is the same as current epoch, but new zxid is higher` * 主周期一致就是在同一輪選票中,zxid越大就成為leader,因為數據更新 * 3.`New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher` * 主周期和zxid一致,就看機器的id(myid),myid越大就成為leader 同時,在選舉的時候是投票方式進行的,除主進程周期外,投票格式為(myid,zxid)。 第一種情況,比較容易理解,下面以3臺機器為例子。 * 三個zk節點A,B,C,三者開始都沒有數據,即Zxid一致,對應的myid為1,2,3。 * A啟動myid為1的節點,zxid為0,此時只有一臺服務器無法選舉出Leader * B啟動myid為2的節點,zxid為0,B的zxid與A一樣,比較myid,B的myid為2比A為1大,B成Leader * C啟動myid為3的節點,因為已經有Leader節點,則C直接加入集群,承認B是leader 第二種情況,已5臺機器為例子。 * 五個節點A,B,C,D,E,B是Leader,其他是Follower,myid分別為1,2,3,4,5,zxid分別為3,4,5,6,6。運行到某個時刻時A,B掉線或宕機,此時剩下C D E。在同一輪選舉中,C,D,E分別投自己和交叉投票。 * 第一次投票,都是投自己。 * 投票情況為:C:(3,5) D:(4,6) E:(5,6)。 * 同時也會收到其他機器的投票。 * 投票情況為:C:(3,5)(4,6)(5,6),D:(4,6)(3,5)(5,6),E:(5,6)(4,6)(3,5) * 機器內部會根據選主原則對比投票,變更投票,投票情況為:C:(3,5)(4,6)(5,6)【不變更】。 D:(4,6)(4,6)(5,6)【變更】。E:(5,6)(5,6)(5,6)【變更】 * 統計票數,C-1票,D-3票,E-5票。因此E成為Leader。 接下來就是對新Leader節點的檢查,數據同步,廣播,對外提供服務。 ##### 3.5.1 選舉算法源碼分析 選舉算法的全部代碼在`FastLeaderElection`類中。其他的`lookForLeader`函數是選舉Leader的入口函數。 ~~~java //每一輪選舉就會增大一次邏輯時鐘,同時更新事務 synchronized(this){ logicalclock++; updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } ~~~ //一直循環選舉直到找到leader,這里把打印和不相關的都刪除了,方便分析。 ~~~php while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ //從通知隊列拉取一個投票通知 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); if(n == null){ //看是否選舉時通知發送/接收超時 int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); } else if(self.getVotingView().containsKey(n.sid)) { switch (n.state) { case LOOKING://只有zk服務器狀態為LOOKING時才會進行選舉 // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock) { //如果選舉時的邏輯時鐘大于發送通知來源的機器的邏輯時鐘,就把對方的修改為自己的。 logicalclock = n.electionEpoch; recvset.clear(); //并統計票數,如果能成為leader就更新事務 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { //否者更新事務為對方的投票信息 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock) { //如果通知來演的機器的邏輯時鐘比本次我的選舉時鐘低,直接返回,什么都不做。因為對方沒機會成為leader if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock)); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //如果Epoch一樣,就看zxid的比較。不過還是會更新事務和回傳通知 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } //把所有接收到的投票信息都放到recvset集合 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //統計誰的投票超過半數,就成為leader if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { //驗證一下,被選舉的leader是否有變化,就是看符不符合 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ //符合就放進recvqueue集合 recvqueue.put(n); break; } } //改變選舉為leader的機器的狀態為LEADING if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } } break; case FOLLOWING: case LEADING: //在同一輪選舉中,判斷所有的通知,并確認自己是leader if(n.electionEpoch == logicalclock){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(termPredicate(recvset, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, n.electionEpoch)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } } //在對外提供服務前,先廣播一次自己是leader的消息給所有follower,讓大家認同我為leader。 outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(outofelection, n.leader, n.electionEpoch)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch); leaveInstance(endVote); return endVote; } break; } } } ~~~ 比較重要的子函數有以下這些: * 1.totalOrderPredicate。(投票比較變更原則,選舉的核心) * * * ~~~java protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } //按照這樣的順序比較優先:Epoch > Zxid > myid return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); } ~~~ * 2.termPredicate。(最終的計算票數。先把投票放到集合中,然后再統計。集合能去重) * * * ~~~kotlin private boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet<Long>(); for (Map.Entry<Long,Vote> entry : votes.entrySet()) { if (vote.equals(entry.getValue())){ set.add(entry.getKey()); } } return self.getQuorumVerifier().containsQuorum(set); } ~~~ * 3.Messenger。(構造Messenger的時候創建2條線程WorkerSender和WorkerReceiver用于整個選舉的集群投票通信) * * * ~~~cpp Messenger(QuorumCnxManager manager) { this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); } ~~~ 其他細節不多說了,主要是sendqueue和recvqueue隊列存放待發送投票通知和接收投票通知,WorkerSender和WorkerReceiver兩條線程用于投票的通信,QuorumCnxManager manager用于真正和其他機器的tcp連接維護管理,Messenger是整個投票通信的管理者。 ### 3.數據同步機制 #### 3.1 同步準備 完成選舉之后,為了數據一致性,需要進行數據同步流程。 ##### 3.1.1 Leader準備 * Leader告訴其它follower當前最新數據是什么即zxid * Leader構建一個NEWLEADER的包,包括當前最大的zxid,發送給所有的follower或者Observer * Leader給每個follower創建一個線程LearnerHandler來負責處理每個follower的數據同步請求,同時主線程開始阻塞,等到超過一半的follwer同步完成,同步過程才完成,leader才真正成為leader ##### 3.1.2 Follower準備 * 選舉完成后,嘗試與leader建立同步連接,如果一段時間沒有連接上就報連接超時,重新回到選舉狀態FOLLOWING * 向leader發送FOLLOWERINFO包,帶上follower自己最大的zxid #### 3.2 同步初始化 同步初始化涉及到三個東西:minCommittedLog、maxCommittedLog、zxid – minCommittedLog:最小的事務日志id,即zxid沒有被快照存儲的日志文件的第一條,每次快照存儲 完,會重新生成一個事務日志文件 – maxCommittedLog:事務日志中最大的事務,即zxid ### 4.數據同步場景 * 直接差異化同步(DIFF同步) * 僅回滾同步TRUNC?,即刪除多余的事務日志,比如原來的Leader宕機后又重新加入,可能存在它自己寫 入提交但是別的節點還沒來得及提交 * 先回滾再差異化同步(TRUNC+DIFF同步) * 全量同步(SNAP同步) 不同的數據同步算法適用不同的場景。 ### 5.廣播流程 * 集群選舉完成,并且完成數據同步后,開始對外服務,接收讀寫請求 * 當leader接收到客戶端新的事務請求后,會生成對新的事務proposal,并根據zxid的順序向所有的 follower分發事務proposal * 當follower收到leader的proposal時,根據接收的先后順序處理proposal * 當Leader收到follower針對某個proposal過半的ack后,則發起事務提交,重新發起一個commit的 proposal * Follower收到commit的proposal后,記錄事務提交,并把數據更新到內存數據庫 * 補充說明 * 由于只有過半的機器給出反饋,則可能存在某時刻某些節點數據不是最新的 * 如果需要確定讀取到的數據是最新的,則可以在讀取之前,調用sync方法進行數據同步 ### 6.小結 在zookeeper中,除了watcher機制,會話管理,最重要的就是選舉了。它是zookeeper集群的核心,也是廣泛應用在商業中的前提。 作者:dandan的微笑 鏈接:https://www.jianshu.com/p/57fecbe70540 來源:簡書
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看