## 19 LinkedBlockingQueue 源碼解析
## 引導語
說到隊列,大家的反應可能是我從來都沒有用過,應該是不重要的 API 吧。如果這么想,那就大錯特錯了,我們平時使用到的線程池、讀寫鎖、消息隊列等等技術和框架,底層原理都是隊列,所以我們萬萬不可輕視隊列,隊列是很多高級 API 的基礎,學好隊列,對自己深入 Java 學習非常重要。
本文主要以 LinkedBlockingQueue 隊列為例,詳細描述一下底層具體的實現。
### 1 整體架構
LinkedBlockingQueue 中文叫做鏈表阻塞隊列,這個命名很好,從命名上就知道其底層數據結構是鏈表,并且隊列是可阻塞的。接下來,我們就從整體結構上看看 LinkedBlockingQueue。
#### 1.1 類圖
首先我們來看下 LinkedBlockingQueue 類圖,如下:

從類圖中,我們大概可以看出兩條路徑:
1. AbstractQueue -> AbstractCollection -> Collection ->Iterable 這條路徑依賴,主要是想復用 Collection 和 迭代器的一些操作,這些我們在說集合的時候,都知道這些類是干什么,能干什么,就不細說了;
2. BlockingQueue -> Queue -> Collection,BlockingQueue 和 Queue 是新出來的兩個接口,我們重點說一下。
Queue 是最基礎的接口,幾乎所有的隊列實現類都會實現這個接口,該接口定義出了隊列的三大類操作:
新增操作:
1. add 隊列滿的時候拋出異常;
2. offer 隊列滿的時候返回 false。
查看并刪除操作:
1. remove 隊列空的時候拋異常;
2. poll 隊列空的時候返回 null。
只查看不刪除操作:
1. element 隊列空的時候拋異常;
2. peek 隊列空的時候返回 null。
一共 6 種方法,除了以上分類方法,也可以分成兩類:
1. 遇到隊列滿或空的時候,拋異常,如 add、remove、element;
2. 遇到隊列滿或空的時候,返回特殊值,如 offer、poll、peek。
實際上,這些都比較難記憶。每次需要使用的時候,我都會看會源碼,才能想起這個方法是拋異常還是返回特殊值。
BlockingQueue 在 Queue 的基礎上加上了阻塞的概念,比如一直阻塞,還是阻塞一段時間。為了方便記憶,我們畫一個表格,如下:
| | |特殊值|一直阻塞|阻塞一段時間|
| --- | --- | --- | --- | --- |
|新增操作–隊列滿|add|offer返回false|put|offer過超時時間返回false|
|查看并刪除操作–隊列空 |remove|poll 返回 null|take|阻塞一段時間|
|只查看不刪除操作–隊列空 |element|peek 返回 null|暫無|暫無|
PS: remove 方法,BlockingQueue 類注釋中定義的是拋異常,但 LinkedBlockingQueue 中 remove 方法實際是返回 false。 從表格中可以看到,在新增和查看并刪除兩大類操作上,BlockingQueue 增加了阻塞的功能,而且可以選擇一直阻塞,或者阻塞一段時間后,返回特殊值。
#### 1.2 類注釋
我們看看從 LinkedBlockingQueue 的類注釋中能得到那些信息:
1. 基于鏈表的阻塞隊列,其底層的數據結構是鏈表;
2. 鏈表維護先入先出隊列,新元素被放在隊尾,獲取元素從隊頭部拿;
3. 鏈表大小在初始化的時候可以設置,默認是 Integer 的最大值;
4. 可以使用 Collection 和 Iterator 兩個接口的所有操作,因為實現了兩者的接口。
#### 1.3 內部構成
LinkedBlockingQueue 內部構成簡單來說,分成三個部分:鏈表存儲 + 鎖 + 迭代器,我們來看下源碼。
```
// 鏈表結構 begin //鏈表的元素 static class Node<E> { E item; //當前元素的下一個,為空表示當前節點是最后一個
Node<E> next; Node(E x) { item = x; } } //鏈表的容量,默認 Integer.MAX_VALUE private final int capacity; //鏈表已有元素大小,使用 AtomicInteger,所以是線程安全的 private final AtomicInteger count = new AtomicInteger(); //鏈表頭 transient Node<E> head; //鏈表尾 private transient Node<E> last; // 鏈表結構 end // 鎖 begin //take 時的鎖 private final ReentrantLock takeLock = new ReentrantLock(); // take 的條件隊列,condition 可以簡單理解為基于 ASQ 同步機制建立的條件隊列 private final Condition notEmpty = takeLock.newCondition(); // put 時的鎖,設計兩把鎖的目的,主要為了 take 和 put 可以同時進行 private final ReentrantLock putLock = new ReentrantLock(); // put 的條件隊列 private final Condition notFull = putLock.newCondition(); // 鎖 end
// 迭代器 // 實現了自己的迭代器 private class Itr implements Iterator<E> { ……………… }
```
從代碼上來看,結構是非常清晰的,三種結構各司其職:
1. 鏈表的作用是為了保存當前節點,節點中的數據可以是任意東西,是一個泛型,比如說隊列被應用到線程池時,節點就是線程,比如隊列被應用到消息隊列中,節點就是消息,節點的含義主要看隊列被使用的場景;
2. 鎖有 take 鎖和 put 鎖,是為了保證隊列操作時的線程安全,設計兩種鎖,是為了 take 和 put 兩種操作可以同時進行,互不影響。
#### 1.4 初始化
初始化有三種方式:
1. 指定鏈表容量大小;
2. 不指定鏈表容量大小,默認是 Integer 的最大值;
3. 對已有集合數據進行初始化。
源碼如下:
```
// 不指定容量,默認 Integer 的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 指定鏈表容量大小,鏈表頭尾相等,節點值(item)都是 null public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 已有集合數據進行初始化 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { // 集合內的元素不能為空 if (e == null) throw new NullPointerException(); // capacity 代表鏈表的大小,在這里是 Integer 的最大值 // 如果集合類的大小大于 Integer 的最大值,就會報錯 // 其實這個判斷完全可以放在 for 循環外面,這樣可以減少 Integer 的最大值次循環(最壞情況) if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
```
對于初始化源碼,我們說明兩點:
1. 初始化時,容量大小是不會影響性能的,只影響在后面的使用,因為初始化隊列太小,容易導致沒有放多少就會報隊列已滿的錯誤;
2. 在對給定集合數據進行初始化時,源碼給了一個不優雅的示范,我們不反對在每次 for 循環的時候,都去檢查當前鏈表的大小是否超過容量,但我們希望在 for 循環開始之前就做一步這樣的工作。舉個列子,給定集合大小是 1 w,鏈表大小是 9k,按照現在代碼實現,只能在 for 循環 9k 次時才能發現,原來給定集合的大小已經大于鏈表大小了,導致 9k 次循環都是在浪費資源,還不如在 for 循環之前就 check 一次,如果 1w > 9k,直接報錯即可。
### 2 阻塞新增
新增有多種方法,如:add、put、offer,三者的區別上文有說。我們拿 put 方法為例,put 方法在碰到隊列滿的時候,會一直阻塞下去,直到隊列不滿時,并且自己被喚醒時,才會繼續去執行,源碼如下:
```
// 把e新增到隊列的尾部。 // 如果有可以新增的空間的話,直接新增成功,否則當前線程陷入等待 public void put(E e) throws InterruptedException { // e 為空,拋出異常 if (e == null) throw new NullPointerException(); // 預先設置 c 為 -1,約定負數為新增失敗 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 設置可中斷鎖 putLock.lockInterruptibly(); try { // 隊列滿了 // 當前線程阻塞,等待其他線程的喚醒(其他線程 take 成功后就會喚醒此處被阻塞的線程) while (count.get() == capacity) {
// await 無限等待 notFull.await(); } // 隊列沒有滿,直接新增到隊列的尾部 enqueue(node); // 新增計數賦值,注意這里 getAndIncrement 返回的是舊值 // 這里的 c 是比真實的 count 小 1 的 c = count.getAndIncrement(); // 如果鏈表現在的大小 小于鏈表的容量,說明隊列未滿 // 可以嘗試喚醒一個 put 的等待線程 if (c + 1 < capacity) notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } // c==0,代表隊列里面有一個元素 // 會嘗試喚醒一個take的等待線程 if (c == 0) signalNotEmpty(); } // 入隊,把新元素放到隊尾 private void enqueue(Node<E> node) { last = last.next = node; }
```
從源碼中我們可以總結以下幾點:
1. 往隊列新增數據,第一步是上鎖,所以新增數據是線程安全的;
2. 隊列新增數據,簡單的追加到鏈表的尾部即可;
3. 新增時,如果隊列滿了,當前線程是會被阻塞的,阻塞的底層使用是鎖的能力,底層實現其它也和隊列相關,原理我們在鎖章節會說到;
4. 新增數據成功后,在適當時機,會喚起 put 的等待線程(隊列不滿時),或者 take 的等待線程(隊列不為空時),這樣保證隊列一旦滿足 put 或者 take 條件時,立馬就能喚起阻塞線程,繼續運行,保證了喚起的時機不被浪費。
以上就是 put 方法的原理,至于 offer 方法阻塞超過一端時間后,仍未成功,就會直接返回默認值的實現,和 put 方法相比只修改了幾行代碼,如下截圖:

### 3 阻塞刪除
刪除的方法也很多,我們主要看兩個關鍵問題:
1. 刪除的原理是怎樣的;
2. 查看并刪除和只查看不刪除兩種的區別是如何實現的。
首先我們來看第一個問題,我們以 take 方法為例,說明一下查看并刪除的底層源碼:
```
// 阻塞拿數據 public E take() throws InterruptedException { E x; // 默認負數,代表失敗 int c = -1; // count 代表當前鏈表數據的真實大小 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 空隊列時,阻塞,等待其他線程喚醒 while (count.get() == 0) { notEmpty.await(); } // 非空隊列,從隊列的頭部拿一個出來 x = dequeue(); // 減一計算,注意 getAndDecrement 返回的值是舊值 // c 比真實的 count 大1 c = count.getAndDecrement(); // 如果隊列里面有值,從 take 的等待線程里面喚醒一個。 // 意思是隊列里面有值啦,喚醒之前被阻塞的線程 if (c > 1) notEmpty.signal(); } finally { // 釋放鎖 takeLock.unlock(); } // 如果隊列空閑還剩下一個,嘗試從 put 的等待線程中喚醒一個
if (c == capacity) signalNotFull(); return x; } // 隊頭中取數據 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null;// 頭節點指向 null,刪除 return x; }
```
整體流程和 put 很相似,都是先上鎖,然后從隊列的頭部拿出數據,如果隊列為空,會一直阻塞到隊列有值為止。
而查看不刪除元素更加簡單,直接把隊列頭的數據拿出來即可,我們以 peek 為例,源碼如下:
```
// 查看并不刪除元素,如果隊列為空,返回 null public E peek() { // count 代表隊列實際大小,隊列為空,直接返回 null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 拿到隊列頭 Node<E> first = head.next; // 判斷隊列頭是否為空,并返回 if (first == null) return null; else
return first.item; } finally { takeLock.unlock(); } }
```
可以看出,查看并刪除,和查看不刪除兩者從隊頭拿數據的邏輯不太一致,從而導致一個會刪除,一個不會刪除隊頭數據。
### 4 總結
本文通過 LinkedBlockingQueue 的源碼,來介紹了下鏈表隊列,當隊列滿和空的場景下,新增和刪除數據時,隊列有啥變化。
隊列本身就是一個阻塞工具,我們可以把這個工具應用到各種阻塞場景中,比如說隊列應用到線程池,當線程池跑滿時,我們把新的請求都放到阻塞隊列中等待;隊列應用到消息隊列,當消費者處理能力有限時,我們可以把消息放到隊列中等待,讓消費者慢慢消費;每應用到一個新的場景中,都是一個新的技術工具,所以學好隊列,用處很大。
- 前言
- 第1章 基礎
- 01 開篇詞:為什么學習本專欄
- 02 String、Long 源碼解析和面試題
- 03 Java 常用關鍵字理解
- 04 Arrays、Collections、Objects 常用方法源碼解析
- 第2章 集合
- 05 ArrayList 源碼解析和設計思路
- 06 LinkedList 源碼解析
- 07 List 源碼會問哪些面試題
- 08 HashMap 源碼解析
- 09 TreeMap 和 LinkedHashMap 核心源碼解析
- 10 Map源碼會問哪些面試題
- 11 HashSet、TreeSet 源碼解析
- 12 彰顯細節:看集合源碼對我們實際工作的幫助和應用
- 13 差異對比:集合在 Java 7 和 8 有何不同和改進
- 14 簡化工作:Guava Lists Maps 實際工作運用和源碼
- 第3章 并發集合類
- 15 CopyOnWriteArrayList 源碼解析和設計思路
- 16 ConcurrentHashMap 源碼解析和設計思路
- 17 并發 List、Map源碼面試題
- 18 場景集合:并發 List、Map的應用場景
- 第4章 隊列
- 19 LinkedBlockingQueue 源碼解析
- 20 SynchronousQueue 源碼解析
- 21 DelayQueue 源碼解析
- 22 ArrayBlockingQueue 源碼解析
- 23 隊列在源碼方面的面試題
- 24 舉一反三:隊列在 Java 其它源碼中的應用
- 25 整體設計:隊列設計思想、工作中使用場景
- 26 驚嘆面試官:由淺入深手寫隊列
- 第5章 線程
- 27 Thread 源碼解析
- 28 Future、ExecutorService 源碼解析
- 29 押寶線程源碼面試題
- 第6章 鎖
- 30 AbstractQueuedSynchronizer 源碼解析(上)
- 31 AbstractQueuedSynchronizer 源碼解析(下)
- 32 ReentrantLock 源碼解析
- 33 CountDownLatch、Atomic 等其它源碼解析
- 34 只求問倒:連環相扣系列鎖面試題
- 35 經驗總結:各種鎖在工作中使用場景和細節
- 36 從容不迫:重寫鎖的設計結構和細節
- 第7章 線程池
- 37 ThreadPoolExecutor 源碼解析
- 38 線程池源碼面試題
- 39 經驗總結:不同場景,如何使用線程池
- 40 打動面試官:線程池流程編排中的運用實戰
- 第8章 Lambda 流
- 41 突破難點:如何看 Lambda 源碼
- 42 常用的 Lambda 表達式使用場景解析和應用
- 第9章 其他
- 43 ThreadLocal 源碼解析
- 44 場景實戰:ThreadLocal 在上下文傳值場景下的實踐
- 45 Socket 源碼及面試題
- 46 ServerSocket 源碼及面試題
- 47 工作實戰:Socket 結合線程池的使用
- 第10章 專欄總結
- 48 一起看過的 Java 源碼和面試真題