## 4.3.1 Netty的Future
Netty的Future在concurrent包的Future基礎上,增加了更多的功能。在Java的Future中,主要是任務的運行/取消,而Netty的Future增加了更多的功能。
```
public interface Future<V> extends java.util.concurrent.Future<V>
boolean isSuccess(); 只有IO操作完成時才返回true
boolean isCancellable(); 只有當cancel(boolean)成功取消時才返回true
Throwable cause(); IO操作發生異常時,返回導致IO操作以此的原因,如果沒有異常,返回null
// 向Future添加事件,future完成時,會執行這些事件,如果add時future已經完成,會立即執行監聽事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 移除監聽事件,future完成時,不會觸發
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException; //等待future done
Future<V> syncUninterruptibly(); // 等待future done,不可打斷
Future<V> await() throws InterruptedException; // 等待future完成
Future<V> awaitUninterruptibly(); // 等待future 完成,不可打斷
V getNow(); // 立刻獲得結果,如果沒有完成,返回null
boolean cancel(boolean mayInterruptIfRunning); // 如果成功取消,future會失敗,導致CancellationException
```
Netty為Future加入的功能主要是添加/刪除監聽事件,在Promise中會有實例演示。其他的方法是為get()方法服務的,get()方法可以通過調用await/getNow等方法實現。
## 4.3.2 Netty的Promise機制
Netty的Future與Java自帶到Future略有不同,其引入了Promise機制。在Java的Future中,業務邏輯為一個Callable或Runnable實現類,該類的call()或run()執行完畢意味著業務邏輯的完結;而在Promise機制中,可以在業務邏輯中人工設置業務邏輯的成功與失敗。
Netty中Promise接口的定義如下:
```
public interface Promise<V> extends Future<V> {
// 設置future執行結果為成功
Promise<V> setSuccess(V result);
// 嘗試設置future執行結果為成功,返回是否設置成功
boolean trySuccess(V result);
// 設置失敗
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 設置為不能取消
boolean setUncancellable();
//一下省略了覆蓋Future的一些方法
}
```
下面以一個例子來說明Promise的使用方法,還是以seach()查詢產品報價為例:
```
// main 方法
NettyFuture4Promise test = new NettyFuture4Promise();
Promise<String> promise = test.search("Netty In Action");
String result = promise.get();
System.out.println("price is " + result);
//
private Promise<String> search(String prod) {
NioEventLoopGroup loop = new NioEventLoopGroup();
// 創建一個DefaultPromise并返回
DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
loop.schedule(new Runnable() {
@Override
public void run() {
try {
System.out.println(String.format(" >>search price of %s from internet!",prod));
Thread.sleep(5000);
promise.setSuccess("$99.99");// 等待5S后設置future為成功,
// promise.setFailure(new NullPointerException()); //當然,也可以設置失敗
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,TimeUnit.SECONDS);
return promise;
}
```
可以看到,Promise能夠在業務邏輯線程中通知Future成功或失敗,由于Promise繼承了Netty的Future,因此可以加入監聽事件。
```
// main方法中,查詢結束后獲取promise,加入兩個監聽事件,分別給小Hong發通知和Email
Promise<String> promise = test.search("Netty In Action");
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println("Listener 1, make a notifice to Hong,price is " + future.get());
}
});
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println("Listener 2, send a email to Hong,price is " + future.get());
}
});
```
Future和Promise的好處在于,獲取到Promise對象后可以為其設置異步調用完成后的操作,然后立即繼續去做其他任務。
## 4.3.3 Netty常用的Promise類
Netty常用的純Future機制的類,有SucceededFuture和FailedFuture,他們不需要設置業務邏輯代碼,會立刻完成,只需要設置成功后的返回和拋出的異常。
Netty的常用Promise類有DefalutPromise類,這是Promise實現的基礎,后續會對這個類的實現進行解讀;DefaultChannelPromise是DefalutPromise的子類,加入了channel這個屬性。
下面對DefaultChannelPromise進行分析,其類圖如下:

### DefaultPromise的使用
Netty中涉及到異步操做的地方都使用了promise,例如,下面是服務器/客戶端啟動時的注冊任務,最終會調用unsafe的register,調用過程中會傳入一個promise,unsafe進行事件的注冊時調用promise可以設置成功/失敗。
```
// SingleThreadEventLoop.java
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
......
}
```
### DefaultPromise的實現
DefaultChannelPromise提供的功能可以分為兩個部分:一方面是為調用者提供get()和addListener()用于獲取Future任務執行結果和添加監聽事件;另一方面是為業務處理任務提供setSuccess()等方法設置任務的成功或失敗。
**get方法**
DefaultPromise的get方法有兩個,無參數的get會阻塞等待;有參數的get會等待指定事件,若未結束拋出超時異常。這兩個get()是在其父類AbstractFuture中實現的,通過調用下面四個方法實現:
```
await(); // 等待Future任務結束
await(timeout, unit) // 等待Future任務結束,超過事件則拋出異常
cause(); // 返回Future任務的異常
getNow() // /返回Future任務的執行結果
// 先等待,如果有異常則拋出,無異常返回getNow()
public V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
```
**await**
await()方法判斷Future任務是否結束,之后獲取this鎖,如果任務未完成,則調用Object的wait()等待
```
public Promise<V> await() throws InterruptedException {
// 判斷Future任務是否結束,內部根據result是否為null判斷,setSuccess或setFailure時會通過CAS修改result
if (isDone()) {
return this;
}
if (Thread.interrupted()) { // 線程是否被中斷
throw new InterruptedException(toString());
}
checkDeadLock(); // 檢查當前線程是否與線程池運行的線程是一個
synchronized (this) {
while (!isDone()) {
incWaiters(); // waiters計數加1
try {
wait(); // Object的方法,讓出cpu,加入等待隊列
} finally {
decWaiters(); // waiters計數減1
}
}
}
return this;
}
```
await(long timeout, TimeUnit unit)與awite類似,只是調用了Object對象的wait(long timeout, int nanos)方法
awaitUninterruptibly()方法在內部catch住了等待線程的中斷異常,因此不會拋出中斷異常。
#### 監聽事件相關方法
**add/remove方法**
addListener方法被調用時,將傳入的回調類傳入到listeners對象中,如果監聽多于1個,會創建DefaultFutureListeners對象將回調方法保存在一個數組中。removeListener會將listeners設置為null(只有一個時)或從數組中移除(多個回調時)。
```
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}
```
**notifyListeners()**
在添加監聽器的過程中,如果任務剛好執行完畢done(),則立即觸發監聽事件。觸發監聽通過notifyListeners()實現。主要邏輯為:如果當前addListener的線程(準確來說應該是調用notifyListeners的線程,因為addListener和setSuccess都會調用notifyListeners()和Promise內的線程池當前執行的線程是同一個線程,則放在線程池中執行,否則提交到線程池去執行;例如,main線程中調用addListener時任務完成,notifyListeners()執行回調,會提交到線程池中執行;而如果是執行Future任務的線程池中setSuccess()時調用notifyListeners(),會放在當前線程中執行。
內部維護了notifyingListeners用來記錄是否已經觸發過監聽事件,只有未觸發過且監聽列表不為空,才會依次便利并調用operationComplete
```
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
```
#### setSuccess()方法
Future任務在執行完成后調用setSuccess()或setFailure()通知Future執行結果;主要邏輯是:修改result的值,若有等待線程則喚醒,通知監聽事件。
```
if (setSuccess0(result)) { // 設置成功后喚醒等待線程
notifyListeners(); // 通知
return this;
}
// 通知成功時將結果保存在變量result,通知失敗時,使用CauseHolder包裝Throwable賦值給result
// RESULT_UPDATER 是一個使用CAS更新內部屬性result的類,
// 如果result為null或UNCANCELLABLE,更新為成功/失敗結果;UNCANCELLABLE是不可取消狀態
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();// 調用Object的notifyAll();通知等待線程
return true;
}
return false;
}
```
#### cancel()方法
cancel用來取消任務,根據result判斷,如果可以取消,則喚醒等待線程,通知監聽事件。
```
public boolean cancel(boolean mayInterruptIfRunning) {
//如果result為null,說明未setUncancellable()/setSuccess/setFailure
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
checkNotifyWaiters(); // 喚醒等待線程
notifyListeners(); // 觸發監聽事件
return true;
}
return false;
}
```
通過上面的分析,我們可以看到DefaultPromise內部通過result記錄Future任務的執行狀態:
```
null - 未完成
CANCELLATION_CAUSE_HOLDER -被取消
UNCANCELLABLE - 不可取消
業務處理調用setSuccess時傳入的結果
業務處理調用setFailure時包裝Throws的CauseHolder
```
DefaultPromise內部維護了一個監聽列表保存監聽事件,在任務完成或取消時通知監聽事件(提交到線程池中執行);任務的等待與喚醒通過Object的wait()和notifyAll()完成
### DefaultChannelPromise實現
DefaultChannelPromise是DefaultPromise的子類,內部維護了一個通道變量Channel channel;Promise機制相關的方法都是調用父類方法。
除此之外,還實現了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,我們可以將ChannelFuture注冊到ChannelFlushPromiseNotifier類,當有數據寫入或到達checkpoint時使用。
```
interface FlushCheckpoint {
long flushCheckpoint();
void flushCheckpoint(long checkpoint);
ChannelPromise promise();
}
```