## 24 舉一反三:隊列在 Java 其它源碼中的應用
## 引導語
隊列除了提供 API 供開發者使用外,自身也和 Java 中其他 API 緊密結合,比如線程池和鎖,線程池直接使用了隊列的 API,鎖借鑒了隊列的思想,重新實現了隊列,線程池和鎖都是我們工作中經常使用的 API,也是面試官常問的 API,隊列在兩者的實現上發揮著至關重要的作用,接下來我們一起來看下。
### 1 隊列和線程池的結合
#### 1.1 隊列在線程池中的作用
線程池大家應該都使用過,比如我們想新建一個固定大小的線程池,并讓運行的線程打印一句話出來,我們會這么寫代碼:
```
ExecutorService executorService = Executors.newFixedThreadPool(10); // submit 是提交任務的意思 // Thread.currentThread() 得到當前線程 executorService.submit(() -> System.out.println(Thread.currentThread().getName() + " is run")); // 打印結果(我們打印出了當前線程的名字): pool-1-thread-1 is run
```
代碼中的 Executors 是并發的工具類,主要是為了幫助我們更方便的構造線程池的,其中 newFixedThreadPool 方法表示會構造出固定大小的線程池,我們給的入參是 10,代表線程池最大可以構造 10 個線程出來。
在實際的工作中,我們對流量的大小是無法控制的,這里我們設定的最大是 10 個線程,但如果一下子來了 100 個請求,這時候 10 個線程肯定是忙不過來了,那么剩余的 90 個請求怎么辦呢?
這時候就需要隊列出馬了,我們會把線程無法消化的數據放到隊列中去,讓數據在隊列中排隊,等線程有能力消費了,再從隊列中拿出來慢慢去消費。
我們畫一個圖釋義一下:
上圖右邊表示 10 個線程正在全力消費請求,左邊表示剩余請求正在隊列中排隊,等待消費。
由此可見,隊列在線程池中占有很重要的地位,當線程池中的線程忙不過來的時候,請求都可以在隊列中等待,從而慢慢地消費。
接下來我們來看下,線程池到底用到了那幾種隊列類型,分別起的什么作用。
#### 1.2 線程池中使用到的隊列的類型
#### 1.2.1 LinkedBlockingQueue 隊列的使用
剛剛我們說的 newFixedThreadPool 是一種固定大小的線程池,意思是當線程池初始化好后,線程池里面的線程大小是不會變的了(線程池默認設置是不會回收核心線程數的),我們來看下 newFixedThreadPool 的源碼:
```
// ThreadPoolExecutor 初始化時,第一個參數表示 coreSize,第二個參數是 maxSize,coreSize == maxSize, // 表示線程池初始化時,線程大小已固定,所以叫做固定(Fixed)線程池。 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
```
源碼中可以看到初始化了 ThreadPoolExecutor,ThreadPoolExecutor 是線程池的 API,我們在線程池章節會細說,它的第五個構造參數就是隊列,線程池根據場景會選擇不同的隊列,此處使用的是 LinkedBlockingQueue,并且是默認參數的 Queue,這說明此阻塞隊列的最大容量是 Integer 的最大值,也就是說當線程池的處理能力有限時,阻塞隊列中最大可以存放 Integer 最大值個任務。
但我們在實際工作中,常常不建議直接使用 newFixedThreadPool,主要是因為其使用的是 LinkedBlockingQueue 的默認構造器,隊列容量太大了,在要求實時響應的請求中,隊列容量太大往往危害也很大。
比如說我們用上述的線程池,線程 10 個,隊列是 Integer 的最大值,當并發流量很大時,比如來了 1w/qps 請求,這時候 10 個線程根本消費不完,就會有很多請求被阻塞在隊列中,雖然 10 個線程仍然在不斷地消費,但需要消費完隊列中的所有數據是需要時間的,假設需要 3 秒才能全部消費完,而這些實時請求都是有超時時間的,默認超時時間是 2 秒,當時間到達 2 秒時,請求已經超時了,返回報錯,可這時候隊列中的任務還有很多都在等待消費呢,即使后來消費完成,也無法返回給調用方了。
以上情況就會造成,調用方看到接口是超時報錯返回的,但服務端的任務其實還在排隊執行,過了 3 秒后,服務端的任務可能都會執行成功,但調用方已經無法感知了,調用方再次調用時,就會發現其實這筆請求已經成功了。
如果調用方是從頁面發起的,那么體驗就會更差,頁面上第一次調用頁面報錯,用戶重新刷新頁面時,頁面顯示上次的請求已經成功了,這個就是很不好的體驗了。
所以我們希望隊列的大小不要設置成那么大,可以根據實際的消費情況來設置隊列的大小,這樣就可以保證在接口超時前,隊列中排隊的請求可以執行完。
場景比較復雜,為了方便理解,我們畫了一個圖,把整個流程釋義一下:

這種問題,在實際工作中已經屬于非常嚴重的生產事故了,我們使用時一定要小心。
和 newFixedThreadPool 相同的是,newSingleThreadExecutor 方法底層使用的也是 LinkedBlockingQueue,newSingleThreadExecutor 線程池底層線程只會有一個,這代表著這個線程池一次只能處理一個請求,其余的請求都會在隊列中排隊等待執行,我們看下 newSingleThreadExecutor 的源碼實現:
```
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService // 前兩個參數規定了這個線程池一次只能消費一個線程 // 第五個參數使用的是 LinkedBlockingQueue,說明當請求超過單線程消費能力時,就會排隊 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
```
可以看到,底層使用的也是 LinkedBlockingQueue 的默認參數,也就是說排隊的最大值是 Integer 的最大值。
#### 1.2.2 SynchronousQueue 隊列
除了 newFixedThreadPool 方法,在線程池新建時,還有其他的幾個方法也對應著不同的隊列,我們一起來看下 newCachedThreadPool,newCachedThreadPool 底層對應的是 SynchronousQueue 隊列,源碼如下:
```
public static ExecutorService newCachedThreadPool() { // 第五個參數是 SynchronousQueue return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
```
SynchronousQueue 隊列是沒有大小限制的,請求多少隊列都能承受的住,可以說這是他的優點,缺點就是每次往隊列里面 put 數據時,并不能立馬返回,而是需要等待有線程 take 數據之后,才能正常返回,如果請求量大,而消費能力較差時,就會導致大量請求被 hodler 住,必須等到慢慢消費完成之后才能被釋放,所以在平時工作使用中也需要慎重。
#### 1.2.3 DelayedWorkQueue
newScheduledThreadPool 代表定時任務線程池,底層源碼如下:

