<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] 本文思維導圖如下: ![](https://img.kancloud.cn/e6/67/e667c5297ea0167c4810d44b110c902c_1133x215.png) image ### 前言 Watcher機制是zookeeper最重要三大特性**數據節點Znode+Watcher機制+ACL權限控制**中的其中一個,它是zk很多應用場景的一個前提,比如集群管理、集群配置、發布/訂閱。 Watcher機制涉及到客戶端與服務器(注意,不止一個機器,一般是集群,這里先認為一個整體分析)的兩者數據通信與消息通信,除此之外還涉及到客戶端的watchManager。 下面正式進入主題。 ### 1.watcher原理框架 ![](https://img.kancloud.cn/42/12/4212792f91d1e0a5add1bac44e317e6e_1196x667.png) 由圖看出,zk的watcher由客戶端,客戶端WatchManager,zk服務器組成。整個過程涉及了消息通信及數據存儲。 * zk客戶端向zk服務器注冊watcher的同時,會將watcher對象存儲在客戶端的watchManager。 * Zk服務器觸發watcher事件后,會向客戶端發送通知,客戶端線程從watchManager中回調watcher執行相應的功能。 注意的是server服務器端一般有多臺共同一起對外提供服務的,里面涉及到zk專有的ZAB協議(分布式原子廣播協議)。在這先不分析,后面會有單獨一文來介紹,因為ZAB協議是zookeeper的實現精髓,有了zab協議才能使zk真正落地,真正的高可靠,數據同步,適于商用。 ![](https://img.kancloud.cn/f9/bb/f9bb6596cc5908550c4a3a73b3783bf3_679x426.png) 有木有看到小紅旗?加入小紅旗是一個watcher,當小紅旗被創建并注冊到node1節點(會有相應的API實現)后,就會監聽node1+node\_a+node\_b或node\_a+node\_b。這里兩種情況是因為在創建watcher注冊時會有多種途徑。并且watcher不能監聽到孫節點。注意注意注意,watcher設置后,一旦觸發一次后就會失效,如果要想一直監聽,需要在process回調函數里重新注冊相同的 **watcher**。 ### 2.通知狀態與事件 ~~~java public class WatcherTest implements Watcher { @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub WatcherTest w = new WatcherTest(); ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w); } public static void main(String[] args){ WatcherTest w = new WatcherTest(); ZooKeeper zk = new ZooKeeper(wx.getZkpath(), 10000, w); } } ~~~ 上面例子是把異常處理,邏輯處理等都省掉。watcher的應用很簡單,主要有兩步:繼承 **Watcher** 接口,重寫 **process** 回調函數。 當然注冊方式有很多,有默認和重新覆蓋方式,可以一次觸發失效也可以一直有效觸發。這些都可以通過代碼實現。 #### 2.1 KeeperStatus通知狀態 KeeperStatus完整的類名是`org.apache.zookeeper.Watcher.Event.KeeperState`。 #### 2.2 EventType事件類型 EventType完整的類名是`org.apache.zookeeper.Watcher.Event.EventType`。 ![](https://img.kancloud.cn/98/77/98774274543b998e1240b50a5ab33006_1505x639.png) 此圖是zookeeper常用的通知狀態與對應事件類型的對應關系。除了客戶端與服務器連接狀態下,有多種事件的變化,其他狀態的事件都是None。這也是符合邏輯的,因為沒有連接服務器肯定不能獲取獲取到當前的狀態,也就無法發送對應的事件類型了。 這里重點說下幾個重要而且容易迷惑的事件: * NodeDataChanged事件 * 無論節點數據發生變化還是數據版本發生變化都會觸發 * 即使被更新數據與新數據一樣,數據版本dataVersion都會發生變化 * NodeChildrenChanged * 新增節點或者刪除節點 * AuthFailed * 重點是客戶端會話沒有權限而是授權失敗 客戶端只能收到服務器發過來的相關事件通知,并不能獲取到對應數據節點的原始數據及變更后的新數據。因此,如果業務需要知道變更前的數據或者變更后的新數據,需要業務保存變更前的數據(本機數據結構、文件等)和調用接口獲取新的數據 ### 3.watcher注冊過程 #### 3.1涉及接口 創建zk客戶端對象實例時注冊: ~~~java ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) ~~~ 通過這種方式注冊的watcher將會作為整個zk會話期間的**默認watcher**,會一直被保存在客戶端ZK **WatchManager** 的 **defaultWatcher** 中,如果這個被創建的節點在其它時候被創建watcher并注冊,則這個默認的watcher會被覆蓋。注意注意注意,watcher觸發一次就會失效,不管是創建節點時的 **watcher** 還是以后創建的 **watcher**。 其他注冊watcher的API: * `getChildren(String path, Watcher watcher)` * `getChildren(String path, boolean watch)` * Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher * `getData(String path, boolean watch, Stat stat)` * Boolean watch表示是否使用上下文默認的watcher,即創建zk實例時設置的watcher * `getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)` * `exists(String path, boolean watch)` * Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher * `exists(String path, Watcher watcher)` ## 舉栗子 ![](https://img.kancloud.cn/8e/ec/8eec473949c0dc2e3dac50a9a2b762db_634x160.png) ![](https://img.kancloud.cn/e2/3f/e23f4a8e992e4960f12593e8c9e31f08_682x78.png) ![](https://img.kancloud.cn/0d/e6/0de63c73d138dc66d16693e764f40846_685x135.png) ![](https://img.kancloud.cn/27/fc/27fc85b1fd0ee39be01580c59e61e8ec_615x296.png) ![](https://img.kancloud.cn/fc/a7/fca7c36a8917c9d69dd3091c8fac199d_689x227.png) 這就是watcher的簡單例子,zk的實際應用集群管理,發布訂閱等復雜功能其實就在這個小例子上拓展的。 #### 3.2客戶端注冊 ![](https://img.kancloud.cn/42/d0/42d0daad6924439ac7790a89027026b2_687x265.png) 這里的客戶端注冊主要是把上面第一點的zookeeper原理框架的注冊步驟展開,簡單來說就是zk客戶端在注冊時會先向zk服務器請求注冊,服務器會返回請求響應,如果響應成功則zk服務端把watcher對象放到客戶端的WatchManager管理并返回響應給客戶端。 #### 3.3服務器端注冊 ![](https://img.kancloud.cn/1e/54/1e54c6f12db16fafef222b78bbb10194_691x280.png) ##### FinalRequestProcessor ~~~dart /** * This Request processor actually applies any transaction associated with a * request and services any queries. It is always at the end of a * RequestProcessor chain (hence the name), so it does not have a nextProcessor * member. * * This RequestProcessor counts on ZooKeeperServer to populate the * outstandingRequests member of ZooKeeperServer. */ public class FinalRequestProcessor implements RequestProcessor ~~~ 由源碼注釋得知,**FinalRequestProcessor**類實際是任何事務請求和任何查詢的的最終處理類。也就是我們客戶端對節點的set/get/delete/create/exists等操作最終都會運行到這里。 以exists函數為例子: ~~~dart case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); String path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null); rsp = new ExistsResponse(stat); break; } ~~~ `existsRequest.getWatch() ? cnxn : null`此句是在調用exists API時,判斷是否注冊watcher,若是就返回 **cnxn**,**cnxn**是由此句代碼`ServerCnxn cnxn = request.cnxn;`創建的。 ~~~dart /** * Interface to a Server connection - represents a connection from a client * to the server. */ public abstract class ServerCnxn implements Stats, Watcher ~~~ 通過`ServerCnxn`類的源碼注釋得知,`ServerCnxn`是維持服務器與客戶端的**tcp連接**與實現了 **watcher**。總的來說,ServerCnxn類創建的對象**cnxn**即包含了連接信息又包含watcher信息。 同時仔細看**ServerCnxn類**里面的源碼,發現有以下這個函數,process函數正是watcher的回調函數啊。 ~~~java public abstract class ServerCnxn implements Stats, Watcher { . . public abstract void process(WatchedEvent event); Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); //getZKDatabase實際上是獲取是在zookeeper運行時的數據庫。請看下面 . . } ~~~ ##### ZKDatabase ~~~dart /** * This class maintains the in memory database of zookeeper * server states that includes the sessions, datatree and the * committed logs. It is booted up after reading the logs * and snapshots from the disk. */ public class ZKDatabase ~~~ 通過源碼注釋得知**ZKDatabase**是在zookeeper運行時的數據庫,在`FinalRequestProcessor`的case exists中會把existsRequest(exists請求傳遞給ZKDatabase)。 ~~~kotlin /** * the datatree for this zkdatabase * @return the datatree for this zkdatabase */ public DataTree getDataTree() { return this.dataTree; } ~~~ **ZKDatabase**里面有這關鍵的一個函數是從zookeeper運行時展開的節點數型結構中搜索到合適的節點返回。 ##### watchManager * Zk服務器端Watcher的管理者 * 從兩個維度維護watcher * watchTable從數據節點的粒度來維護 * watch2Paths從watcher的粒度來維護 * 負責watcher事件的觸發 ~~~dart class WatchManager { private final Map<String, Set<Watcher>> watchTable = new HashMap<String, Set<Watcher>>(); private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>(); Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);} } ~~~ ##### watcher觸發 ~~~java public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發事件 return s; } ~~~ 客戶端回調watcher步驟: * 反序列化,將孒節流轉換成WatcherEvent對象。因為在Java中網絡傳輸肯定是使用了序列化的,主要是為了節省網絡IO和提高傳輸效率。 * 處理chrootPath。獲取節點的根節點路徑,然后再搜索樹而已。 * 還原watchedEvent:把WatcherEvent對象轉換成WatchedEvent。主要是把zk服務器那邊的WatchedEvent事件變為WatcherEvent,標為已watch觸發。 * 回調Watcher:把WatchedEvent對象交給EventThread線程。EventThread線程主要是負責從客戶端的ZKWatchManager中取出Watcher,并放入waitingEvents隊列中,然后供客戶端獲取。 ### 4.小結 到此,zookeeper的watcher機制基本告一段落了,watcher機制主要是客戶端、zk服務器和watchManager三者的協調合作完成的 作者:dandan的微笑 鏈接:https://www.jianshu.com/p/4c071e963f18 來源:簡書
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看