[TOC]
本文思維導圖如下:

image
### 前言
Watcher機制是zookeeper最重要三大特性**數據節點Znode+Watcher機制+ACL權限控制**中的其中一個,它是zk很多應用場景的一個前提,比如集群管理、集群配置、發布/訂閱。
Watcher機制涉及到客戶端與服務器(注意,不止一個機器,一般是集群,這里先認為一個整體分析)的兩者數據通信與消息通信,除此之外還涉及到客戶端的watchManager。
下面正式進入主題。
### 1.watcher原理框架

由圖看出,zk的watcher由客戶端,客戶端WatchManager,zk服務器組成。整個過程涉及了消息通信及數據存儲。
* zk客戶端向zk服務器注冊watcher的同時,會將watcher對象存儲在客戶端的watchManager。
* Zk服務器觸發watcher事件后,會向客戶端發送通知,客戶端線程從watchManager中回調watcher執行相應的功能。
注意的是server服務器端一般有多臺共同一起對外提供服務的,里面涉及到zk專有的ZAB協議(分布式原子廣播協議)。在這先不分析,后面會有單獨一文來介紹,因為ZAB協議是zookeeper的實現精髓,有了zab協議才能使zk真正落地,真正的高可靠,數據同步,適于商用。

有木有看到小紅旗?加入小紅旗是一個watcher,當小紅旗被創建并注冊到node1節點(會有相應的API實現)后,就會監聽node1+node\_a+node\_b或node\_a+node\_b。這里兩種情況是因為在創建watcher注冊時會有多種途徑。并且watcher不能監聽到孫節點。注意注意注意,watcher設置后,一旦觸發一次后就會失效,如果要想一直監聽,需要在process回調函數里重新注冊相同的 **watcher**。
### 2.通知狀態與事件
~~~java
public class WatcherTest implements Watcher {
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
WatcherTest w = new WatcherTest();
ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w);
}
public static void main(String[] args){
WatcherTest w = new WatcherTest();
ZooKeeper zk = new ZooKeeper(wx.getZkpath(), 10000, w);
}
}
~~~
上面例子是把異常處理,邏輯處理等都省掉。watcher的應用很簡單,主要有兩步:繼承 **Watcher** 接口,重寫 **process** 回調函數。
當然注冊方式有很多,有默認和重新覆蓋方式,可以一次觸發失效也可以一直有效觸發。這些都可以通過代碼實現。
#### 2.1 KeeperStatus通知狀態
KeeperStatus完整的類名是`org.apache.zookeeper.Watcher.Event.KeeperState`。
#### 2.2 EventType事件類型
EventType完整的類名是`org.apache.zookeeper.Watcher.Event.EventType`。

此圖是zookeeper常用的通知狀態與對應事件類型的對應關系。除了客戶端與服務器連接狀態下,有多種事件的變化,其他狀態的事件都是None。這也是符合邏輯的,因為沒有連接服務器肯定不能獲取獲取到當前的狀態,也就無法發送對應的事件類型了。
這里重點說下幾個重要而且容易迷惑的事件:
* NodeDataChanged事件
* 無論節點數據發生變化還是數據版本發生變化都會觸發
* 即使被更新數據與新數據一樣,數據版本dataVersion都會發生變化
* NodeChildrenChanged
* 新增節點或者刪除節點
* AuthFailed
* 重點是客戶端會話沒有權限而是授權失敗
客戶端只能收到服務器發過來的相關事件通知,并不能獲取到對應數據節點的原始數據及變更后的新數據。因此,如果業務需要知道變更前的數據或者變更后的新數據,需要業務保存變更前的數據(本機數據結構、文件等)和調用接口獲取新的數據
### 3.watcher注冊過程
#### 3.1涉及接口
創建zk客戶端對象實例時注冊:
~~~java
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean
canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
~~~
通過這種方式注冊的watcher將會作為整個zk會話期間的**默認watcher**,會一直被保存在客戶端ZK **WatchManager** 的 **defaultWatcher** 中,如果這個被創建的節點在其它時候被創建watcher并注冊,則這個默認的watcher會被覆蓋。注意注意注意,watcher觸發一次就會失效,不管是創建節點時的 **watcher** 還是以后創建的 **watcher**。
其他注冊watcher的API:
* `getChildren(String path, Watcher watcher)`
* `getChildren(String path, boolean watch)`
* Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
* `getData(String path, boolean watch, Stat stat)`
* Boolean watch表示是否使用上下文默認的watcher,即創建zk實例時設置的watcher
* `getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)`
* `exists(String path, boolean watch)`
* Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
* `exists(String path, Watcher watcher)`
## 舉栗子





