## 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
> 讀書而不思考,等于吃飯而不消化。
> ——波爾克
前面兩節我們對 ConcurrentHashMap 的源代碼進行了分析,是不是意猶未盡?那么做好準備,本小節,我們將對阻塞隊列的借口 BlockingQueue 以及它的一個實現 ArrayBlockingQueue 做源碼分析。說實話,源碼分析十分的枯燥和艱難,不過堅持下來,收獲將會很大。
## 1、BlockingQueue 介紹
BlockingQueue 顧名思義----阻塞隊列。隊列大家應該都很清楚了,阻塞隊列是指當隊列滿時,入隊操作需要等待;當隊列空時,出隊操作需要等待。而非阻塞的方式則會直接返回false。
BlockingQueue 是一個接口。定義了出隊和入隊的方法。
###### 我們首先看入隊的方法:
**1、boolean add(E e)**
該方法向隊列添加元素e,如果可以有空間添加,那么添加成功返回 true。如果沒有空間,那么直接拋出 IllegalStateException。
**2、boolean offer(E e)**
該方法向隊列添加元素 e,如果可以有空間添加,那么添加成功返回 true。如果沒有空間,則會返回 false。和 add 方法很像,只不過不是拋異常。
**3、boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException**
該方法向隊列添加元素 e,如果可以有空間添加,那么添加成功返回 true。如果沒有空間,則會等待一定時長,如果仍舊沒有空間則返回 false。
**4、void put(E e) throws InterruptedException**
該方法向隊列添加元素 e,如果成功立即返回,如果沒有空間,則一直等待空間。等待是可被直接中斷。
這四種入隊的方法中前兩種為非阻塞方式,后兩種為阻塞方式。
###### 接下來我們再來分析出隊的方法:
**1、E take() throws InterruptedException**
返回并且從隊列中移除 head 元素。如果隊列為空,則會阻塞等待直到元素入隊。
**2、E poll(long timeout, TimeUnit unit)**
返回并且從隊列中移除 head 元素。如果隊列為空,則會阻塞一定的時間,等待有元素入隊。如果等待時間內沒有元素入隊,那么返回 null。
以上兩種出隊方法均為阻塞方法。
###### 還有幾個其它的方法,也做下介紹:
**1、boolean contains(Object o)**
如果隊列中至少含有一個元素 equals 對象 o,那么返回 true。
**2、int drainTo(Collection c)和int drainTo(Collection c, int maxElements)**
第一個方法是一次性“榨干”隊列,把所有元素 remove 掉,放入集合 c 中。第二個方法的區別是每次 remove 掉 maxElements 個元素,放入集合 c。
**3、int remainingCapacity()**
返回列表的剩余容量。
###### 我們通過以下表格對出入對方法進行總結:
**入隊:**
| 方法名 | 是否阻塞 | 拋出異常 |
| --- | --- | --- |
| add(E e) | 否 | IllegalStateException |
| offer(E e) | 否 | 無。返回false |
| offer(E e, long timeout, TimeUnit unit) | 阻塞指定時長 | InterruptedException |
| put(E e) | 是 | InterruptedException |
**出隊:**
| 方法名 | 是否阻塞 | 拋出異常 |
| --- | --- | --- |
| take() | 是 | InterruptedException |
| poll(long timeout, TimeUnit unit) | 阻塞指定時長,超時返回null | InterruptedException |
## 2、ArrayBlockingQueue源碼分析
ArrayBlockingQueue 是 BlockingQueue 的一種實現。底層通過數組實現。ArrayBlockingQueue 支持公平和非公平的方式來入隊和出隊。公平是指當出現多個線程阻塞時,等待時間長的會先獲得鎖,非公平則不一定。
我們先看 ArrayBlockingQueue 的構造函數,有如下三個
1. public ArrayBlockingQueue(int capacity);
2. public ArrayBlockingQueue(int capacity, boolean fair);
3. public ArrayBlockingQueue(int capacity, boolean fair, Collection c) 。
capacity 為隊列的大小,一旦初始化了無法改變。fair 指鎖的類型,c 用來初始化隊列時設置初始元素。
ArrayBlockingQueue 通過兩個 Condition 信號量來控制出隊和入隊的阻塞,分別為 notEmpty 和 notFull。
接下來,我們先分析入隊方法 put 的源代碼。
### 2.1 put 方法源碼分析
~~~java
public void put(E e) throws InterruptedException {
//檢查e是否為null,如果為null,拋出NullPointerException
checkNotNull(e);
//聲明顯式鎖
final ReentrantLock lock = this.lock;
//以lockInterruptibly方式上鎖,如果其他線程打斷等待的線程,那么等待的線程會立刻終止等待,拋出InterruptedException
lock.lockInterruptibly();
try {
//如果隊列已經滿了,那么等待
while (count == items.length)
notFull.await();
//直到消費者消費了一個元素后,通過notFull.signal()來通知等待的線程添加元素
enqueue(e);
} finally {
//釋放鎖
lock.unlock();
}
}
~~~
enqueue 中實現元素的入隊操作,代碼如下:
~~~java
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
~~~
putIndex 記錄了當前可以寫入的數組下標。
count 是 queue 中保存的元素總數。
這段代碼中,第 4、5 行不太好理解。當 ++putIndex 和數組長度一樣時,說明到了數組的最后一個元素。然后putIndex 被置為0。這是因為這里循環使用數組,由于數組長度就是隊列的長度,并且出一個才能進一個,所以進到數組最后一位時,下一個等待出隊的位置肯定是 >=0 。此時循環使用數組,下一個 put 的位置轉到數組第一位。如果消費得比較快,消費的位置 >0,那么沒問題,再次 put 時候就會寫入 index=0 的位置。如果恰巧出隊位置就是 0。那么下次 put 的時候,由于count == items.length,而會進入等待。直到位置 0 的元素被消費掉,那么寫入 0 位置,也不會有任何問題。
這么做的好處是循環使用數組,而不需要每次消費都向前移動數組中元素的位置。
### 2.2 offer 方法和 add 方法源碼分析
下面我們再來看看 offer 的源代碼:
~~~java
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
~~~
區別就是使用 lock,在等待鎖的期間不能被打斷。另外如果 count == items.length,隊列滿了直接返回 false,而不是阻塞等待。
這里一并講解 add 方法:
~~~java
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
~~~
其實 add 方法中調用的 offer 方法,只不過 add 在 offer 返回 false 時,拋出異常。
### 2.3 有等待超時參數的 offer 方法源碼分析
~~~java
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
~~~
其實這個方法和 put 更像一點,put 阻塞直到有空位出現。而此方法阻塞時會有超時時間設置, notFull.awaitNanos(nanos) 返回剩余的超時時長,當 nanos <= 0,也就是說沒有時長了,如果還沒有等到空位,那么也會放回 false。
以上入隊的方法已經講完了,接下來我們看一下出隊的方法,出隊方法的區別其實和入隊類似,所以這里我不再詳細講解每一個,我們看一個典型就可以了。
### 2.4 take 方法源代碼分析
tabke 方法和 put 方法相對應,代碼如下:
~~~csharp
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
~~~
可以看到,和put 一樣都是使用的 lockInterruptibly 方法上鎖,可以被中斷。當 count 為0時,說明隊列已經空了,無法取出元素,那么通過調用 notEmpty.await() 阻塞。直到某個入隊方法添加元素后調用了notEmpty.signal(),通知該線程可以繼續出隊操作了。
poll() 方法不會阻塞,如果沒有元素,則直接返回 null。
poll(long timeout, TimeUnit unit) ,則會阻塞一定時長,如果還是沒有元素,才會返回 null。
## 3、總結
關于 ArrayBlockingQueue 的源代碼就講解到這里,可以看出源代碼并不復雜,而且不同方法的實現也很類似,只有細微的差別。如果大家感興趣,相信你輕松就能讀懂源代碼。另外 BlockingQueue 的實現還有很多,還有個比較常用的是 LinkedBlockingQueue,通過鏈表存儲來實現,如果感興趣也可以自己去看源代碼。
阻塞隊列用來實現生產者和消費者很好用。另外我們還應該知道在 Executor 中使用了 BlockingQueue。大家如果已經忘了,可以回過頭再去看看 Executor 源碼分析那一節的內容。
}
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語