## 一、Curator框架使用
Curator框架,非常強大,目前已經是Apache的頂級項目,里面提供了更多豐富的操作,session超時重連,主從選舉,分布式計數器,分布式鎖等適合各種復雜的zookeeper
## 二、依賴的引入
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
## 三、Curator框架中使用鏈式編程的風格,易讀性更強,試用工程方法創建連接對象
1.使用CuratorFameworkFactory的兩個靜態工廠方法來實現
參數一:connectString,連接串
參數二: retryPolicy,重試連接策略,
參數三 sessionTimeoutMs會話超時,默認60ms
參數四 connectionTimeout 連接超時時間,默認為15000ms
2.創建節點create方法,可選鏈式項
creatingParentsIfNeeded:是否需要父節點
withMode: 需要的模式
forPath: 路徑 key value
withACL: 需要認證
3.刪除節點delete方法,可選擇鏈式項
deletingChildrenIfNeeded:遞歸的刪除
graranted: 安全的操作
withVersion: 刪除版本
forPath:
4.讀取和修改數據
gatData: 讀取數據
setData: 設置數據
6.異步綁定回調方法,比如節點綁定一個回調函數,該回調函數可以輸出服務器的狀態碼,以及服務的時間的類型,還可以加入一個線程池進行優化操作。
7.讀取子節點方法getChildren
8.判斷節點是否存在方法checkExists
## 四、代碼實現
public class CuratorBase {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
//3 開啟連接
cf.start();
// System.out.println(States.CONNECTED);
// System.out.println(cf.getState());
// 新加、刪除
/**
//4 建立節點 指定節點類型(不加withMode默認為持久類型節點)、路徑、數據內容
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes());
//5 刪除節點
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
*/
// 讀取、修改
/**
//創建節點
// cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes());
// cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2內容".getBytes());
//讀取節點
// String ret1 = new String(cf.getData().forPath("/super/c2"));
// System.out.println(ret1);
//修改節點
// cf.setData().forPath("/super/c2", "修改c2內容".getBytes());
// String ret2 = new String(cf.getData().forPath("/super/c2"));
// System.out.println(ret2);
*/
// 綁定回調函數
/**
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
System.out.println("code:" + ce.getResultCode());
System.out.println("type:" + ce.getType());
System.out.println("線程為:" + Thread.currentThread().getName());
}
}, pool)
.forPath("/super/c3","c3內容".getBytes());
Thread.sleep(Integer.MAX_VALUE);
*/
// 讀取子節點getChildren方法 和 判斷節點是否存在checkExists方法
/**
List<String> list = cf.getChildren().forPath("/super");
for(String p : list){
System.out.println(p);
}
Stat stat = cf.checkExists().forPath("/super/c3");
System.out.println(stat);
Thread.sleep(2000);
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
*/
//cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
}
}
### 總結: curator 對節點的簡單操作
#### 一、新建
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/super/c1","c1內容".getBytes());
#### 二、刪除節點
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
### 三、獲取子節點
cf.getData().forPath("/super/c2")
### 四、修改子節點
cf.setData().forPath("/super/c2", "修改c2內容".getBytes());
### 重點
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
System.out.println("code:" + ce.getResultCode());
System.out.println("type:" + ce.getType());
System.out.println("線程為:" + Thread.currentThread().getName());
}
}, pool)
為什么采用線程池去承載,是因為在高并發的情況下,我們不能為每一個線程開辟一段空間,我們采用
線程池去調度。當線程池里面的線程有空閑的時間,我們會調度線程池里面的線程去執行里面的代碼。
### 五、 Curator 的監聽 之一
第一步導入jar包
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
使用NodeCache的方式去客服端實例中注冊一個監聽緩存,然后實現對應的監聽方法即可,這里我們主要有兩種監聽方式。
NodeCacheListener: 監聽節點的新增、修改操作
PathChildrenCacheListener:監聽子節點的新增、修改、刪除操作
#### 加緩存,不是重復注冊
public class CuratorWatcher1 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
//3 建立連接
cf.start();
//4 建立一個cache緩存
final NodeCache cache = new NodeCache(cf, "/super", false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
/**
* <B>方法名稱:</B>nodeChanged<BR>
* <B>概要說明:</B>觸發事件為創建節點和更新節點,在刪除節點的時候并不觸發此操作。<BR>
* @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
*/
@Override
public void nodeChanged() throws Exception {
System.out.println("路徑為:" + cache.getCurrentData().getPath());
System.out.println("數據為:" + new String(cache.getCurrentData().getData()));
System.out.println("狀態為:" + cache.getCurrentData().getStat());
System.out.println("---------------------------------------");
}
});
Thread.sleep(1000);
cf.create().forPath("/super", "123".getBytes());
Thread.sleep(1000);
cf.setData().forPath("/super", "456".getBytes());
Thread.sleep(1000);
cf.delete().forPath("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}
## 六、 Curator 的監聽 之二
public class CuratorWatcher {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
//3 建立連接
cf.start();
//4 建立一個PathChildrenCache緩存,第三個參數為是否接受節點數據內容 如果為false則不接受
PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
//5 在初始化的時候就進行緩存監聽
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
/**
* <B>方法名稱:</B>監聽子節點變更<BR>
* <B>概要說明:</B>新建、修改、刪除<BR>
* @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
*/
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED :" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED :" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED :" + event.getData().getPath());
break;
default:
break;
}
}
});
//創建本身節點不發生變化
cf.create().forPath("/super", "init".getBytes());
//添加子節點
Thread.sleep(1000);
cf.create().forPath("/super/c1", "c1內容".getBytes());
Thread.sleep(1000);
cf.create().forPath("/super/c2", "c2內容".getBytes());
//修改子節點
Thread.sleep(1000);
cf.setData().forPath("/super/c1", "c1更新內容".getBytes());
//刪除子節點
Thread.sleep(1000);
cf.delete().forPath("/super/c2");
//刪除本身節點
Thread.sleep(1000);
cf.delete().deletingChildrenIfNeeded().forPath("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}
### 七、分布式鎖
### 在java高并發和多線程中是怎么解決的呢?
在分布式場景中,我們為了保證數據的一致性,經常在程序運行的某一點需要同步操作
(java 可提供synchronized 或者 Reentrantlock實現)
public class Lock1 {
static ReentrantLock reentrantLock = new ReentrantLock();
static int count = 10;
public static void genarNo(){
try {
reentrantLock.lock();
count--;
//System.out.println(count);
} finally {
reentrantLock.unlock();
}
}
public static void main(String[] args) throws Exception{
final CountDownLatch countdown = new CountDownLatch(1);
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
countdown.await();
genarNo();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
System.out.println(sdf.format(new Date()));
//System.out.println(System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
},"t" + i).start();
}
Thread.sleep(50);
countdown.countDown();
}
}
### Zookpeer分布式鎖的實現
public class Lock2 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
static int count = 10;
public static void genarNo(){
try {
count--;
System.out.println(count);
} finally {
}
}
public static void main(String[] args) throws Exception {
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
//3 開啟連接
cf.start();
//4 分布式鎖
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
//final ReentrantLock reentrantLock = new ReentrantLock();
final CountDownLatch countdown = new CountDownLatch(1);
for(int i = 0; i < 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
countdown.await();
//加鎖
lock.acquire();
//reentrantLock.lock();
//-------------業務處理開始
//genarNo();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
System.out.println(sdf.format(new Date()));
//System.out.println(System.currentTimeMillis());
//-------------業務處理結束
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//釋放
lock.release();
//reentrantLock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();
}
Thread.sleep(100);
countdown.countDown();
}
}
## 八、分布式計數器
分布式計數器,在高并發中使用AtomicInteger這種經典的方式、如果對于一個jvm
的場景當然沒有問題,但是我們現在是分布式、就需要利用Curator框架的Distributed
AtomicInteger了
public class CuratorAtomicInteger {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
//3 開啟連接
cf.start();
//cf.delete().forPath("/super");
//4 使用DistributedAtomicInteger
DistributedAtomicInteger atomicIntger =
new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000));
AtomicValue<Integer> value = atomicIntger.add(1);
System.out.println(value.succeeded());
System.out.println(value.postValue()); //最新值
System.out.println(value.preValue()); //原始值
}
}
## 九、barrier
public class CuratorBarrier1 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
for(int i = 0; i < 5; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.retryPolicy(retryPolicy)
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + "已經準備");
barrier.enter();
System.out.println("同時開始運行...");
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + "運行完畢");
barrier.leave();
System.out.println("同時退出運行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
}
}
public class CuratorBarrier2 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception {
for(int i = 0; i < 5; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/super");
System.out.println(Thread.currentThread().getName() + "設置barrier!");
barrier.setBarrier(); //設置
barrier.waitOnBarrier(); //等待
System.out.println("---------開始執行程序----------");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
Thread.sleep(5000);
barrier.removeBarrier(); //釋放
}
}
## 十、集群管理
public class CuratorWatcher {
/** 父節點path */
static final String PARENT_PATH = "/super";
/** zookeeper服務器地址 */
public static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** 定義session失效時間 */
public static final int SESSION_TIMEOUT = 30000;
public CuratorWatcher() throws Exception{
//1 重試策略:初試時間為1s 重試10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通過工廠創建連接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(retryPolicy)
.build();
//3 建立連接
cf.start();
//4 創建跟節點
if(cf.checkExists().forPath(PARENT_PATH) == null){
cf.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH,"super init".getBytes());
}
//4 建立一個PathChildrenCache緩存,第三個參數為是否接受節點數據內容 如果為false則不接受
PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true);
//5 在初始化的時候就進行緩存監聽
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
/**
* <B>方法名稱:</B>監聽子節點變更<BR>
* <B>概要說明:</B>新建、修改、刪除<BR>
* @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
*/
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED :" + event.getData().getPath());
System.out.println("CHILD_ADDED :" + new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED :" + event.getData().getPath());
System.out.println("CHILD_UPDATED :" + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED :" + event.getData().getPath());
System.out.println("CHILD_REMOVED :" + new String(event.getData().getData()));
break;
default:
break;
}
}
});
}
}
public class Client1 {
public static void main(String[] args) throws Exception{
CuratorWatcher watcher = new CuratorWatcher();
Thread.sleep(100000000);
}
}
public class Client2 {
public static void main(String[] args) throws Exception{
CuratorWatcher watcher = new CuratorWatcher();
Thread.sleep(100000000);
}
}