## 4.2.1 Java的Future機制
Future顧名思義,是一個未來完成的異步操作,可以獲得未來返回的值。常用的場景如:調用一個耗時的方法search()(根據產品名稱在全網查詢價格,假設需要3s左右才能返回),該方法會立即返回Future對象,調使用Future.get()可以同步等待耗時方法的返回,也可以調用future的cancel()取消Future任務。如下面的程序,search方法邏輯會根據名字在全網查找價格,假設需要耗時3s,該方法會立即返回一個Future對象供用戶線程使用;在主方法中可以使用get()等待獲取到價格,也可以使用cancel()取消查詢。
```
public Future<String> search(String prodName) {
FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
@Override
public String call() {
try {
System.out.println(String.format(" >>search price of %s from internet!",prodName));
Thread.sleep(3000);
return "$99.99";
}catch(InterruptedException e){
System.out.println("search function is Interrupted!");
}
return null;
}
});
new Thread(future).start();//交給線程去執行
return future; // 立刻返回future對象
}
JavaFuture jf = new JavaFuture();
Future<String> future = jf.search("Netty權威指南");// 返回future
System.out.println("Begin search,get future!");
// 測試1-【獲取結果】等待3s后會返回
String prods = future.get();//獲取prods
System.out.println("get result:"+prods);
// 測試2-【取消任務】1s后取消任務
Thread.sleep(1000);
future.cancel(false);//true時會中斷線程,false不會
System.out.println("Future is canceled? " + (future.isCancelled()?"yes":"no"));
Thread.sleep(4000); //等待4s檢查一下future所在線程是否還在執行
```
## 4.2.2 Future的實現
假如我們需要實現一個Future,考慮一下需要實現哪些功能:
```
Future<String> future = jf.search("Netty權威指南");
Future search(){
//啟動線程或者在線程池中執行業務邏輯
return future; //立刻返回future
}
```
* search方法需要立即返回一個Future對象,并且需要啟動一個線程(或線程池)執行業務邏輯;
* 由于Future對象可以等待線程執行結束或者取消線程,Future內部需要能夠管理業務邏輯的執行狀態。
* 業務邏輯結束或異常時需要告訴Future對象,有兩種方式:在Future中啟動線程執行業務邏輯;或者業務邏輯單獨執行,通過創建的Future實例的方法如setSuccess(result)方法通知Future。Java的FutureTask采用了第一種方法,其本身繼承了Runnable,在run方法中執行傳入的業務邏輯。而Netty的Promise中采用了第二種方法。
* get()方法中,如果業務邏輯還未執行完畢,需要等待,可以用鎖機制實現。
Java中的Future是一個接口,內部有如下方法:
```
boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。
V get() 如有必要,等待計算完成,然后獲取其結果。
V get(long timeout, TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。
boolean isCancelled() 如果在任務正常完成前將其取消,則返回 true。
boolean isDone() 如果任務已完成,則返回 true。
```
下面,我們自己實現一個Future加深理解,下面定義了一個繼承Future的MyFutureTask,初始化時傳遞一個Callable作為業務邏輯,實現Future接口是為了控制業務邏輯線程,實現Runnable接口是為了業務線程執行時能夠修改Future的內部狀態。
```
public class MyFutureTask<V> implements Future<V>,Runnable {
Callable<V> callable; //業務邏輯
boolean running = false ,done = false,cancel = false;// 業務邏輯執行狀態
ReentrantLock lock ;//鎖
V outcome;//結果
public MyFutureTask(Callable<V> callable) {
if(callable == null) {
throw new NullPointerException("callable cannot be null!");
}
this.callable = callable;
this.done = false;
this.lock = new ReentrantLock();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
callable = null;
cancel = true;
return true;
}
@Override
public boolean isCancelled() {
return cancel;
}
@Override
public boolean isDone() {
return done;
}
@Override
public V get() throws InterruptedException, ExecutionException {
try {
this.lock.lock();//先獲取鎖,獲得后說明業務邏輯已經執行完畢
return outcome;
}finally{
this.lock.unlock();
}
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
this.lock.tryLock(timeout, unit);
return outcome;
}catch (InterruptedException e) {
return null;
}finally{
this.lock.unlock();
}
}
@Override
public void run() {
try {
this.lock.lock(); // 啟動線程,先上鎖,防止get時直接返回
running = true;
try {
outcome = callable.call(); // 業務邏輯
} catch (Exception e) {
e.printStackTrace();
}
done = true;
running = false;
}finally {
this.lock.unlock(); // 解鎖后get可獲取
}
}
}
```
測試程序如下:
```
public Future<String> search(String prodName) {
MyFutureTask<String> future = new MyFutureTask<String>(new Callable<String>() {
@Override
public String call() {
try {
System.out.println(String.format(" >>search price of %s from internet!",prodName));
Thread.sleep(3000);
return "$99.99";
}catch(InterruptedException e){
System.out.println("search function is Interrupted!");
}
return null;
}
});
new Thread(future).start();// 或提交到線程池中
return future;
}
```
## 4.2.3 Java的Future實現
當然,上面是自己實現的FutureTask,Java自帶的FutureTask要比上面的更加復雜和健壯。下面我們進行一些分析。
1. FutureTask內部維護了state,表示運行狀態,只能通過set,setException, 和 cancel來修改。
```
private static final int NEW = 0; //初始狀態,
private static final int COMPLETING = 1; // 業務邏輯已經結束
private static final int NORMAL = 2; // 正常結束
private static final int EXCEPTIONAL = 3; // 異常結束
private static final int CANCELLED = 4; // 已經取消
private static final int INTERRUPTING = 5; // 中斷中
private static final int INTERRUPTED = 6; // 已經中斷
```
2. private volatile WaitNode waiters; 維護了等待的線程,get()方法時,如果業務邏輯還未執行完畢,則創建WaitNode q,將其q.next設置為waiters,waiters設置為q;這樣組成了一個等待鏈表。在業務邏輯執行完畢(正常或異常結束)時,
**run方法**
run方法用來執行業務邏輯,在此過程中需要維護好業務邏輯的運行狀態
```
public void run() {
// 1. 如果state不為初始狀態或者runner不為null,說明已經在運行了,直接返回
// 如果為空,使用CAS將runner設置為當前線程,防止并發進入
//runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) { // 2.業務邏輯不為空并且state為NEW時才運行
V result;
boolean ran;
try {
result = c.call(); // 3. 執行業務邏輯
ran = true; // ran為true表示正常返回
} catch (Throwable ex) {
result = null; // 發生異常,結果為null
ran = false; // 非正常結束
setException(ex); // 設置異常
}
if (ran)
set(result); // 正常結束,設置結果
}
} finally {
// 為例防止并發調用run()方法,進入run時使用cas將runner設置為非空,結束時設為null
runner = null;
int s = state; // 當前狀態為INTERRUPTING或者INTERRUPTED 說明要取消
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);// 如果在中斷進行中,則一直等待
}
}
```
* 執行run方法時,要判斷Future狀態是否正確,必須為NEW;使用CAS將runner對象設置為當前線程,若runner不為null,說明其他線程已經執行了run方法,則直接return;
* 狀態為NEW,執行傳入的業務邏輯,正常結束時,將結果保存到result,ran設置為true;若發生異常,設置result為空,ran為false,并設置異常setException(ex);
* 正常結束,調用set(result);設置結果
* 業務邏輯執行結束,講runner設置為null,若線程在INTERRUPTING或者INTERRUPTED 說明要取消;如果在中斷進行中,則一直等待。
* setException(ex); 業務邏輯異常時調用
```
protected void setException(Throwable t) {
// 若狀態為NEW,將其設置為COMPLETING-完成
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 結果為拋出的異常
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最終狀態為EXCEPTIONAL-異常
finishCompletion();
}
}
```
* set(V v) 業務邏輯正常結束時設置結果
```
protected void set(V v) {
// 若狀態為NEW,將其設置為COMPLETING-完成
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態為NORMAL-正常結束
finishCompletion();
}
}
```
* finishCompletion做了一些收尾性工作,根據waiters鏈表,喚醒等待的線程。
```
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) { // 遍歷鏈表
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 喚醒線程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
```
**get方法**
get時,如果業務邏輯尚未結束,需要使用LockSupport.park(this);將休眠等待的線程,在業務邏輯完成后,finishCompletion()會喚醒線程,之后返回業務邏輯的處理結果。
```
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果狀態為NEW或者COMPLETING,說明還未結束,加入等待鏈表waiters
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s); // 返回結果
}
```
**cancel方法**
cancel方法會取消執行業務邏輯的,主要邏輯如下:
```
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning表示以中斷取消
// 如果狀態為NEW,說明還未執行,無需取消;講狀態設置為打斷或取消
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) { // 以中斷取消
try {
Thread t = runner;
if (t != null)
t.interrupt(); // 執行線程的interrupt方法
} finally { // 中斷完成,修改狀態為INTERRUPTED-已中斷
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion(); // 喚醒等待線程
}
return true;
}
```
通過分析,可以看到,java的FutureTask通過state記錄業務邏輯的執行狀態;多線程時使用CAS防止重復進入;業務邏輯未執行完成時,會將線程加入到waiter鏈表,使用LockSupport.park()阻塞業務線程;業務邏輯執行完畢或發生異常或被取消時,喚醒等待列表的線程。
與我們實現時使用的ReentrantLock在原理上是一樣的,ReentrantLock的lock在獲取不到鎖時,也會維護一個鏈表保存等待列表,釋放鎖時,喚醒等待列表上的線程。區別在與,Java的實現會同時喚醒所有的等待線程,而unlock時等線程表會依次獲得鎖。