<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ## 一、Curator框架使用 Curator框架,非常強大,目前已經是Apache的頂級項目,里面提供了更多豐富的操作,session超時重連,主從選舉,分布式計數器,分布式鎖等適合各種復雜的zookeeper ## 二、依賴的引入 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> ## 三、Curator框架中使用鏈式編程的風格,易讀性更強,試用工程方法創建連接對象 1.使用CuratorFameworkFactory的兩個靜態工廠方法來實現 參數一:connectString,連接串 參數二: retryPolicy,重試連接策略, 參數三 sessionTimeoutMs會話超時,默認60ms 參數四 connectionTimeout 連接超時時間,默認為15000ms 2.創建節點create方法,可選鏈式項 creatingParentsIfNeeded:是否需要父節點 withMode: 需要的模式 forPath: 路徑 key value withACL: 需要認證 3.刪除節點delete方法,可選擇鏈式項 deletingChildrenIfNeeded:遞歸的刪除 graranted: 安全的操作 withVersion: 刪除版本 forPath: 4.讀取和修改數據 gatData: 讀取數據 setData: 設置數據 6.異步綁定回調方法,比如節點綁定一個回調函數,該回調函數可以輸出服務器的狀態碼,以及服務的時間的類型,還可以加入一個線程池進行優化操作。 7.讀取子節點方法getChildren 8.判斷節點是否存在方法checkExists ## 四、代碼實現 public class CuratorBase { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 開啟連接 cf.start(); // System.out.println(States.CONNECTED); // System.out.println(cf.getState()); // 新加、刪除 /** //4 建立節點 指定節點類型(不加withMode默認為持久類型節點)、路徑、數據內容 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes()); //5 刪除節點 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); */ // 讀取、修改 /** //創建節點 // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes()); // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2內容".getBytes()); //讀取節點 // String ret1 = new String(cf.getData().forPath("/super/c2")); // System.out.println(ret1); //修改節點 // cf.setData().forPath("/super/c2", "修改c2內容".getBytes()); // String ret2 = new String(cf.getData().forPath("/super/c2")); // System.out.println(ret2); */ // 綁定回調函數 /** ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("線程為:" + Thread.currentThread().getName()); } }, pool) .forPath("/super/c3","c3內容".getBytes()); Thread.sleep(Integer.MAX_VALUE); */ // 讀取子節點getChildren方法 和 判斷節點是否存在checkExists方法 /** List<String> list = cf.getChildren().forPath("/super"); for(String p : list){ System.out.println(p); } Stat stat = cf.checkExists().forPath("/super/c3"); System.out.println(stat); Thread.sleep(2000); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); */ //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); } } ### 總結: curator 對節點的簡單操作 #### 一、新建 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/super/c1","c1內容".getBytes()); #### 二、刪除節點 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); ### 三、獲取子節點 cf.getData().forPath("/super/c2") ### 四、修改子節點 cf.setData().forPath("/super/c2", "修改c2內容".getBytes()); ### 重點 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("線程為:" + Thread.currentThread().getName()); } }, pool) 為什么采用線程池去承載,是因為在高并發的情況下,我們不能為每一個線程開辟一段空間,我們采用 線程池去調度。當線程池里面的線程有空閑的時間,我們會調度線程池里面的線程去執行里面的代碼。 ### 五、 Curator 的監聽 之一 第一步導入jar包 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> 使用NodeCache的方式去客服端實例中注冊一個監聽緩存,然后實現對應的監聽方法即可,這里我們主要有兩種監聽方式。 NodeCacheListener: 監聽節點的新增、修改操作 PathChildrenCacheListener:監聽子節點的新增、修改、刪除操作 #### 加緩存,不是重復注冊 public class CuratorWatcher1 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立連接 cf.start(); //4 建立一個cache緩存 final NodeCache cache = new NodeCache(cf, "/super", false); cache.start(true); cache.getListenable().addListener(new NodeCacheListener() { /** * <B>方法名稱:</B>nodeChanged<BR> * <B>概要說明:</B>觸發事件為創建節點和更新節點,在刪除節點的時候并不觸發此操作。<BR> * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged() */ @Override public void nodeChanged() throws Exception { System.out.println("路徑為:" + cache.getCurrentData().getPath()); System.out.println("數據為:" + new String(cache.getCurrentData().getData())); System.out.println("狀態為:" + cache.getCurrentData().getStat()); System.out.println("---------------------------------------"); } }); Thread.sleep(1000); cf.create().forPath("/super", "123".getBytes()); Thread.sleep(1000); cf.setData().forPath("/super", "456".getBytes()); Thread.sleep(1000); cf.delete().forPath("/super"); Thread.sleep(Integer.MAX_VALUE); } } ## 六、 Curator 的監聽 之二 public class CuratorWatcher { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立連接 cf.start(); //4 建立一個PathChildrenCache緩存,第三個參數為是否接受節點數據內容 如果為false則不接受 PathChildrenCache cache = new PathChildrenCache(cf, "/super", true); //5 在初始化的時候就進行緩存監聽 cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { /** * <B>方法名稱:</B>監聽子節點變更<BR> * <B>概要說明:</B>新建、修改、刪除<BR> * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent) */ @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED :" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED :" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED :" + event.getData().getPath()); break; default: break; } } }); //創建本身節點不發生變化 cf.create().forPath("/super", "init".getBytes()); //添加子節點 Thread.sleep(1000); cf.create().forPath("/super/c1", "c1內容".getBytes()); Thread.sleep(1000); cf.create().forPath("/super/c2", "c2內容".getBytes()); //修改子節點 Thread.sleep(1000); cf.setData().forPath("/super/c1", "c1更新內容".getBytes()); //刪除子節點 Thread.sleep(1000); cf.delete().forPath("/super/c2"); //刪除本身節點 Thread.sleep(1000); cf.delete().deletingChildrenIfNeeded().forPath("/super"); Thread.sleep(Integer.MAX_VALUE); } } ### 七、分布式鎖 ### 在java高并發和多線程中是怎么解決的呢? 在分布式場景中,我們為了保證數據的一致性,經常在程序運行的某一點需要同步操作 (java 可提供synchronized 或者 Reentrantlock實現) public class Lock1 { static ReentrantLock reentrantLock = new ReentrantLock(); static int count = 10; public static void genarNo(){ try { reentrantLock.lock(); count--; //System.out.println(count); } finally { reentrantLock.unlock(); } } public static void main(String[] args) throws Exception{ final CountDownLatch countdown = new CountDownLatch(1); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { countdown.await(); genarNo(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); System.out.println(sdf.format(new Date())); //System.out.println(System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { } } },"t" + i).start(); } Thread.sleep(50); countdown.countDown(); } } ### Zookpeer分布式鎖的實現 public class Lock2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms static int count = 10; public static void genarNo(){ try { count--; System.out.println(count); } finally { } } public static void main(String[] args) throws Exception { //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 開啟連接 cf.start(); //4 分布式鎖 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); //final ReentrantLock reentrantLock = new ReentrantLock(); final CountDownLatch countdown = new CountDownLatch(1); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { countdown.await(); //加鎖 lock.acquire(); //reentrantLock.lock(); //-------------業務處理開始 //genarNo(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); System.out.println(sdf.format(new Date())); //System.out.println(System.currentTimeMillis()); //-------------業務處理結束 } catch (Exception e) { e.printStackTrace(); } finally { try { //釋放 lock.release(); //reentrantLock.unlock(); } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start(); } Thread.sleep(100); countdown.countDown(); } } ## 八、分布式計數器 分布式計數器,在高并發中使用AtomicInteger這種經典的方式、如果對于一個jvm 的場景當然沒有問題,但是我們現在是分布式、就需要利用Curator框架的Distributed AtomicInteger了 public class CuratorAtomicInteger { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 開啟連接 cf.start(); //cf.delete().forPath("/super"); //4 使用DistributedAtomicInteger DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000)); AtomicValue<Integer> value = atomicIntger.add(1); System.out.println(value.succeeded()); System.out.println(value.postValue()); //最新值 System.out.println(value.preValue()); //原始值 } } ## 九、barrier public class CuratorBarrier1 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .retryPolicy(retryPolicy) .build(); cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "已經準備"); barrier.enter(); System.out.println("同時開始運行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "運行完畢"); barrier.leave(); System.out.println("同時退出運行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } } } public class CuratorBarrier2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 5000;//ms static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception { for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/super"); System.out.println(Thread.currentThread().getName() + "設置barrier!"); barrier.setBarrier(); //設置 barrier.waitOnBarrier(); //等待 System.out.println("---------開始執行程序----------"); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } Thread.sleep(5000); barrier.removeBarrier(); //釋放 } } ## 十、集群管理 public class CuratorWatcher { /** 父節點path */ static final String PARENT_PATH = "/super"; /** zookeeper服務器地址 */ public static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** 定義session失效時間 */ public static final int SESSION_TIMEOUT = 30000; public CuratorWatcher() throws Exception{ //1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //3 建立連接 cf.start(); //4 創建跟節點 if(cf.checkExists().forPath(PARENT_PATH) == null){ cf.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH,"super init".getBytes()); } //4 建立一個PathChildrenCache緩存,第三個參數為是否接受節點數據內容 如果為false則不接受 PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true); //5 在初始化的時候就進行緩存監聽 cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { /** * <B>方法名稱:</B>監聽子節點變更<BR> * <B>概要說明:</B>新建、修改、刪除<BR> * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent) */ @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED :" + event.getData().getPath()); System.out.println("CHILD_ADDED :" + new String(event.getData().getData())); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED :" + event.getData().getPath()); System.out.println("CHILD_UPDATED :" + new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED :" + event.getData().getPath()); System.out.println("CHILD_REMOVED :" + new String(event.getData().getData())); break; default: break; } } }); } } public class Client1 { public static void main(String[] args) throws Exception{ CuratorWatcher watcher = new CuratorWatcher(); Thread.sleep(100000000); } } public class Client2 { public static void main(String[] args) throws Exception{ CuratorWatcher watcher = new CuratorWatcher(); Thread.sleep(100000000); } }
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看