這就是watcher的簡單例子,zk的實際應用集群管理,發布訂閱等復雜功能其實就在這個小例子上拓展的。
#### 3.2客戶端注冊

這里的客戶端注冊主要是把上面第一點的zookeeper原理框架的注冊步驟展開,簡單來說就是zk客戶端在注冊時會先向zk服務器請求注冊,服務器會返回請求響應,如果響應成功則zk服務端把watcher對象放到客戶端的WatchManager管理并返回響應給客戶端。
#### 3.3服務器端注冊

##### FinalRequestProcessor
~~~dart
/**
* This Request processor actually applies any transaction associated with a
* request and services any queries. It is always at the end of a
* RequestProcessor chain (hence the name), so it does not have a nextProcessor
* member.
*
* This RequestProcessor counts on ZooKeeperServer to populate the
* outstandingRequests member of ZooKeeperServer.
*/
public class FinalRequestProcessor implements RequestProcessor
~~~
由源碼注釋得知,**FinalRequestProcessor**類實際是任何事務請求和任何查詢的的最終處理類。也就是我們客戶端對節點的set/get/delete/create/exists等操作最終都會運行到這里。
以exists函數為例子:
~~~dart
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
~~~
`existsRequest.getWatch() ? cnxn : null`此句是在調用exists API時,判斷是否注冊watcher,若是就返回 **cnxn**,**cnxn**是由此句代碼`ServerCnxn cnxn = request.cnxn;`創建的。
~~~dart
/**
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
public abstract class ServerCnxn implements Stats, Watcher
~~~
通過`ServerCnxn`類的源碼注釋得知,`ServerCnxn`是維持服務器與客戶端的**tcp連接**與實現了 **watcher**。總的來說,ServerCnxn類創建的對象**cnxn**即包含了連接信息又包含watcher信息。
同時仔細看**ServerCnxn類**里面的源碼,發現有以下這個函數,process函數正是watcher的回調函數啊。
~~~java
public abstract class ServerCnxn implements Stats, Watcher {
.
.
public abstract void process(WatchedEvent event);
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
//getZKDatabase實際上是獲取是在zookeeper運行時的數據庫。請看下面
.
.
}
~~~
##### ZKDatabase
~~~dart
/**
* This class maintains the in memory database of zookeeper
* server states that includes the sessions, datatree and the
* committed logs. It is booted up after reading the logs
* and snapshots from the disk.
*/
public class ZKDatabase
~~~
通過源碼注釋得知**ZKDatabase**是在zookeeper運行時的數據庫,在`FinalRequestProcessor`的case exists中會把existsRequest(exists請求傳遞給ZKDatabase)。
~~~kotlin
/**
* the datatree for this zkdatabase
* @return the datatree for this zkdatabase
*/
public DataTree getDataTree() {
return this.dataTree;
}
~~~
**ZKDatabase**里面有這關鍵的一個函數是從zookeeper運行時展開的節點數型結構中搜索到合適的節點返回。
##### watchManager
* Zk服務器端Watcher的管理者
* 從兩個維度維護watcher
* watchTable從數據節點的粒度來維護
* watch2Paths從watcher的粒度來維護
* 負責watcher事件的觸發
~~~dart
class WatchManager {
private final Map<String, Set<Watcher>> watchTable =
new HashMap<String, Set<Watcher>>();
private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>();
Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);}
}
~~~
##### watcher觸發
~~~java
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發事件
return s;
}
~~~
客戶端回調watcher步驟:
* 反序列化,將孒節流轉換成WatcherEvent對象。因為在Java中網絡傳輸肯定是使用了序列化的,主要是為了節省網絡IO和提高傳輸效率。
* 處理chrootPath。獲取節點的根節點路徑,然后再搜索樹而已。
* 還原watchedEvent:把WatcherEvent對象轉換成WatchedEvent。主要是把zk服務器那邊的WatchedEvent事件變為WatcherEvent,標為已watch觸發。
* 回調Watcher:把WatchedEvent對象交給EventThread線程。EventThread線程主要是負責從客戶端的ZKWatchManager中取出Watcher,并放入waitingEvents隊列中,然后供客戶端獲取。
### 4.小結
到此,zookeeper的watcher機制基本告一段落了,watcher機制主要是客戶端、zk服務器和watchManager三者的協調合作完成的
作者:dandan的微笑
鏈接:https://www.jianshu.com/p/4c071e963f18
來源:簡書
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