<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 4.2.1 Java的Future機制 Future顧名思義,是一個未來完成的異步操作,可以獲得未來返回的值。常用的場景如:調用一個耗時的方法search()(根據產品名稱在全網查詢價格,假設需要3s左右才能返回),該方法會立即返回Future對象,調使用Future.get()可以同步等待耗時方法的返回,也可以調用future的cancel()取消Future任務。如下面的程序,search方法邏輯會根據名字在全網查找價格,假設需要耗時3s,該方法會立即返回一個Future對象供用戶線程使用;在主方法中可以使用get()等待獲取到價格,也可以使用cancel()取消查詢。 ``` public Future<String> search(String prodName) { FutureTask<String> future = new FutureTask<String>(new Callable<String>() { @Override public String call() { try { System.out.println(String.format(" >>search price of %s from internet!",prodName)); Thread.sleep(3000); return "$99.99"; }catch(InterruptedException e){ System.out.println("search function is Interrupted!"); } return null; } }); new Thread(future).start();//交給線程去執行 return future; // 立刻返回future對象 } JavaFuture jf = new JavaFuture(); Future<String> future = jf.search("Netty權威指南");// 返回future System.out.println("Begin search,get future!"); // 測試1-【獲取結果】等待3s后會返回 String prods = future.get();//獲取prods System.out.println("get result:"+prods); // 測試2-【取消任務】1s后取消任務 Thread.sleep(1000); future.cancel(false);//true時會中斷線程,false不會 System.out.println("Future is canceled? " + (future.isCancelled()?"yes":"no")); Thread.sleep(4000); //等待4s檢查一下future所在線程是否還在執行 ``` ## 4.2.2 Future的實現 假如我們需要實現一個Future,考慮一下需要實現哪些功能: ``` Future<String> future = jf.search("Netty權威指南"); Future search(){ //啟動線程或者在線程池中執行業務邏輯 return future; //立刻返回future } ``` * search方法需要立即返回一個Future對象,并且需要啟動一個線程(或線程池)執行業務邏輯; * 由于Future對象可以等待線程執行結束或者取消線程,Future內部需要能夠管理業務邏輯的執行狀態。 * 業務邏輯結束或異常時需要告訴Future對象,有兩種方式:在Future中啟動線程執行業務邏輯;或者業務邏輯單獨執行,通過創建的Future實例的方法如setSuccess(result)方法通知Future。Java的FutureTask采用了第一種方法,其本身繼承了Runnable,在run方法中執行傳入的業務邏輯。而Netty的Promise中采用了第二種方法。 * get()方法中,如果業務邏輯還未執行完畢,需要等待,可以用鎖機制實現。 Java中的Future是一個接口,內部有如下方法: ``` boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。 V get() 如有必要,等待計算完成,然后獲取其結果。 V get(long timeout, TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。 boolean isCancelled() 如果在任務正常完成前將其取消,則返回 true。 boolean isDone() 如果任務已完成,則返回 true。 ``` 下面,我們自己實現一個Future加深理解,下面定義了一個繼承Future的MyFutureTask,初始化時傳遞一個Callable作為業務邏輯,實現Future接口是為了控制業務邏輯線程,實現Runnable接口是為了業務線程執行時能夠修改Future的內部狀態。 ``` public class MyFutureTask<V> implements Future<V>,Runnable { Callable<V> callable; //業務邏輯 boolean running = false ,done = false,cancel = false;// 業務邏輯執行狀態 ReentrantLock lock ;//鎖 V outcome;//結果 public MyFutureTask(Callable<V> callable) { if(callable == null) { throw new NullPointerException("callable cannot be null!"); } this.callable = callable; this.done = false; this.lock = new ReentrantLock(); } @Override public boolean cancel(boolean mayInterruptIfRunning) { callable = null; cancel = true; return true; } @Override public boolean isCancelled() { return cancel; } @Override public boolean isDone() { return done; } @Override public V get() throws InterruptedException, ExecutionException { try { this.lock.lock();//先獲取鎖,獲得后說明業務邏輯已經執行完畢 return outcome; }finally{ this.lock.unlock(); } } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { this.lock.tryLock(timeout, unit); return outcome; }catch (InterruptedException e) { return null; }finally{ this.lock.unlock(); } } @Override public void run() { try { this.lock.lock(); // 啟動線程,先上鎖,防止get時直接返回 running = true; try { outcome = callable.call(); // 業務邏輯 } catch (Exception e) { e.printStackTrace(); } done = true; running = false; }finally { this.lock.unlock(); // 解鎖后get可獲取 } } } ``` 測試程序如下: ``` public Future<String> search(String prodName) { MyFutureTask<String> future = new MyFutureTask<String>(new Callable<String>() { @Override public String call() { try { System.out.println(String.format(" >>search price of %s from internet!",prodName)); Thread.sleep(3000); return "$99.99"; }catch(InterruptedException e){ System.out.println("search function is Interrupted!"); } return null; } }); new Thread(future).start();// 或提交到線程池中 return future; } ``` ## 4.2.3 Java的Future實現 當然,上面是自己實現的FutureTask,Java自帶的FutureTask要比上面的更加復雜和健壯。下面我們進行一些分析。 1. FutureTask內部維護了state,表示運行狀態,只能通過set,setException, 和 cancel來修改。 ``` private static final int NEW = 0; //初始狀態, private static final int COMPLETING = 1; // 業務邏輯已經結束 private static final int NORMAL = 2; // 正常結束 private static final int EXCEPTIONAL = 3; // 異常結束 private static final int CANCELLED = 4; // 已經取消 private static final int INTERRUPTING = 5; // 中斷中 private static final int INTERRUPTED = 6; // 已經中斷 ``` 2. private volatile WaitNode waiters; 維護了等待的線程,get()方法時,如果業務邏輯還未執行完畢,則創建WaitNode q,將其q.next設置為waiters,waiters設置為q;這樣組成了一個等待鏈表。在業務邏輯執行完畢(正常或異常結束)時, **run方法** run方法用來執行業務邏輯,在此過程中需要維護好業務邏輯的運行狀態 ``` public void run() { // 1. 如果state不為初始狀態或者runner不為null,說明已經在運行了,直接返回 // 如果為空,使用CAS將runner設置為當前線程,防止并發進入 //runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner")); if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { // 2.業務邏輯不為空并且state為NEW時才運行 V result; boolean ran; try { result = c.call(); // 3. 執行業務邏輯 ran = true; // ran為true表示正常返回 } catch (Throwable ex) { result = null; // 發生異常,結果為null ran = false; // 非正常結束 setException(ex); // 設置異常 } if (ran) set(result); // 正常結束,設置結果 } } finally { // 為例防止并發調用run()方法,進入run時使用cas將runner設置為非空,結束時設為null runner = null; int s = state; // 當前狀態為INTERRUPTING或者INTERRUPTED 說明要取消 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);// 如果在中斷進行中,則一直等待 } } ``` * 執行run方法時,要判斷Future狀態是否正確,必須為NEW;使用CAS將runner對象設置為當前線程,若runner不為null,說明其他線程已經執行了run方法,則直接return; * 狀態為NEW,執行傳入的業務邏輯,正常結束時,將結果保存到result,ran設置為true;若發生異常,設置result為空,ran為false,并設置異常setException(ex); * 正常結束,調用set(result);設置結果 * 業務邏輯執行結束,講runner設置為null,若線程在INTERRUPTING或者INTERRUPTED 說明要取消;如果在中斷進行中,則一直等待。 * setException(ex); 業務邏輯異常時調用 ``` protected void setException(Throwable t) { // 若狀態為NEW,將其設置為COMPLETING-完成 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 結果為拋出的異常 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最終狀態為EXCEPTIONAL-異常 finishCompletion(); } } ``` * set(V v) 業務邏輯正常結束時設置結果 ``` protected void set(V v) { // 若狀態為NEW,將其設置為COMPLETING-完成 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態為NORMAL-正常結束 finishCompletion(); } } ``` * finishCompletion做了一些收尾性工作,根據waiters鏈表,喚醒等待的線程。 ``` private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // 遍歷鏈表 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 喚醒線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } ``` **get方法** get時,如果業務邏輯尚未結束,需要使用LockSupport.park(this);將休眠等待的線程,在業務邏輯完成后,finishCompletion()會喚醒線程,之后返回業務邏輯的處理結果。 ``` public V get() throws InterruptedException, ExecutionException { int s = state; // 如果狀態為NEW或者COMPLETING,說明還未結束,加入等待鏈表waiters if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 返回結果 } ``` **cancel方法** cancel方法會取消執行業務邏輯的,主要邏輯如下: ``` public boolean cancel(boolean mayInterruptIfRunning) { // mayInterruptIfRunning表示以中斷取消 // 如果狀態為NEW,說明還未執行,無需取消;講狀態設置為打斷或取消 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { // 以中斷取消 try { Thread t = runner; if (t != null) t.interrupt(); // 執行線程的interrupt方法 } finally { // 中斷完成,修改狀態為INTERRUPTED-已中斷 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); // 喚醒等待線程 } return true; } ``` 通過分析,可以看到,java的FutureTask通過state記錄業務邏輯的執行狀態;多線程時使用CAS防止重復進入;業務邏輯未執行完成時,會將線程加入到waiter鏈表,使用LockSupport.park()阻塞業務線程;業務邏輯執行完畢或發生異常或被取消時,喚醒等待列表的線程。 與我們實現時使用的ReentrantLock在原理上是一樣的,ReentrantLock的lock在獲取不到鎖時,也會維護一個鏈表保存等待列表,釋放鎖時,喚醒等待列表上的線程。區別在與,Java的實現會同時喚醒所有的等待線程,而unlock時等線程表會依次獲得鎖。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看