## 31 憑票取餐—Future模式詳解
> 與有肝膽人共事,從無字句處讀書。
> ——周恩來
從本節開始,我們進入新的一章學習,同時也是最后一章的學習。我們從如何實現一個線程開始學起,學習了并發的問題和解決辦法,學習了線程池等工具的使用,學習了各種并發容器。本章將會講解實際開發中經常會用到的多線程設計模式及其在 JDK 中的實現和應用。
本節我們要學習的是 Future 模式。我們先來看一個例子,假如你中午要出去買一份午餐打包帶回家,并且要去超市買一管牙膏,應該怎么做才會時間最短?當然是點好外賣,然后去超市買牙膏,等你回來看外賣是否已經做好了,如果做好了,拿小票取餐。如果還沒好,那就繼續等待,等做好后取餐回家。
如果程序不使用多線程實現的話,那么主線程就會阻塞在外賣加工過程上,直到午餐做好,才能去超市買東西。但如果我們采用多線程,可以點餐后馬上去超市買牙膏,同時有新的線程加工你的午餐。今天我們來學習一種新的多線程應用模式 Future,解決起類似問題就容易多了。
## 1、Future 模式介紹
我們先不著急講解 Future,先來回顧下之前我們講解的 Thread 和 runnable,實現多線程的方式是新起線程運行 run 方法,但是 run 方法有個缺陷是沒有返回值,并且主線程也并不知道新的線程何時運行完畢。上文的例子,我們不但需要做飯的線程返回午餐,并且主線程需要知道午餐已經好了。使用我們之前學習知識,通過 wait、notify 和共享資源也可以實現,但會比較復雜。其實 JDK 提供了非常方便的工具就是 Future。Future 持有要運行的任務,以及任務的結果。主線程只要聲明了 Future 對象,并且啟動新的線程運行他。那么隨時能通過 Future 對象獲取另外線程運行的結果。
接下來我們看看 Future 如何實現例子中的場景。
## 2、Future 使用
上述例子的代碼如下:
~~~java
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> cookTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "5斤的龍蝦";
}
});
Long startTime = System.currentTimeMillis();
System.out.println("我點了5斤的龍蝦。");
new Thread(cookTask).start();
System.out.println("我去買牙膏。");
TimeUnit.SECONDS.sleep(2);
System.out.println("我買到牙膏了!");
String lunch = cookTask.get();
System.out.println("我點的"+lunch+"已經OK了!");
Long userTime = (System.currentTimeMillis() - startTime)/1000;
System.out.println("我一共用了"+userTime+"秒買午餐并且買牙膏。");
}
}
~~~
代碼中先了一個 FutureTask 對象,稱之為 cookTask。顧名思義,這個 task 是用來做飯的。可以看到構造方法中傳入 Callable 的實現。實現的 call 方法中模擬做飯用了 3 秒鐘。
主線程運行后,先點了 5 斤的龍蝦,然后一個新的線程就開始去執行 cookTask 了。等會兒,到這里你一定會問,Thread 構造方法需要傳入 Runnable 的實現啊?沒錯,FutureTask 實現了 Runnable 接口。FutureTask 的 run 方法實際執行的是 Callable 的 call 方法。那么新的線程 start 后,實際做飯的邏輯會被執行:自線程 sleep3 秒后返回 “5 斤的龍蝦”。
主線程在啟動做飯的自線程后繼續向下執行,去買牙膏。這里 sleep 兩秒,模擬買牙膏的時間消耗。
買到牙膏接下來的一行代碼 String lobster = cookTask.get (); 重點說一下,此時分兩種情況:
1. cookTask 運行的線程已經結束了,那么可以直接取到運行的結果賦值給 lunch;
2. cookTask 運行的線程還沒有執行結束,此時主線程會阻塞,直到能取得運行結果。
cookTask 就是你的購物小票,只要你沒弄丟,隨時能去取你的午飯。
程序最后計算了整個過程的執行時間。由于采用了多線程并發,所以執行時間應該等于耗時最長的那個任務。這個例子中做龍蝦 3 秒 > 買牙膏 2 秒,所以總共耗時 3 秒,輸出如下:
~~~
我點了5斤的龍蝦
我去買牙膏
我買到牙膏了!
我點的5斤的龍蝦已經OK了
我一共用了3秒買午餐并且買牙膏
~~~
加入我調整買牙膏需要 10 秒,那么輸出則如下:
~~~
我點了5斤的龍蝦
我去買牙膏
我買到牙膏了!
我點的5斤的龍蝦已經OK了
我一共用了10秒買午餐并且買牙膏
~~~
總共耗時 10 秒。
現在我們想一下,假如單線程串行執行,點完午餐必須等待午餐做好了,才能去買牙膏。那么永遠耗時都是 2 者之和。采用并發執行后,僅為時間較長的那個任務的時間。
由于我們調用 Future 的 get 方法后主線程就開始阻塞了,所以我們應該在真正需要使用 Future 對象的返回結果時才去調用,充分利用并發的特性來提升程序性能。
## 3、Future 源碼解析
Future 是一個接口,而 FutrueTask 則是他的實現,我們看一下它們的繼承關系:

FutureTask 不但實現了 Future 而且實現了 Runnable 接口。這也是為什么它能作為參數傳入 Thread 構造方法。
Runnable 接口我們講過,里面只有一個 run 方法,用于被 Thread 調用。我們看一下 Future 接口有哪些方法:

cancel 用于嘗試取消任務。
get 用于等待并獲取任務執行結果。帶時間參數的 get 方法只會等待指定時間長度。
isCancelled 返回任務在完成前是否已經被取消。
isDone 返回任務是否完成。
我們用到最多的就是 get 方法,獲取任務的執行結果。
### 3.1 FutureTask 構造方法
~~~java
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
~~~
需要傳入 Callable 的實現,Callable 是一個接口,定義了 call 方法,返回 V 類型。
然后定義了 FutureTask 的狀態為 NEW。FutrueTask 定義了如下狀態:
~~~java
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
~~~
通過字面我們很容易理解其含義。
### 3.2 run 方法解析
FutrueTask 實現了 Runnbale 接口,所以 Thread 運行后實際上執行的是 FutrueTask 的 run 方法。我們要想了解 Future 的實現原理,那么就應該從它的 run 方法開始入手。
~~~java
public void run() {
//如果此時狀態不為NEW直接結束
//如果為NEW,但是CAS操作把本線程寫入為runner時,發現runner已經不為null,那么也直接結束
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//取得Callable對象
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//運行Callable對象的call方法,并且取得返回值。
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
//如果call方法成功執行結束,那么把執行結果設置給成員變量outcome;
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
~~~
核心邏輯就是執行運行 Callable 對象的 call 方法,把返回結果寫入 outcome。outcome 用來保存計算結果。
保存計算結果則是通過 set 方法。
### 3.3 set 方法解析
set 方法代碼如下:
~~~java
protected void set(V v) {
//狀態還是NEW,保存計算結果給outcome
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//更新狀態為NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//喚醒等待的線程
finishCompletion();
}
}
~~~
如果沒有被取消則會保存計算結果 v 到 outcome。然后更新最終狀態為 NORMAL。最后調用 finishCompletion 方法喚醒阻塞的線程。代碼如下:
~~~java
private void finishCompletion() {
// assert state > COMPLETING;
//遍歷等待線程,結束等待
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//結束等待線程的掛起
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
//如果沒有下一個等待線程,那么結束循環
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//全部完成后回調FutrueTask的done方法。done方法為空,可以由子類實現。
done();
//清除callable
callable = null; // to reduce footprint
}
~~~
### 3.4 get 方法解析
get 方法用于獲取任務的返回值,如果還沒有執行完成,則會阻塞,代碼如下:
~~~java
public V get() throws InterruptedException, ExecutionException {
//獲取當前Task的狀態
int s = state;
//如果還沒有完成,則阻塞等待完成
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//獲取任務執行的返回結果
return report(s);
}
~~~
我們先來看 awaitDone 的代碼:
~~~java
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//計算等待截止時長
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//當前線程如果被打斷,則不再等待。從等待鏈表中移除
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//取得目前的狀態
int s = state;
//如果已經執行完成,清空q節點保存的線程
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果正在執行,讓出CPU執行權
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//沒有進入以上分支,運行到此分支,這說明此線程確實需要開始等待了,
//那么如果還未為此線程建立關聯的等待節點,則進行創建。
else if (q == null)
q = new WaitNode();
//通過CAS把此線程的等待node加入到連表中。失敗的話,下次循環若能運行到此分支,會繼續添加。
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果設置了超時,檢查是否超時。超時的話結束等待。 否則掛起超時時長
//如果沒有設置超時時長,則永久掛起
//回到上面的finishCompletion方法,等到task執行完成后會執行LockSupport.unpark(t),結束阻塞。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
~~~
最后我們看一下 report 方法:
~~~java
private V report(int s) throws ExecutionException {
//獲取執行結果
Object x = outcome;
//NORMAL為正常結束,那么直接把X轉型后返回
if (s == NORMAL)
return (V)x;
//如果任務被取消了,則拋出異常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
~~~
outcome 保存的就是任務的執行結果。根據此時的狀態,選擇返回執行結果還是拋出取消的異常。
最后我們總結下 FutureTask 的代碼:
1、FutureTask 實現 Runnable 和 Future 接口;
2、在線程上運行 FutureTask 后,run 方法被調用,run 方法會調用傳入的 Callable 接口的 call 方法;
3、拿到返回值后,通過 set 方法保存結果到 outcome,并且喚醒所有等待的線程;
4、調用 get 方法獲取執行結果時,如果沒有執行完畢,則進入等待,直到 set 方法調用后被喚醒。
下圖示意了兩個線程運行 task 和 get 時的程序邏輯:

## 4、總結
Future 模式在實際開發中有著大量的應用場景。比如說微服務架構中,需要調用不同服務接口獲取數據,但是接口調用間并無依賴關系,那么可以通過 FutureTask 并發調用,然后再執行后續邏輯。如果我們采用串行的方式,則需要一個接口返回后,再調用下一個接口。FutreTask 需要結合 Callable 接口使用,示例代碼中為了讓大家顯示的看到 Callable 接口,所以采用匿名對象的方式。實際使用中我們可以使用 lambda 表達式來簡化代碼,如下:
~~~java
FutureTask<String> cookTask = new FutureTask<>(() -> {
Thread.sleep(3000);
return "5斤的龍蝦";
})
~~~
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語