## 27 倒數計時開始,三、二、一—CountDownLatch詳解
> 時間像海綿里的水,只要你愿意擠,總還是有的。
> ——魯迅
本節開始我們學習一些新的東西,不再局限于線程、鎖、并發容器這些內容。JCU 包中提供了一些工具,用于線程間的協調。這些工具并不是每個并發編程的場景都需要使用。大部分場景通過 wait/nofity 或者 join 等操作就可以解決。但是在一些特定的場景下,我們則需要借助這些工具來解決問題。本節我們先來學習 CountDownLatch。
## 1、理解 CountDownLatch
從字面理解 CountDownLatch,意思是倒數門閂。它的作用是多個線程做匯聚。主線程開啟了 A、B、C 三個線程做不同的事情,但是主線程需要等待 A、B、C 三個線程全部完成后才能繼續后面的步驟。此時就需要 CountDownLatch 出馬了。CountDownLatch 會阻塞主線程,直到計數走到 0,門閂才會打開,主線程繼續執行。而計數遞減是每個線程自己操作 CountDownLatch 對象實現的。如下圖:

這種場景在我們的生活中十分常見。比如籃球比賽中,作為控球后衛,如果沒有快攻機會,那就需要等到中鋒、大前鋒、小前鋒、得分后衛都跑到位了,我才能決定怎么組織進攻。又比如我們報團去旅游,必須所有人都到機場了,才能一起出發。
對于我們的程序來說這種場景也挺多的,比如你的訂單信息可能需要從多個微服務取得數據,匯總后加工才返回給前臺。此時從多個微服務取得數據可以是多個子線程來完成。
對于以上場景,都是 CountDownLatch 的用武之地。
## 2、如何使用 CountDownLatch
我們來模擬打籃球的例子,主線程假如是控球后衛,我們看一下如果不用 CountDownLatch 會有什么問題:
~~~java
public static void main(String[] args) throws InterruptedException {
System.out.println("控球后衛到位!等待所有位置球員到位!");
new Thread(()->{
System.out.println("得分后衛到位!");
}).start();
new Thread(()->{
System.out.println("中鋒到位!");
}).start();
new Thread(()->{
System.out.println("大前鋒到位!");
}).start();
new Thread(()->{
System.out.println("小前鋒到位!");
}).start();
System.out.println("全部到位,開始進攻!");
}
~~~
輸出為:
~~~
控球后衛到位!等待所有位置球員到位!
得分后衛到位!
中鋒到位!
大前鋒到位!
全部到位,開始進攻!
小前鋒到位!
~~~
可以看到小前鋒還沒有到位,就開始進攻了。這顯然和需求不符。出現這種結果也很好理解,因為代碼中控球后衛并沒有等每個球員的線程到位,就開始進攻了。
正確的姿勢應該如下:
~~~java
public class Client {
private static final CountDownLatch countDownLatch = new CountDownLatch(5);
public static void main(String[] args) throws InterruptedException {
System.out.println("控球后衛到位!等待所有位置球員到位!");
countDownLatch.countDown();
new Thread(()->{
System.out.println("得分后衛到位!");
countDownLatch.countDown();
}).start();
new Thread(()->{
System.out.println("中鋒到位!");
countDownLatch.countDown();
}).start();
new Thread(()->{
System.out.println("大前鋒到位!");
countDownLatch.countDown();
}).start();
new Thread(()->{
System.out.println("小前鋒到位!");
countDownLatch.countDown();
}).start();
countDownLatch.await();
System.out.print("全部到位,開始進攻!");
}
}
~~~
首先聲明聲明了一個 countDownLatch 對象,由于有5名球員,所以傳入 count=5。每個球員的線程在球員到位后,都會執行 countDownLatch.countDown(),這個方法可以理解為我們把初始值的計數數量5做遞減。當減到零時才會執行 countDownLatch.await(); 后面的代碼。countDownLatch.await() 就是我們的門閂,這行代碼做的是鎖門操作,而每次 countDown(),調用5次后,門閂打開,后面的代碼才被執行。
這段代碼輸出如下:
~~~
控球后衛到位!等待所有位置球員到位!
得分后衛到位!
中鋒到位!
大前鋒到位!
小前鋒到位!
全部到位,開始進攻!
~~~
可以看出完全符合我們的預期,如果你還對此表示懷疑,那么你可以在某個線程中讓其 sleep 上幾秒,再看看是否還是全部到位才開始進攻。
## 3、CountDownLatch 的原理解析
CountDownLatch 內部其實還是借助 AQS 實現的。它內部實現了 AbstractQueuedSynchronizer。使用 AQS 的 state 變量來存儲計數器的值,初始化 CountDownLatch,實際在初始化 state 值。
### 3.1 構造函數
我們看其構造函數:
~~~java
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
~~~
~~~java
Sync(int count) {
setState(count);
}
~~~
~~~java
protected final void setState(int newState) {
state = newState;
}
~~~
三個方法串起來看,發現最后就是把傳入的 count 設置給了 state。
### 3.2 await 方法
await 方法會阻塞當前線程,代碼如下:
~~~java
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
~~~
調用了 AQS 的方法:
~~~java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
~~~
嘗試獲取共享鎖 tryAcquireShared,如果不能獲取進入等待隊列。
tryAcquireShared 方法由 CountDownLatch 的內部類 Sync 實現,如下:
~~~java
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
~~~
可以看到如果 state 為0就直接返回了,但如果不為零,才進入等待隊列。調用 tryAcquireShared 僅僅檢查 state值,而不會對其減 1,可以看到傳入的參數 acquires根本沒有用。
我們再看看 countDown 方法。
### 3.3 countDown 方法
這個方法會對 state 遞減。當計數器減為 0 時,所有阻塞的線程都被喚醒。代碼如下:
~~~java
public void countDown() {
sync.releaseShared(1);
}
~~~
可見其也是通過對自己的 AQS 子類調用 releaseShared 方法:
~~~java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
~~~
而在這個方法里,tryReleaseShared 是由子類實現的,也就是 countDown 中的 Sync 類,實現代碼如下:
~~~java
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
~~~
以上代碼在自旋中,通過 CAS 的方式對 state 值-1,如果 c-1 后等于 0,說明計數到 0。那么 releaseShared 中會調用 doReleaseShared(),讓 AQS 釋放資源出來。
以上對做 CountDownLatch 源代碼做了簡單的分析,可以看出主要是使用 AQS 來實現。通過阻塞隊列阻塞線程。然后通過 state 值的初始化和遞減,實現 state 為 0 時,激活阻塞的線程。
## 4、總結
CountDownLatch 有其一定的應用場景,對于多線程協調和串起流程有很大的幫助。我們在多線程開發中,可以留意是否有類似的場景,能夠通過 CountDownLatch 來解決。CountDownLatch 自身也有一定的局限性,它只能被使用一次,而不能被恢復再次使用。下一節我們講學習 CyclicBarrier,它可以重置以重復使用。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語