## 32 請按到場順序發言—Completion Service詳解
> 時間像海綿里的水,只要你愿意擠,總還是有的。
> ——魯迅
講解 CompletionService 之前,我們先回憶一下 ExcutorSevice。ExcutorService 實現了通過線程池來并發執行任務。其中有一種方式是通過線程池執行 Callable 任務,然后通過 Future 獲取異步執行的結果,如下面的代碼:
~~~java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
Callable callable1 = () -> {
Thread.sleep(10000);
return "任務1完成";
};
Callable callable2 = () -> {
Thread.sleep(5000);
return "任務2完成";
};
Future future1 = executor.submit(callable1);
Future future2 = executor.submit(callable2);
System.out.println(future1.get());
System.out.println(future2.get());
}
~~~
任務一執行需要 10 秒,任務二執行只需要 5 秒。但是當執行到 future1.get () 時,主線程會被阻塞。等待 10 秒后第一個任務執行完才會去獲取第二個任務。然后執行和第二個任務相關的打印操作。大家有沒有看出問題?任務 2 明明在 5 秒前就已經執行完成,卻不能立刻打印。主線程阻塞在任務一結果的獲取上。這樣程序執行的效率并不高。如果任務完成后能夠立刻被取得執行結果,然后執行后面的邏輯,效率就會有顯著的提升。今天我們要講解的 CompletionService 就是用來做這個事情的。CompletionService 可以按照執行完成結果的到場順序,被主線程獲取到,從而繼續執行后面邏輯。
## 1、了解 CompletionService
了解一個類最好、最快的方法就是閱讀源代碼的注解。而大多數人通常的做法卻是去百度或者 google。這樣有兩個弊端,一是效率并不一定高,可能搜出來很多無用的內容。二是看到的文章并不權威,甚至可能是錯的。有的同學可能覺得英文閱讀費勁,其實作為開發人員,英語閱讀已經是必備技能。這就如同你要熟知 IDE 的快捷鍵一樣,所以如果覺得英文閱讀困難,可以刻意練習。其實多讀一些技術文檔,會發現用詞基本都是類似的。
扯的有點遠,我們收回來,先看看源代碼中對 CompletionService 的解釋:
對異步任務執行和執行結果消費解藕。生產者提交任務執行。消費者則獲取完成的任務,然后按照完成任務的順序對任務結果進行處理。
官方的解釋是不是十分簡潔明了?
# 2、使用 CompletionService
下面我們使用 CompletionService 實現一個吃蘋果的程序。首先我聲明一個流,里面是一些水果,每個水果會對應一個洗干凈的任務。然后主線程拿到洗干凈的水果再一個個吃掉。代碼如下:
~~~java
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> service = new ExecutorCompletionService<String>(pool);
Stream.of("蘋果", "梨", "葡萄", "桃")
.forEach(fruit -> service.submit(() -> {
if(fruit.equals("蘋果")){
TimeUnit.SECONDS.sleep(6);
}else if(fruit.equals("梨")){
TimeUnit.SECONDS.sleep(1);
}else if(fruit.equals("葡萄")){
TimeUnit.SECONDS.sleep(10);
}else if(fruit.equals("桃")){
TimeUnit.SECONDS.sleep(3);
}
return "洗干凈的"+fruit;
})
);
String result;
while((result=service.take().get())!=null){
System.out.println("吃掉"+result);
}
}
~~~
可以看到有四種水果。會為每個水果啟一個洗水果的任務。每種水果洗的時間不同,其中葡萄最不好洗要 10 秒,而梨最好洗,只需要 1 秒。等待水果洗好后,主線程通過 service.take () 取得執行完成的 Future,然后從里面 get 出返回值,把洗干凈的水果吃掉。
我們可以看到輸出如下:
~~~
吃掉洗干凈的梨
吃掉洗干凈的桃
吃掉洗干凈的蘋果
吃掉洗干凈的葡萄
~~~
可以看到哪個水果先洗干凈就會先被吃掉。這也證明了 service.take () 的順序是任務的完成順序,而不是任務提交的順序。
通過 CompletionService 我們就可以一端生產,另一端按照完成的順序進行消費。這避免提交大量任務時,不知道哪個任務先完成,從而在調用 Future 的 get 方法時產生阻塞。使用 CompletionService,永遠都是完成一個返回一個,然后消費一個。這樣你的程序才更為高效。
主線程收到返回后,可以再繼續使用 CompletionService 來異步執行下一步的邏輯,這和非阻塞的編程方式異曲同工。
## 3、CompletionService 源碼分析
### 3.1 CompletionService 構造方法
我們先看如何初始化 CompletionService:
~~~java
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> service = new ExecutorCompletionService<String>(pool);
~~~
首先初始化 ExecutorService,在構造 ExecutorCompletionService 時作為參數傳入。其實 CompletionService 對任務的執行其實就是借助于 ExecutorService 來完成的。接下來我們進入它的構造函數:
~~~java
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
~~~
構造函數中構造了三個屬性:
~~~java
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
~~~
executor 就是你傳入的 ExecutorService,用來執行任務。
aes 的作用是創建新的 task。它的初始化過程比較有意思,判斷了是否為 AbstractExecutorService 的實例。至于為什么這么做,我們后面再詳細講解。
completionQueue 是一個存放 Future 的阻塞隊列,并且是無界的。這意味著如果源頭不斷的產生 Future,但是沒有去消費,就會造成內存泄漏。
executor 執行完成的 Future 會被放入 completionQueue 中,take 方法將會從
completionQueue 中取得最新的 future 對象(最近執行完的 task 的結果)。
### 3.2 CompletionService 的 submit 方法
~~~java
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
~~~
首先將 Callable 類型的 task 轉為 RunnableFuture 類型。RunnableFuture 是個接口,FutureTask 是其一種實現。
然后通過 new QueueingFuture (f),再將 RunnableFuture 包裝為 QueueingFuture 類型的對象。QueueingFuture 的作用就是在 Future 完成時,加入到 completionQueue 中。
我們先看 newTaskFor 的源碼:
~~~java
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
~~~
如果 aes 為空,那么直接 new FutureTask。如果不為空則調用 aes 的 newTaskFor 方法。什么情況 aes 會為空呢?我們再看下 aes 初始化的代碼:
~~~java
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
~~~
當傳入的 executor 為 AbstractExecutorService 類型時,那么 aes 不為空。否則 aes 為空。這兩處邏輯處理是相關聯的,這么做的原因如下:
1、如果 executor 是 AbstractExecutorService 的子類,有可能會重寫 newTaskFor 方法,所以這里優先使用 executo r 的方法來創建 Task,這樣后面通過 executor 執行 task 才能正確。比如 ForkJoinPool 就對 newTaskFor 方法進行了重寫;
2、如果 executor 不是繼承自 AbstractExecutorService。那么它可能并沒有 newTaskFor 方法。所以需要 CompletionService 自己來創建 FutureTask。
這樣看來 aes 的存在,只是為了盡量使用 executor 提供的 newTaskFor 方法來創建 task,以使后面 excute 方法能夠正常運行。
接下來我們分析 QueueingFuture 方法:
~~~java
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
~~~
QueueingFuture 是內部靜態類,并且是 FutureTask 的子類。他只是重寫了 done 方法。大家回憶上一節對 Future 的分析,應該還記得 done 方法在任務執行結果返回后被調用,但是留給子類來實。這里就用上了這個特性。done 方法里面做的就是把 task 加入阻塞隊列中。這意味著,先完成的 task 會先把自己的 Future 放入隊列中。那么當然也會被 take 方法先取到。而由于是阻塞隊列,所以 take 方法取不到 task 時,就會阻塞。但由于能被 take 到的 task 肯定已經有了返回值,所以調用 task 的 get 方法時就不會再次阻塞了。也就是說 client 代碼中的下面一行只會在 take 時發生阻塞:
~~~java
while((result=service.take().get())!=null){
System.out.println("吃掉"+result);
}
~~~
executor 執行任務的代碼就不用再次分析了,這在之前學習 Executor 的時候已經詳細分析過了。submit 方法分析完后我們再來看看 take 方法。
### 3.3 CompletionService 的 take 方法
相比較 submit 方法,take 方法就更為簡單了,如下:
~~~java
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
~~~
只有一行代碼,就是從 completionQueue 中取得 Futrue 對象。由于 completionQueue 是阻塞隊列,當沒有 Future 時,就會阻塞在此。而 completionQueue 中保存 Future 的順序是完成順序。
## 4、 總結
CompletionService 給我們提供了一種非阻塞的異步執行方式。讓程序更為高效。他的實現非常的簡單和巧妙,值得我們借鑒。其實我們學習到這里,不知道你是否有這種體會,這些工具實際上就是我們之前學習內容的組合運用,如果前面你掌握的很牢固,學習起來一點也不費勁。如果前面就似懂非懂,那么就會越看越糊涂。其實我們在學習上至少有一半的時間都是在打基礎,但這個過程必不可少,并且受益更為深遠。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語