## 33 CountDownLatch、Atomic 等其它源碼解析
## 引導語
本小節和大家一起來看看 CountDownLatch 和 Atomic 打頭的原子操作類,CountDownLatch 的源碼非常少,看起來比較簡單,但 CountDownLatch 的實際應用卻不是很容易;Atomic 原子操作類就比較好理解和應用,接下來我們分別來看一下。
### 1 CountDownLatch
CountDownLatch 中文有的叫做計數器,也有翻譯為計數鎖,其最大的作用不是為了加鎖,而是通過計數達到等待的功能,主要有兩種形式的等待:
1. 讓一組線程在全部啟動完成之后,再一起執行(先啟動的線程需要阻塞等待后啟動的線程,直到一組線程全部都啟動完成后,再一起執行);
2. 主線程等待另外一組線程都執行完成之后,再繼續執行。
我們會舉一個示例來演示這兩種情況,但在這之前,我們先來看看 CountDownLatch 的底層源碼實現,這樣就會清晰一點,不然一開始就來看示例,估計很難理解。
CountDownLatch 有兩個比較重要的 API,分別是 await 和 countDown,管理著線程能否獲得鎖和鎖的釋放(也可以稱為對 state 的計數增加和減少)。
#### 1.1 await
await 我們可以叫做等待,也可以叫做加鎖,有兩種不同入參的方法,源碼如下:
```
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 帶有超時時間的,最終都會轉化成毫秒 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
```
兩個方法底層使用的都是 sync,sync 是一個同步器,是 CountDownLatch 的內部類實現的,如下:
```
private static final class Sync extends AbstractQueuedSynchronizer {}
```
可以看出來 Sync 繼承了 AbstractQueuedSynchronizer,具備了同步器的通用功能。
無參 await 底層使用的是 acquireSharedInterruptibly 方法,有參的使用的是 tryAcquireSharedNanos 方法,這兩個方法都是 AQS 的方法,底層實現很相似,主要分成兩步:
1. 使用子類的 tryAcquireShared 方法嘗試獲得鎖,如果獲取了鎖直接返回,獲取不到鎖走 2;
2. 獲取不到鎖,用 Node 封裝一下當前線程,追加到同步隊列的尾部,等待在合適的時機去獲得鎖。
第二步是 AQS 已經實現了,第一步 tryAcquireShared 方法是交給 Sync 實現的,源碼如下:
```
// 如果當前同步器的狀態是 0 的話,表示可獲得鎖 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
```
獲得鎖的代碼也很簡單,直接根據同步器的 state 字段來進行判斷,但還是有兩點需要注意一下:
1. 獲得鎖時,state 的值不會發生變化,像 ReentrantLock 在獲得鎖時,會把 state + 1,但 CountDownLatch 不會;
2. CountDownLatch 的 state 并不是 AQS 的默認值 0,而是可以賦值的,是在 CountDownLatch 初始化的時候賦值的,代碼如下:
```
// 初始化,count 代表 state 的初始化值 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // new Sync 底層代碼是 state = count; this.sync = new Sync(count); }
```
這里的初始化的 count 和一般的鎖意義不太一樣,count 表示我們希望等待的線程數,在兩種不同的等待場景中,count 有不同的含義:
1. 讓一組線程在全部啟動完成之后,再一起執行的等待場景下, count 代表一組線程的個數;
2. 主線程等待另外一組線程都執行完成之后,再繼續執行的等待場景下,count 代表一組線程的個數。
所以我們可以把 count 看做我們希望等待的一組線程的個數,可能我們是等待一組線程全部啟動完成,可能我們是等待一組線程全部執行完成。
#### 1.2 countDown
countDown 中文翻譯為倒計時,每調用一次,都會使 state 減一,底層調用的方法如下:
```
public void countDown() { sync.releaseShared(1); }
```
releaseShared 是 AQS 定義的方法,方法主要分成兩步:
1. 嘗試釋放鎖(tryReleaseShared),鎖釋放失敗直接返回,釋放成功走 2;
2. 釋放當前節點的后置等待節點。
第二步 AQS 已經實現了,第一步是 Sync 實現的,我們一起來看下 tryReleaseShared 方法的實現源碼:
```
// 對 state 進行遞減,直到 state 變成 0; // state 遞減為 0 時,返回 true,其余返回 false protected boolean tryReleaseShared(int releases) { // 自旋保證 CAS 一定可以成功 for (;;) { int c = getState(); // state 已經是 0 了,直接返回 false if (c == 0) return false; // 對 state 進行遞減 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0;
} }
```
從源碼中可以看到,只有到 count 遞減到 0 時,countDown 才會返回 true。
#### 1.3 示例
看完 CountDownLatch 兩個重要 API 后,我們來實現文章開頭說的兩個功能:
1. 讓一組線程在全部啟動完成之后,再一起執行;
2. 主線程等待另外一組線程都執行完成之后,再繼續執行。
代碼在 CountDownLatchDemo 類中,大家可以調試看看,源碼如下:
```
public class CountDownLatchDemo { // 線程任務 class Worker implements Runnable { // 定義計數鎖用來實現功能 1 private final CountDownLatch startSignal; // 定義計數鎖用來實現功能 2 private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } // 子線程做的事情 public void run() { try { System.out.println(Thread.currentThread().getName()+" begin"); // await 時有兩點需要注意:await 時 state 不會發生變化,2:startSignal 的state
初始化是 1,所以所有子線程都是獲取不到鎖的,都需要到同步隊列中去等待,達到先啟動的子線程等待后面啟動的子線程的結果 startSignal.await(); doWork(); // countDown 每次會使 state 減一,doneSignal 初始化為 9,countDown 前 8 次執行都會返回 false (releaseShared 方法),執行第 9 次時,state 遞減為 0,會 countDown 成功,表示所有子線程都執行完了,會釋放 await 在 doneSignal 上的主線程 doneSignal.countDown(); System.out.println(Thread.currentThread().getName()+" end"); } catch (InterruptedException ex) { } // return; } void doWork() throws InterruptedException { System.out.println(Thread.currentThread().getName()+"sleep 5s …………"); Thread.sleep(5000l); } } @Test public void test() throws InterruptedException { // state 初始化為 1 很關鍵,子線程是不斷的 await,await 時 state 是不會變化的,并且發現 state 都是 1,所有線程都獲取不到鎖 // 造成所有線程都到同步隊列中去等待,當主線程執行 countDown 時,就會一起把等待的線程給釋放掉 CountDownLatch startSignal = new CountDownLatch(1); // state 初始化成 9,表示有 9 個子線程執行完成之后,會喚醒主線程 CountDownLatch doneSignal = new CountDownLatch(9); for (int i = 0; i < 9; ++i) // create and start threads { new Thread(new Worker(startSignal, doneSignal)).start();
} System.out.println("main thread begin"); // 這行代碼喚醒 9 個子線程,開始執行(因為 startSignal 鎖的狀態是 1,所以調用一次 countDown 方法就可以釋放9個等待的子線程) startSignal.countDown(); // 這行代碼使主線程陷入沉睡,等待 9 個子線程執行完成之后才會繼續執行(就是等待子線程執行 doneSignal.countDown()) doneSignal.await(); System.out.println("main thread end"); } }
```
執行結果:
```
Thread-0 begin
Thread-1 begin
Thread-2 begin
Thread-3 begin
Thread-4 begin
Thread-5 begin
Thread-6 begin
Thread-7 begin
Thread-8 begin
main thread begin
Thread-0sleep 5s …………
Thread-1sleep 5s …………
Thread-4sleep 5s …………
Thread-3sleep 5s …………
Thread-2sleep 5s …………
Thread-8sleep 5s …………
Thread-7sleep 5s …………
Thread-6sleep 5s …………
Thread-5sleep 5s …………
Thread-0 end
Thread-1 end
Thread-4 end
Thread-3 end
Thread-2 end
Thread-8 end
Thread-7 end
Thread-6 end
Thread-5 end
main thread end
```
從執行結果中,可以看出已經實現了以上兩個功能,實現比較繞,大家可以根據注釋,debug 看一看。
### 2 Atomic 原子操作類
Atomic 打頭的原子操作類有很多,涉及到 Java 常用的數字類型的,基本都有相應的 Atomic 原子操作類,如下圖所示:

Atomic 打頭的原子操作類,在高并發場景下,都是線程安全的,我們可以放心使用。
我們以 AtomicInteger 為例子,來看下主要的底層實現:
```
private volatile int value; // 初始化 public AtomicInteger(int initialValue) { value = initialValue; } // 得到當前值 public final int get() { return value; } // 自增 1,并返回自增之前的值 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // 自減 1,并返回自增之前的值 public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); }
```
從源碼中,我們可以看到,線程安全的操作方法,底層都是使用 unsafe 方法實現,以上幾個 unsafe 方法不是使用 Java 實現的,都是線程安全的。
AtomicInteger 是對 int 類型的值進行自增自減,那如果 Atomic 的對象是個自定義類怎么辦呢,Java 也提供了自定義對象的原子操作類,叫做 AtomicReference。AtomicReference 類可操作的對象是個泛型,所以支持自定義類,其底層是沒有自增方法的,操作的方法可以作為函數入參傳遞,源碼如下:
```
// 對 x 執行 accumulatorFunction 操作 // accumulatorFunction 是個函數,可以自定義想做的事情 // 返回老值 public final V getAndAccumulate(V x,
BinaryOperator<V> accumulatorFunction) { // prev 是老值,next 是新值 V prev, next; // 自旋 + CAS 保證一定可以替換老值 do { prev = get(); // 執行自定義操作 next = accumulatorFunction.apply(prev, x); } while (!compareAndSet(prev, next)); return prev; }
```
### 3 總結
CountDownLatch 的源碼實現簡單,但真的要用好還是不簡單的,其使用場景比較復雜,建議同學們可以 debug 一下 CountDownLatchDemo,在增加實戰能力基礎上,增加底層的理解能力。
- 前言
- 第1章 基礎
- 01 開篇詞:為什么學習本專欄
- 02 String、Long 源碼解析和面試題
- 03 Java 常用關鍵字理解
- 04 Arrays、Collections、Objects 常用方法源碼解析
- 第2章 集合
- 05 ArrayList 源碼解析和設計思路
- 06 LinkedList 源碼解析
- 07 List 源碼會問哪些面試題
- 08 HashMap 源碼解析
- 09 TreeMap 和 LinkedHashMap 核心源碼解析
- 10 Map源碼會問哪些面試題
- 11 HashSet、TreeSet 源碼解析
- 12 彰顯細節:看集合源碼對我們實際工作的幫助和應用
- 13 差異對比:集合在 Java 7 和 8 有何不同和改進
- 14 簡化工作:Guava Lists Maps 實際工作運用和源碼
- 第3章 并發集合類
- 15 CopyOnWriteArrayList 源碼解析和設計思路
- 16 ConcurrentHashMap 源碼解析和設計思路
- 17 并發 List、Map源碼面試題
- 18 場景集合:并發 List、Map的應用場景
- 第4章 隊列
- 19 LinkedBlockingQueue 源碼解析
- 20 SynchronousQueue 源碼解析
- 21 DelayQueue 源碼解析
- 22 ArrayBlockingQueue 源碼解析
- 23 隊列在源碼方面的面試題
- 24 舉一反三:隊列在 Java 其它源碼中的應用
- 25 整體設計:隊列設計思想、工作中使用場景
- 26 驚嘆面試官:由淺入深手寫隊列
- 第5章 線程
- 27 Thread 源碼解析
- 28 Future、ExecutorService 源碼解析
- 29 押寶線程源碼面試題
- 第6章 鎖
- 30 AbstractQueuedSynchronizer 源碼解析(上)
- 31 AbstractQueuedSynchronizer 源碼解析(下)
- 32 ReentrantLock 源碼解析
- 33 CountDownLatch、Atomic 等其它源碼解析
- 34 只求問倒:連環相扣系列鎖面試題
- 35 經驗總結:各種鎖在工作中使用場景和細節
- 36 從容不迫:重寫鎖的設計結構和細節
- 第7章 線程池
- 37 ThreadPoolExecutor 源碼解析
- 38 線程池源碼面試題
- 39 經驗總結:不同場景,如何使用線程池
- 40 打動面試官:線程池流程編排中的運用實戰
- 第8章 Lambda 流
- 41 突破難點:如何看 Lambda 源碼
- 42 常用的 Lambda 表達式使用場景解析和應用
- 第9章 其他
- 43 ThreadLocal 源碼解析
- 44 場景實戰:ThreadLocal 在上下文傳值場景下的實踐
- 45 Socket 源碼及面試題
- 46 ServerSocket 源碼及面試題
- 47 工作實戰:Socket 結合線程池的使用
- 第10章 專欄總結
- 48 一起看過的 Java 源碼和面試真題