## 28 人齊了,一起行動—CyclicBarrier詳解
> 青年是學習智慧的時期,中年是付諸實踐的時期。
> —— 盧梭
上一節我們講解了 CountDownLatch,它的作用是讓多個線程完成后,再促使主線程繼續向下執行。不過它有一定的局限性,無法被重復使用。本節我們學習的 CyclicBarrier 不會有這個問題。CyclicBarrier 從字面上理解為循環柵欄。柵欄自然起到的就是屏障的作用,阻止線程通過,而循環則是指其可以反復使用。下面我們就先看看如何使用 CyclicBarrier。
## 1、CyclicBarrier 的使用
幾年前北京的黑車盛行,西二旗地鐵口,大量黑車司機在出口招攬生意:“軟件園、軟件園!5 塊一位!還差最后一位!” 。等你上車,發現其實不是還差一位,而是只有你一位。而司機此時絕對不會發車,而是會等車上坐夠 4 個人后才出發,然后下一輛黑車再次坐滿 4 人后發車。下面我們就使用 CyclicBarrier 來模擬這個場景。
~~~java
public class Client {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () ->
System.out.println("人滿了發車")
);
IntStream.range(1, 11).forEach(number -> {
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
System.out.println("第 " + number + " 乘客上車了!");
cyclicBarrier.await();
System.out.println("第 " + number + " 乘客出發了!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
});
}
}
~~~
代碼中首先聲明 cyclicBarrier 對象,構造方法有兩個參數,第一個參數是計數器初始值,每有一個線程達成則會減 1 。減到 0 時,觸發執行第二個參數傳入的 Runnable 實現的 run 方法。我這里使用 lambda 的方式簡化代碼。如果你不需要這個 Runnable 的任務,那么只需要傳入第一個參數即可。
接下來的代碼中,模擬 10 位乘客上車,每次上車后調用 cyclicBarrier.await() 。這里就是屏障點,此時當前線程會阻塞在此處,并且計數器被減 1 。為了輸出的效果便于觀看,每次新線程啟動前先 sleep 一會。
每當四個乘客完成上車操作,cyclicBarrier 就會觸發 “人滿了發車” 的操作。而最后兩位乘客上車后,由于沒有新的乘客上車,計數器不會被減到 0,導致無法越過屏障,所以永遠不會發車。
cyclicBarrier運行的示意圖如下:

代碼運行輸出如下:
~~~
第 1 乘客上車了!
第 2 乘客上車了!
第 3 乘客上車了!
第 4 乘客上車了!
人滿了發車
第 4 乘客出發了!
第 1 乘客出發了!
第 2 乘客出發了!
第 3 乘客出發了!
第 5 乘客上車了!
第 6 乘客上車了!
第 7 乘客上車了!
第 8 乘客上車了!
人滿了發車
第 8 乘客出發了!
第 5 乘客出發了!
第 7 乘客出發了!
第 6 乘客出發了!
第 9 乘客上車了!
第 10 乘客上車了!
~~~
可以看到每上車 4 人,才會觸發發車,同時每個人的線程才會繼續 cyclicBarrier.await() 后面的代碼,輸出 “第 n 乘客出發了!”
這個例子也驗證了 CyclicBarrier 可以重復使用,每次滿 4 人上車,都會觸發發車。然后重新開始計數。
通過這個例子我們了解了 CyclicBarrier 的使用。在這里我們總結下 CyclicBarrier 涉及的幾個概念:
1、計數器。初始值為構造 CyclicBarrier 傳入的第一個參數,每當一個線程到達屏障點,計數器減1;
2、屏障點,線程中調用 cyclicBarrier.await() 后,該線程到達屏障點,等待 CyclicBarrier 打開,也就是計數器到 0 ;
3、沖出屏障后的任務。首先這個任務可選。不需要的話,在構造 CyclicBarrier 時只需要傳入計數器初始值即可。這個任務在計數器到 0時被觸發。
## 2、CyclicBarrier 原理解析
### 2.1、 CyclicBarrier 中的屬性
~~~java
/** CyclicBarrier使用的拍他鎖*/
private final ReentrantLock lock = new ReentrantLock();
/** barrier被沖破前,線程等待的condition*/
private final Condition trip = lock.newCondition();
/** barrier被沖破時,需要滿足的參與線程數。*/
private final int parties;
/* barrier被沖破后執行的方法。*/
private final Runnable barrierCommand;
/** 當其輪次 */
private Generation generation = new Generation();
/**
*目前等待剩余的參與者數量。從 parties倒數到0。每個輪次該值會被重置回parties
*/
private int count;
~~~
可以看到 CyclicBarrier 內部通過 ReentrantLock 來實現的,而 ReentrantLock 的底層實現還是 AQS。
parties 在構造函數中被賦值,它的值永遠不會變,因為 CyclicBarrier 會被重置復用。而每個輪次真正用來計數的變量是 count。每個輪次結束,count 會被重置為 parties 的值。
### 2.2、 await() 方法解析
await 方法的調用,代表調用線程到達了屏障點,這個方法其實調用了 dowait 方法,我們直接分析 dowait 方法,它實現了 CyclicBarrier 的核心功能。
~~~java
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//對共享資源count,generation操作前,需要先上鎖保證線程安全
lock.lock();
try {
//拿到當前輪次對象的引用
final Generation g = generation;
//如果已經broken,那么拋出異常
if (g.broken)
throw new BrokenBarrierException();
//如果被打斷,通過breakBarrier方法設置當前輪次為broken狀態,通知當前輪次所有等待的線程線程
//并且拋出InterruptedException
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count減1
int index = --count;
//如果index為0,那么沖破屏障點
if (index == 0) { // tripped
boolean ranAction = false;
//沖破屏障點后,如果CyclicBarrier構造時傳入Runnable,則被調用。
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//這個方法中會進行重置,并且通知所有在屏障點阻塞的線程繼續執行。
nextGeneration();
return 0;
} finally {
//正常情況由于運行了command后ranAction被置為true,并不會執行如下邏輯
//在command執行期間出了異常才會進入下面的邏輯,認為當前輪次被破壞了
if (!ranAction)
breakBarrier();
}
}
//開始自旋,直到屏障被沖破,或者interrupted或者超時
for (;;) {
try {
if (!timed)
//阻塞,此時會釋放鎖,以讓其他線程進入await方法中。等待屏障被沖破后,向后執行
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果當前線程阻塞被interrupt了,并且本輪次還沒有被break,那么修改本輪次狀態為broken
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//如果本輪次被破壞,那么拋出異常
if (g.broken)
throw new BrokenBarrierException();
//如果已經成功進入下一輪次,那么返回index
if (g != generation)
return index;
//如果已經超時,那么本輪次被打破
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//釋放鎖
lock.unlock();
}
}
~~~
以上代碼分為兩大段邏輯,分別是自旋前,和自旋。
**A、自旋前的邏輯,核心邏輯如下:**
1. 計數器 -1;
2. 判斷是否計數器到 0;
3. 如果到了,則沖破屏障點,執行傳入的 Runnable;
4. 調用 nextGeneration() 來更新 Generation,重置計數器,并且通知本輪次等待的線程。
**B、如果計數器沒有到 0,則進入自旋的邏輯:**
1. 開始等待,此時會釋放鎖,以讓其它線程進入 lock 的代碼塊執行以上邏輯;
2. 當被喚醒時,可能因為當前 generation 被 break 了,或者計數器到 0,屏障被沖破;
3. 對比邊剛進入 dowait 方法時獲取的 generation 對象和最新 generation 是否一致。不一致說明已經換代了,也就是屏障被沖破,可以 return 了;
4. 如果等待超時或者 generation 被 break,分別拋出異常。
不同線程在 A 部分的邏輯會影響已經進入 B 部分邏輯的線程中止自旋。這些自旋的線程或者沖破屏障點,繼續向下執行,也可能拋出異常。
我們再看下用于更新輪次的方法 nextGeneration():
~~~java
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
~~~
三行代碼做了三件事:
1、通知所有被阻塞在本輪次屏障點的線程。屏障點被沖破,可以繼續向下執行了;
2、重置計數器為初始值;
3、更新輪次對象。這樣自旋中的線程才會跳出自旋。
## 3、總結
CyclicBarrier 和 CountDownLatch 相比,更為靈活,可以被重復使用。前者可以用來分段任務,假如有個任務需要分三個階段來完成,每個階段可以多線程并發執行,但是進入下一個階段的時候,必須所有線程都完成了第一階段的執行。那么通過 CyclicBarrier,在每個線程的每個階段開始前都設置屏障點,可以很輕松地實現。
CyclicBarrier 的實現是通過 ReentrantLock 控制計數器的原子更新,通過條件變量來實現線程同步。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語