在上一講中,我分析了 Java 并發包中的部分內容,今天我來介紹一下線程安全隊列。Java 標準庫提供了非常多的線程安全隊列,很容易混淆。
今天我要問你的問題是,并發包中的 ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么區別?
## 典型回答
有時候我們把并發包下面的所有容器都習慣叫作并發容器,但是嚴格來講,類似 ConcurrentLinkedQueue 這種“Concurrent\*”容器,才是真正代表并發。
關于問題中它們的區別:
* Concurrent 類型基于 lock-free,在常見的多線程訪問場景,一般可以提供較高吞吐量。
* 而 LinkedBlockingQueue 內部則是基于鎖,并提供了 BlockingQueue 的等待性方法。
不知道你有沒有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,從命名上可以大概區分為 Concurrent\*、CopyOnWrite*和 Blocking*等三類,同樣是線程安全容器,可以簡單認為:
* Concurrent 類型沒有類似 CopyOnWrite 之類容器相對較重的修改開銷。
* 但是,凡事都是有代價的,Concurrent 往往提供了較低的遍歷一致性。你可以這樣理解所謂的弱一致性,例如,當利用迭代器遍歷時,如果容器發生修改,迭代器仍然可以繼續進行遍歷。
* 與弱一致性對應的,就是我介紹過的同步容器常見的行為“fail-fast”,也就是檢測到容器在遍歷過程中發生了修改,則拋出 ConcurrentModificationException,不再繼續遍歷。
* 弱一致性的另外一個體現是,size 等操作準確性是有限的,未必是 100% 準確。
* 與此同時,讀取的性能具有一定的不確定性。
## 考點分析
今天的問題是又是一個引子,考察你是否了解并發包內部不同容器實現的設計目的和實現區別。
隊列是非常重要的數據結構,我們日常開發中很多線程間數據傳遞都要依賴于它,Executor 框架提供的各種線程池,同樣無法離開隊列。面試官可以從不同角度考察,比如:
* 哪些隊列是有界的,哪些是無界的?(很多同學反饋了這個問題)
* 針對特定場景需求,如何選擇合適的隊列實現?
* 從源碼的角度,常見的線程安全隊列是如何實現的,并進行了哪些改進以提高性能表現?
為了能更好地理解這一講,需要你掌握一些基本的隊列本身和數據結構方面知識,如果這方面知識比較薄弱,《數據結構與算法分析》是一本比較全面的參考書,專欄還是盡量專注于 Java 領域的特性。
## 知識擴展
**線程安全隊列一覽**
我在[專欄第 8 講](http://time.geekbang.org/column/article/7810)中介紹過,常見的集合中如 LinkedList 是個 Deque,只不過不是線程安全的。下面這張圖是 Java 并發類庫提供的各種各樣的**線程安全**隊列實現,注意,圖中并未將非線程安全部分包含進來。

我們可以從不同的角度進行分類,從基本的數據結構的角度分析,有兩個特別的[Deque](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html)實現,ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的側重點是支持對隊列頭尾都進行插入和刪除,所以提供了特定的方法,如:
* 尾部插入時需要的[addLast(e)](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#addLast-E-)、[offerLast(e)](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#offerLast-E-)。
* 尾部刪除所需要的[removeLast()](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#removeLast--)、[pollLast()](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#pollLast--)。
從上面這些角度,能夠理解 ConcurrentLinkedDeque 和 LinkedBlockingQueue 的主要功能區別,也就足夠日常開發的需要了。但是如果我們深入一些,通常會更加關注下面這些方面。
從行為特征來看,絕大部分 Queue 都是實現了 BlockingQueue 接口。在常規隊列操作基礎上,Blocking 意味著其提供了特定的等待性操作,獲取時(take)等待元素進隊,或者插入時(put)等待隊列出現空位。
~~~
/**
* 獲取并移除隊列頭結點,如果必要,其會等待直到隊列出現元素
…
*/
E take() throws InterruptedException;
/**
* 插入元素,如果隊列已滿,則等待直到隊列出現空閑空間
…
*/
void put(E e) throws InterruptedException;
~~~
另一個 BlockingQueue 經常被考察的點,就是是否有界(Bounded、Unbounded),這一點也往往會影響我們在應用開發中的選擇,我這里簡單總結一下。
* ArrayBlockingQueue 是最典型的的有界隊列,其內部以 final 的數組保存數據,數組的大小就決定了隊列的邊界,所以我們在創建 ArrayBlockingQueue 時,都要指定容量,如
~~~
public ArrayBlockingQueue(int capacity, boolean fair)
~~~
* LinkedBlockingQueue,容易被誤解為無邊界,但其實其行為和內部代碼都是基于有界的邏輯實現的,只不過如果我們沒有在創建隊列時就指定容量,那么其容量限制就自動被設置為 Integer.MAX\_VALUE,成為了無界隊列。
* SynchronousQueue,這是一個非常奇葩的隊列實現,每個刪除操作都要等待插入操作,反之每個插入操作也都要等待刪除動作。那么這個隊列的容量是多少呢?是 1 嗎?其實不是的,其內部容量是 0。
* PriorityBlockingQueue 是無邊界的優先隊列,雖然嚴格意義上來講,其大小總歸是要受系統資源影響。
* DelayedQueue 和 LinkedTransferQueue 同樣是無邊界的隊列。對于無邊界的隊列,有一個自然的結果,就是 put 操作永遠也不會發生其他 BlockingQueue 的那種等待情況。
如果我們分析不同隊列的底層實現,BlockingQueue 基本都是基于鎖實現,一起來看看典型的 LinkedBlockingQueue。
~~~
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
~~~
我在介紹 ReentrantLock 的條件變量用法的時候分析過 ArrayBlockingQueue,不知道你有沒有注意到,其條件變量與 LinkedBlockingQueue 版本的實現是有區別的。notEmpty、notFull 都是同一個再入鎖的條件變量,而 LinkedBlockingQueue 則改進了鎖操作的粒度,頭、尾操作使用不同的鎖,所以在通用場景下,它的吞吐量相對要更好一些。
下面的 take 方法與 ArrayBlockingQueue 中的實現,也是有不同的,由于其內部結構是鏈表,需要自己維護元素數量值,請參考下面的代碼。
~~~
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
~~~
類似 ConcurrentLinkedQueue 等,則是基于 CAS 的無鎖技術,不需要在每個操作時使用鎖,所以擴展性表現要更加優異。
相對比較另類的 SynchronousQueue,在 Java 6 中,其實現發生了非常大的變化,利用 CAS 替換掉了原本基于鎖的邏輯,同步開銷比較小。它是 Executors.newCachedThreadPool() 的默認隊列。
**隊列使用場景與典型用例**
在實際開發中,我提到過 Queue 被廣泛使用在生產者 - 消費者場景,比如利用 BlockingQueue 來實現,由于其提供的等待機制,我們可以少操心很多協調工作,你可以參考下面樣例代碼:
~~~
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConsumerProducer {
public static final String EXIT_MSG = "Good bye!";
public static void main(String[] args) {
// 使用較小的隊列,以更好地在輸出中展示其影響
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
static class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try{
Thread.sleep(5L);
String msg = "Message" + i;
System.out.println("Produced new item: " + msg);
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println("Time to say good bye!");
queue.put(EXIT_MSG);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q){
this.queue=q;
}
@Override
public void run() {
try{
String msg;
while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
System.out.println("Consumed item: " + msg);
Thread.sleep(10L);
}
System.out.println("Got exit message, bye!");
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
~~~
上面是一個典型的生產者 - 消費者樣例,如果使用非 Blocking 的隊列,那么我們就要自己去實現輪詢、條件判斷(如檢查 poll 返回值是否 null)等邏輯,如果沒有特別的場景要求,Blocking 實現起來代碼更加簡單、直觀。
前面介紹了各種隊列實現,在日常的應用開發中,如何進行選擇呢?
以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 為例,我們一起來分析一下,根據需求可以從很多方面考量:
* 考慮應用場景中對隊列邊界的要求。ArrayBlockingQueue 是有明確的容量限制的,而 LinkedBlockingQueue 則取決于我們是否在創建時指定,SynchronousQueue 則干脆不能緩存任何元素。
* 從空間利用角度,數組結構的 ArrayBlockingQueue 要比 LinkedBlockingQueue 緊湊,因為其不需要創建所謂節點,但是其初始分配階段就需要一段連續的空間,所以初始內存需求更大。
* 通用場景中,LinkedBlockingQueue 的吞吐量一般優于 ArrayBlockingQueue,因為它實現了更加細粒度的鎖操作。
* ArrayBlockingQueue 實現比較簡單,性能更好預測,屬于表現穩定的“選手”。
* 如果我們需要實現的是兩個線程之間接力性(handoff)的場景,按照[專欄上一講](http://time.geekbang.org/column/article/9373)的例子,你可能會選擇 CountDownLatch,但是[SynchronousQueue](http://www.baeldung.com/java-synchronous-queue)也是完美符合這種場景的,而且線程間協調和數據傳輸統一起來,代碼更加規范。
* 可能令人意外的是,很多時候 SynchronousQueue 的性能表現,往往大大超過其他實現,尤其是在隊列元素較小的場景。
今天我分析了 Java 中讓人眼花繚亂的各種線程安全隊列,試圖從幾個角度,讓每個隊列的特點更加明確,進而希望減少你在日常工作中使用時的困擾。
## 一課一練
關于今天我們討論的題目你做到心中有數了嗎? 今天的內容側重于 Java 自身的角度,面試官也可能從算法的角度來考察,所以今天留給你的思考題是,指定某種結構,比如棧,用它實現一個 BlockingQueue,實現思路是怎樣的呢?
- 前言
- 開篇詞
- 開篇詞 -以面試題為切入點,有效提升你的Java內功
- 模塊一 Java基礎
- 第1講 談談你對Java平臺的理解?
- 第2講 Exception和Error有什么區別?
- 第3講 談談final、finally、 finalize有什么不同?
- 第4講 強引用、軟引用、弱引用、幻象引用有什么區別?
- 第5講 String、StringBuffer、StringBuilder有什么區別?
- 第6講 動態代理是基于什么原理?
- 第7講 int和Integer有什么區別?
- 第8講 對比Vector、ArrayList、LinkedList有何區別?
- 第9講 對比Hashtable、HashMap、TreeMap有什么不同?
- 第10講 如何保證集合是線程安全的? ConcurrentHashMap如何實現高效地線程安全?
- 第11講 Java提供了哪些IO方式? NIO如何實現多路復用?
- 第12講 Java有幾種文件拷貝方式?哪一種最高效?
- 第13講 談談接口和抽象類有什么區別?
- 第14講 談談你知道的設計模式?
- 模塊二 Java進階
- 第15講 synchronized和ReentrantLock有什么區別呢?
- 第16講 synchronized底層如何實現?什么是鎖的升級、降級?
- 第17講 一個線程兩次調用start()方法會出現什么情況?
- 第18講 什么情況下Java程序會產生死鎖?如何定位、修復?
- 第19講 Java并發包提供了哪些并發工具類?
- 第20講 并發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么區別?
- 第21講 Java并發類庫提供的線程池有哪幾種? 分別有什么特點?
- 第22講 AtomicInteger底層實現原理是什么?如何在自己的產品代碼中應用CAS操作?
- 第23講 請介紹類加載過程,什么是雙親委派模型?
- 第24講 有哪些方法可以在運行時動態生成一個Java類?
- 第25講 談談JVM內存區域的劃分,哪些區域可能發生OutOfMemoryError?
- 第26講 如何監控和診斷JVM堆內和堆外內存使用?
- 第27講 Java常見的垃圾收集器有哪些?
- 第28講 談談你的GC調優思路?
- 第29講 Java內存模型中的happen-before是什么?
- 第30講 Java程序運行在Docker等容器環境有哪些新問題?
- 模塊三 Java安全基礎
- 第31講 你了解Java應用開發中的注入攻擊嗎?
- 第32講 如何寫出安全的Java代碼?
- 模塊四 Java性能基礎
- 第33講 后臺服務出現明顯“變慢”,談談你的診斷思路?
- 第34講 有人說“Lambda能讓Java程序慢30倍”,你怎么看?
- 第35講 JVM優化Java代碼時都做了什么?
- 模塊五 Java應用開發擴展
- 第36講 談談MySQL支持的事務隔離級別,以及悲觀鎖和樂觀鎖的原理和應用場景?
- 第37講 談談Spring Bean的生命周期和作用域?
- 第38講 對比Java標準NIO類庫,你知道Netty是如何實現更高性能的嗎?
- 第39講 談談常用的分布式ID的設計方案?Snowflake是否受冬令時切換影響?
- 周末福利
- 周末福利 談談我對Java學習和面試的看法
- 周末福利 一份Java工程師必讀書單
- 結束語
- 結束語 技術沒有終點
- 結課測試 Java核心技術的這些知識,你真的掌握了嗎?