截圖從左往右我們可以看到,底層隊列使用的是 DelayedWorkQueue 延遲隊列,說明線程池底層延時的功能就是 DelayedWorkQueue 隊列提供的,新的延遲請求都先到隊列中去,延遲時間到了,線程池自然就能從隊列中拿出線程進行執行了。
newSingleThreadScheduledExecutor 方法也是和 newScheduledThreadPool 一樣的,使用 DelayedWorkQueue 的延遲功能,只不過前者是單個線程執行。
#### 1.3 小結
從線程池的源碼中,我們可以看到:
1. 隊列在線程池的設計中,起著緩沖數據,延遲執行數據的作用,當線程池消費能力有限時,可以讓請求進行排隊,讓線程池可以慢慢消費。
2. 線程池根據不同的場景,選擇使用了 DelayedWorkQueue、SynchronousQueue、LinkedBlockingQueue 多種隊列,從而實現自己不同的功能,比如使用 DelayedWorkQueue 的延遲功能來實現定時執行線程池。
### 2 隊列和鎖的結合
我們平時寫鎖代碼的時候都這么寫:
```
ReentrantLock lock = new ReentrantLock(); try{ lock.lock(); // do something }catch(Exception e){ //throw Exception; }finally { lock.unlock(); }
```
初始化鎖 -> 加鎖 -> 執行業務邏輯 -> 釋放鎖,這是正常的流程,但我們知道同一時刻只能有一個線程才能獲得鎖的,那么此時其他獲取不到鎖的線程該怎么辦呢?
等待,其他獲取不到鎖的線程,都會到一個等待隊列中去等待,等待鎖被釋放掉時,再去競爭鎖,我們畫一個示意圖。
圖中紅色標識的就是同步隊列,獲取不到鎖的線程都會到同步隊列中去排隊,當鎖被釋放后,同步隊列中的線程就又開始去競爭鎖。
可以看出隊列在鎖中起的作用之一,就是幫助管理獲取不到鎖的線程,讓這些線程可以耐心的等待。
同步隊列并沒有使用現有的隊列的 API 去實現,但底層的結構,思想和目前隊列是一致的,所以我們學好隊列章節,對理解鎖的同步隊列,用處非常大。
### 3 總結
隊列的數據結構真的很重要,在線程池和鎖兩個重量級 API 中起著非常重要的作用,我們要非常清楚隊列底層的大體的數據結構,了解數據是如何入隊的,如何出隊的,隊列這章也是比較復雜的,建議大家多多 debug,我們 github 上也提供了一些 debug 的 demo,大家可以嘗試調試起來。
- 前言
- 第1章 基礎
- 01 開篇詞:為什么學習本專欄
- 02 String、Long 源碼解析和面試題
- 03 Java 常用關鍵字理解
- 04 Arrays、Collections、Objects 常用方法源碼解析
- 第2章 集合
- 05 ArrayList 源碼解析和設計思路
- 06 LinkedList 源碼解析
- 07 List 源碼會問哪些面試題
- 08 HashMap 源碼解析
- 09 TreeMap 和 LinkedHashMap 核心源碼解析
- 10 Map源碼會問哪些面試題
- 11 HashSet、TreeSet 源碼解析
- 12 彰顯細節:看集合源碼對我們實際工作的幫助和應用
- 13 差異對比:集合在 Java 7 和 8 有何不同和改進
- 14 簡化工作:Guava Lists Maps 實際工作運用和源碼
- 第3章 并發集合類
- 15 CopyOnWriteArrayList 源碼解析和設計思路
- 16 ConcurrentHashMap 源碼解析和設計思路
- 17 并發 List、Map源碼面試題
- 18 場景集合:并發 List、Map的應用場景
- 第4章 隊列
- 19 LinkedBlockingQueue 源碼解析
- 20 SynchronousQueue 源碼解析
- 21 DelayQueue 源碼解析
- 22 ArrayBlockingQueue 源碼解析
- 23 隊列在源碼方面的面試題
- 24 舉一反三:隊列在 Java 其它源碼中的應用
- 25 整體設計:隊列設計思想、工作中使用場景
- 26 驚嘆面試官:由淺入深手寫隊列
- 第5章 線程
- 27 Thread 源碼解析
- 28 Future、ExecutorService 源碼解析
- 29 押寶線程源碼面試題
- 第6章 鎖
- 30 AbstractQueuedSynchronizer 源碼解析(上)
- 31 AbstractQueuedSynchronizer 源碼解析(下)
- 32 ReentrantLock 源碼解析
- 33 CountDownLatch、Atomic 等其它源碼解析
- 34 只求問倒:連環相扣系列鎖面試題
- 35 經驗總結:各種鎖在工作中使用場景和細節
- 36 從容不迫:重寫鎖的設計結構和細節
- 第7章 線程池
- 37 ThreadPoolExecutor 源碼解析
- 38 線程池源碼面試題
- 39 經驗總結:不同場景,如何使用線程池
- 40 打動面試官:線程池流程編排中的運用實戰
- 第8章 Lambda 流
- 41 突破難點:如何看 Lambda 源碼
- 42 常用的 Lambda 表達式使用場景解析和應用
- 第9章 其他
- 43 ThreadLocal 源碼解析
- 44 場景實戰:ThreadLocal 在上下文傳值場景下的實踐
- 45 Socket 源碼及面試題
- 46 ServerSocket 源碼及面試題
- 47 工作實戰:Socket 結合線程池的使用
- 第10章 專欄總結
- 48 一起看過的 Java 源碼和面試真題