<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 基本使用 org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話 它提供以下幾類主要方法 : ![](https://box.kancloud.cn/0c4b193d1e3fd67300c1af9eb4ec9631_527x206.png) # 增刪改查znode數據 ~~~ package hello; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; public class SimpleZk { //逗號后面別加空格 private static final String connectString = "192.168.33.12:2181,192.168.33.22:2181,192.168.3.33:2181"; private static final int sessionTimeOut = 2000; //latch就相當于一個對象,當latch.await()方法執行時,線程就會等待 //當latch的count減為0的時候,將會喚醒等待的線程 //讓主線程阻塞掉 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null; @Before public void init() throws InterruptedException, IOException { //一new完就往下走,但是這時候客戶端還沒完成連接,所以我們要等他創建好 //一旦成功握手這邊的process就會回調一次 zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步連接 //回調了并且事件等于連接成功 if (latch.getCount() > 0 && watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("countdown"); //把計數減少,然后主線程就可以往下走了 latch.countDown(); } //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath() + "---" + watchedEvent.getState()); // try { // byte[] zkClientData = zkClient.getData("/idea", true, null); // System.out.println(new String(zkClientData, "UTF-8")); // } catch (Exception e) { // e.printStackTrace(); // } } }); latch.await(); } /** * 數據增刪改查 */ //創建數據節點到zk中 @Test public void testCreate() throws KeeperException, InterruptedException { //參數1:要創建的節點的路徑 //參數2:節點的數據參數 //參數3:節點的權限,這邊用開放的權限 //參數4:節點的類型,有2種,短暫(ephemeral)(斷開連接自己刪除),持久(persistent)(斷開連接不刪除) //PERSISTENT (持久) //PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) //EPHEMERAL (臨時的) //EPHEMERAL_SEQUENTIAL //返回一個節點的路徑給你 String nodeCreated = zkClient.create("/idea/name", "jdxia".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上傳可以任意類型,但是都要轉成bytes[] //這邊已經創建完了 System.out.println(nodeCreated); zkClient.close(); } //查詢數據 @Test public void getData() throws KeeperException, InterruptedException, UnsupportedEncodingException { //第一個參數是節點 //第二個參數是watcher,但是可以用true,表示重載之前的 //第三個參數,表示取數據的元信息,表示取最新版的還老版本的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); // Thread.sleep(Long.MAX_VALUE); } //修改數據 @Test public void setData() throws KeeperException, InterruptedException { //第三個參數表示版本,-1表示任何版本 //setACL是設置權限 Stat setData = zkClient.setData("/idea", "java".getBytes(), -1); System.out.println("修改數據成功"); } //刪除數據 @Test public void delData() throws KeeperException, InterruptedException { //如果這個節點下面有子節點,這個是不能刪除的 //第二個參數表示刪除的是那個版本的,-1表示任何版本 zkClient.delete("/idea",-1); //查看節點狀態,如果這個節點沒有會返回null Stat exists = zkClient.exists("/idea", false); System.out.println(exists); } //查看節點的詳細信息 @Test public void getInfo() throws KeeperException, InterruptedException { //查看節點狀態,如果這個節點沒有會返回null Stat stat = zkClient.exists("/idea", false); //獲取數據的長度 int dataLength = stat.getDataLength(); //獲取子節點的個數 int numChildren = stat.getNumChildren(); //獲取數據的版本 int version = stat.getVersion(); System.out.println("數據的長度---"+dataLength); System.out.println("子節點的個數---"+numChildren); System.out.println("數據的版本號---"+version); } //關閉客戶端 @Test public void zkClose() throws InterruptedException { zkClient.close(); } } ~~~ # 監聽znode **zk中監聽回調函數是一次性的,一旦被觸發就被移除監聽列表.如果需要永久監聽,就需要持續進行回調函數注冊** Zookeeper的監聽器工作機制 ![](https://box.kancloud.cn/91ea981258ba044196e4167f559dee13_932x256.png) 監聽器是一個接口,我們的代碼中可以實現Wather這個接口,實現其中的process方法,方法中即我們自己的業務邏輯 ![](https://box.kancloud.cn/9fbce0815f7cc18cad9eb298a387ec4a_847x422.png) 監聽器的注冊是在獲取數據的操作中實現: ~~~ getData(path,watch?)監聽的事件是:節點數據變化事件 getChildren(path,watch?)監聽的事件是:節點下的子節點增減變化事件 ~~~ ![](https://box.kancloud.cn/3ea7260fed43ccabfb850d06582262a7_232x406.png) 我們主線程sleep了,但是數據交換線程沒用sleep,getData調用的是數據交換線程 可以讓他響應后又監聽,不然的話監聽一次就沒了 ## 監聽器原理 ![](https://box.kancloud.cn/2428cb871ac9afe7a209d058109c54dc_1157x260.png) 1. 首先要有一個main()線程 2. 在main線程中創建zookeeper客戶端,這時就會創建兩個線程,一個負責網絡連接通信(connet),一個負責監聽(listener) 3. 通過connect線程將注冊的監聽事件發送給zookeeper 4. 在zookeeper的注冊監聽器列表中將注冊的監聽事件添加到列表中 5. zookeeper監聽到有數據或路徑變化,就會把這個消息發送給listener線程 6. listener線程內部調用了process()方法 ## 代碼 ~~~ package hello; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; public class SimpleZk { private static final String connectString = "192.168.33.12:2181,192.168.33.22:2181,192.168.3.33:2181"; private static final int sessionTimeOut = 2000; //latch就相當于一個對象,當latch.await()方法執行時,線程就會等待 //當latch的count減為0的時候,將會喚醒等待的線程 //讓主線程阻塞掉 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null; @Before public void init() throws InterruptedException, IOException { //一new完就往下走,但是這時候客戶端還沒完成連接,所以我們要等他創建好 //一旦成功握手這邊的process就會回調一次 zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步連接 //回調了并且事件等于連接成功 if (latch.getCount() > 0 && watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("countdown"); //把計數減少,然后主線程就可以往下走了 latch.countDown(); } //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath() + "---" + watchedEvent.getState()); try { //這邊的第二個參數用true,表示又用了監聽,這樣會一直監聽下去的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); } catch (Exception e) { e.printStackTrace(); } } }); latch.await(); } //查詢數據 @Test public void getData() throws KeeperException, InterruptedException, UnsupportedEncodingException { //第一個參數是節點 //第二個參數是watcher,但是可以用true,表示重載之前的 //第三個參數,表示取數據的元信息,表示取最新版的還老版本的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); Thread.sleep(Long.MAX_VALUE); } } ~~~ **查詢子節點** ~~~ //查詢子節點 @Test public void getChilrenNode() throws KeeperException, InterruptedException { //用之前的監聽器 List<String> children = zkClient.getChildren("/idea", true); for (String child:children) { System.out.println(child); } } ~~~ # 守護線程 他的listen/connect是守護線程 我們的主線程沒必要再sleep了 ~~~ package hello; public class Test { public static void main(String[] args) { System.out.println("主線程開始了"); Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("線程開始了"); while (true) { } } }); thread.setDaemon(true); thread.start(); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看