<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ## 4.3.1 Netty的Future Netty的Future在concurrent包的Future基礎上,增加了更多的功能。在Java的Future中,主要是任務的運行/取消,而Netty的Future增加了更多的功能。 ``` public interface Future<V> extends java.util.concurrent.Future<V> boolean isSuccess(); 只有IO操作完成時才返回true boolean isCancellable(); 只有當cancel(boolean)成功取消時才返回true Throwable cause(); IO操作發生異常時,返回導致IO操作以此的原因,如果沒有異常,返回null // 向Future添加事件,future完成時,會執行這些事件,如果add時future已經完成,會立即執行監聽事件 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // 移除監聽事件,future完成時,不會觸發 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> sync() throws InterruptedException; //等待future done Future<V> syncUninterruptibly(); // 等待future done,不可打斷 Future<V> await() throws InterruptedException; // 等待future完成 Future<V> awaitUninterruptibly(); // 等待future 完成,不可打斷 V getNow(); // 立刻獲得結果,如果沒有完成,返回null boolean cancel(boolean mayInterruptIfRunning); // 如果成功取消,future會失敗,導致CancellationException ``` Netty為Future加入的功能主要是添加/刪除監聽事件,在Promise中會有實例演示。其他的方法是為get()方法服務的,get()方法可以通過調用await/getNow等方法實現。 ## 4.3.2 Netty的Promise機制 Netty的Future與Java自帶到Future略有不同,其引入了Promise機制。在Java的Future中,業務邏輯為一個Callable或Runnable實現類,該類的call()或run()執行完畢意味著業務邏輯的完結;而在Promise機制中,可以在業務邏輯中人工設置業務邏輯的成功與失敗。 Netty中Promise接口的定義如下: ``` public interface Promise<V> extends Future<V> { // 設置future執行結果為成功 Promise<V> setSuccess(V result); // 嘗試設置future執行結果為成功,返回是否設置成功 boolean trySuccess(V result); // 設置失敗 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 設置為不能取消 boolean setUncancellable(); //一下省略了覆蓋Future的一些方法 } ``` 下面以一個例子來說明Promise的使用方法,還是以seach()查詢產品報價為例: ``` // main 方法 NettyFuture4Promise test = new NettyFuture4Promise(); Promise<String> promise = test.search("Netty In Action"); String result = promise.get(); System.out.println("price is " + result); // private Promise<String> search(String prod) { NioEventLoopGroup loop = new NioEventLoopGroup(); // 創建一個DefaultPromise并返回 DefaultPromise<String> promise = new DefaultPromise<String>(loop.next()); loop.schedule(new Runnable() { @Override public void run() { try { System.out.println(String.format(" >>search price of %s from internet!",prod)); Thread.sleep(5000); promise.setSuccess("$99.99");// 等待5S后設置future為成功, // promise.setFailure(new NullPointerException()); //當然,也可以設置失敗 } catch (InterruptedException e) { e.printStackTrace(); } } },0,TimeUnit.SECONDS); return promise; } ``` 可以看到,Promise能夠在業務邏輯線程中通知Future成功或失敗,由于Promise繼承了Netty的Future,因此可以加入監聽事件。 ``` // main方法中,查詢結束后獲取promise,加入兩個監聽事件,分別給小Hong發通知和Email Promise<String> promise = test.search("Netty In Action"); promise.addListener(new GenericFutureListener<Future<? super String>>() { @Override public void operationComplete(Future<? super String> future) throws Exception { System.out.println("Listener 1, make a notifice to Hong,price is " + future.get()); } }); promise.addListener(new GenericFutureListener<Future<? super String>>() { @Override public void operationComplete(Future<? super String> future) throws Exception { System.out.println("Listener 2, send a email to Hong,price is " + future.get()); } }); ``` Future和Promise的好處在于,獲取到Promise對象后可以為其設置異步調用完成后的操作,然后立即繼續去做其他任務。 ## 4.3.3 Netty常用的Promise類 Netty常用的純Future機制的類,有SucceededFuture和FailedFuture,他們不需要設置業務邏輯代碼,會立刻完成,只需要設置成功后的返回和拋出的異常。 Netty的常用Promise類有DefalutPromise類,這是Promise實現的基礎,后續會對這個類的實現進行解讀;DefaultChannelPromise是DefalutPromise的子類,加入了channel這個屬性。 下面對DefaultChannelPromise進行分析,其類圖如下: ![NettyFuture類圖](http://www.uxiaowo.com/netty/Future/Future.png) ### DefaultPromise的使用 Netty中涉及到異步操做的地方都使用了promise,例如,下面是服務器/客戶端啟動時的注冊任務,最終會調用unsafe的register,調用過程中會傳入一個promise,unsafe進行事件的注冊時調用promise可以設置成功/失敗。 ``` // SingleThreadEventLoop.java public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } // AbstractChannel.AbstractUnsafe public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } ...... } ``` ### DefaultPromise的實現 DefaultChannelPromise提供的功能可以分為兩個部分:一方面是為調用者提供get()和addListener()用于獲取Future任務執行結果和添加監聽事件;另一方面是為業務處理任務提供setSuccess()等方法設置任務的成功或失敗。 **get方法** DefaultPromise的get方法有兩個,無參數的get會阻塞等待;有參數的get會等待指定事件,若未結束拋出超時異常。這兩個get()是在其父類AbstractFuture中實現的,通過調用下面四個方法實現: ``` await(); // 等待Future任務結束 await(timeout, unit) // 等待Future任務結束,超過事件則拋出異常 cause(); // 返回Future任務的異常 getNow() // /返回Future任務的執行結果 // 先等待,如果有異常則拋出,無異常返回getNow() public V get() throws InterruptedException, ExecutionException { await(); Throwable cause = cause(); if (cause == null) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } ``` **await** await()方法判斷Future任務是否結束,之后獲取this鎖,如果任務未完成,則調用Object的wait()等待 ``` public Promise<V> await() throws InterruptedException { // 判斷Future任務是否結束,內部根據result是否為null判斷,setSuccess或setFailure時會通過CAS修改result if (isDone()) { return this; } if (Thread.interrupted()) { // 線程是否被中斷 throw new InterruptedException(toString()); } checkDeadLock(); // 檢查當前線程是否與線程池運行的線程是一個 synchronized (this) { while (!isDone()) { incWaiters(); // waiters計數加1 try { wait(); // Object的方法,讓出cpu,加入等待隊列 } finally { decWaiters(); // waiters計數減1 } } } return this; } ``` await(long timeout, TimeUnit unit)與awite類似,只是調用了Object對象的wait(long timeout, int nanos)方法 awaitUninterruptibly()方法在內部catch住了等待線程的中斷異常,因此不會拋出中斷異常。 #### 監聽事件相關方法 **add/remove方法** addListener方法被調用時,將傳入的回調類傳入到listeners對象中,如果監聽多于1個,會創建DefaultFutureListeners對象將回調方法保存在一個數組中。removeListener會將listeners設置為null(只有一個時)或從數組中移除(多個回調時)。 ``` private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener); } } private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { listeners = null; } } ``` **notifyListeners()** 在添加監聽器的過程中,如果任務剛好執行完畢done(),則立即觸發監聽事件。觸發監聽通過notifyListeners()實現。主要邏輯為:如果當前addListener的線程(準確來說應該是調用notifyListeners的線程,因為addListener和setSuccess都會調用notifyListeners()和Promise內的線程池當前執行的線程是同一個線程,則放在線程池中執行,否則提交到線程池去執行;例如,main線程中調用addListener時任務完成,notifyListeners()執行回調,會提交到線程池中執行;而如果是執行Future任務的線程池中setSuccess()時調用notifyListeners(),會放在當前線程中執行。 內部維護了notifyingListeners用來記錄是否已經觸發過監聽事件,只有未觸發過且監聽列表不為空,才會依次便利并調用operationComplete ``` private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } ``` #### setSuccess()方法 Future任務在執行完成后調用setSuccess()或setFailure()通知Future執行結果;主要邏輯是:修改result的值,若有等待線程則喚醒,通知監聽事件。 ``` if (setSuccess0(result)) { // 設置成功后喚醒等待線程 notifyListeners(); // 通知 return this; } // 通知成功時將結果保存在變量result,通知失敗時,使用CauseHolder包裝Throwable賦值給result // RESULT_UPDATER 是一個使用CAS更新內部屬性result的類, // 如果result為null或UNCANCELLABLE,更新為成功/失敗結果;UNCANCELLABLE是不可取消狀態 private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters();// 調用Object的notifyAll();通知等待線程 return true; } return false; } ``` #### cancel()方法 cancel用來取消任務,根據result判斷,如果可以取消,則喚醒等待線程,通知監聽事件。 ``` public boolean cancel(boolean mayInterruptIfRunning) { //如果result為null,說明未setUncancellable()/setSuccess/setFailure if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { checkNotifyWaiters(); // 喚醒等待線程 notifyListeners(); // 觸發監聽事件 return true; } return false; } ``` 通過上面的分析,我們可以看到DefaultPromise內部通過result記錄Future任務的執行狀態: ``` null - 未完成 CANCELLATION_CAUSE_HOLDER -被取消 UNCANCELLABLE - 不可取消 業務處理調用setSuccess時傳入的結果 業務處理調用setFailure時包裝Throws的CauseHolder ``` DefaultPromise內部維護了一個監聽列表保存監聽事件,在任務完成或取消時通知監聽事件(提交到線程池中執行);任務的等待與喚醒通過Object的wait()和notifyAll()完成 ### DefaultChannelPromise實現 DefaultChannelPromise是DefaultPromise的子類,內部維護了一個通道變量Channel channel;Promise機制相關的方法都是調用父類方法。 除此之外,還實現了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,我們可以將ChannelFuture注冊到ChannelFlushPromiseNotifier類,當有數據寫入或到達checkpoint時使用。 ``` interface FlushCheckpoint { long flushCheckpoint(); void flushCheckpoint(long checkpoint); ChannelPromise promise(); } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看