<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## 32 請按到場順序發言—Completion Service詳解 > 時間像海綿里的水,只要你愿意擠,總還是有的。 > ——魯迅 講解 CompletionService 之前,我們先回憶一下 ExcutorSevice。ExcutorService 實現了通過線程池來并發執行任務。其中有一種方式是通過線程池執行 Callable 任務,然后通過 Future 獲取異步執行的結果,如下面的代碼: ~~~java public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5); Callable callable1 = () -> { Thread.sleep(10000); return "任務1完成"; }; Callable callable2 = () -> { Thread.sleep(5000); return "任務2完成"; }; Future future1 = executor.submit(callable1); Future future2 = executor.submit(callable2); System.out.println(future1.get()); System.out.println(future2.get()); } ~~~ 任務一執行需要 10 秒,任務二執行只需要 5 秒。但是當執行到 future1.get () 時,主線程會被阻塞。等待 10 秒后第一個任務執行完才會去獲取第二個任務。然后執行和第二個任務相關的打印操作。大家有沒有看出問題?任務 2 明明在 5 秒前就已經執行完成,卻不能立刻打印。主線程阻塞在任務一結果的獲取上。這樣程序執行的效率并不高。如果任務完成后能夠立刻被取得執行結果,然后執行后面的邏輯,效率就會有顯著的提升。今天我們要講解的 CompletionService 就是用來做這個事情的。CompletionService 可以按照執行完成結果的到場順序,被主線程獲取到,從而繼續執行后面邏輯。 ## 1、了解 CompletionService 了解一個類最好、最快的方法就是閱讀源代碼的注解。而大多數人通常的做法卻是去百度或者 google。這樣有兩個弊端,一是效率并不一定高,可能搜出來很多無用的內容。二是看到的文章并不權威,甚至可能是錯的。有的同學可能覺得英文閱讀費勁,其實作為開發人員,英語閱讀已經是必備技能。這就如同你要熟知 IDE 的快捷鍵一樣,所以如果覺得英文閱讀困難,可以刻意練習。其實多讀一些技術文檔,會發現用詞基本都是類似的。 扯的有點遠,我們收回來,先看看源代碼中對 CompletionService 的解釋: 對異步任務執行和執行結果消費解藕。生產者提交任務執行。消費者則獲取完成的任務,然后按照完成任務的順序對任務結果進行處理。 官方的解釋是不是十分簡潔明了? # 2、使用 CompletionService 下面我們使用 CompletionService 實現一個吃蘋果的程序。首先我聲明一個流,里面是一些水果,每個水果會對應一個洗干凈的任務。然后主線程拿到洗干凈的水果再一個個吃掉。代碼如下: ~~~java public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService pool = Executors.newFixedThreadPool(5); CompletionService<String> service = new ExecutorCompletionService<String>(pool); Stream.of("蘋果", "梨", "葡萄", "桃") .forEach(fruit -> service.submit(() -> { if(fruit.equals("蘋果")){ TimeUnit.SECONDS.sleep(6); }else if(fruit.equals("梨")){ TimeUnit.SECONDS.sleep(1); }else if(fruit.equals("葡萄")){ TimeUnit.SECONDS.sleep(10); }else if(fruit.equals("桃")){ TimeUnit.SECONDS.sleep(3); } return "洗干凈的"+fruit; }) ); String result; while((result=service.take().get())!=null){ System.out.println("吃掉"+result); } } ~~~ 可以看到有四種水果。會為每個水果啟一個洗水果的任務。每種水果洗的時間不同,其中葡萄最不好洗要 10 秒,而梨最好洗,只需要 1 秒。等待水果洗好后,主線程通過 service.take () 取得執行完成的 Future,然后從里面 get 出返回值,把洗干凈的水果吃掉。 我們可以看到輸出如下: ~~~ 吃掉洗干凈的梨 吃掉洗干凈的桃 吃掉洗干凈的蘋果 吃掉洗干凈的葡萄 ~~~ 可以看到哪個水果先洗干凈就會先被吃掉。這也證明了 service.take () 的順序是任務的完成順序,而不是任務提交的順序。 通過 CompletionService 我們就可以一端生產,另一端按照完成的順序進行消費。這避免提交大量任務時,不知道哪個任務先完成,從而在調用 Future 的 get 方法時產生阻塞。使用 CompletionService,永遠都是完成一個返回一個,然后消費一個。這樣你的程序才更為高效。 主線程收到返回后,可以再繼續使用 CompletionService 來異步執行下一步的邏輯,這和非阻塞的編程方式異曲同工。 ## 3、CompletionService 源碼分析 ### 3.1 CompletionService 構造方法 我們先看如何初始化 CompletionService: ~~~java ExecutorService pool = Executors.newFixedThreadPool(5); CompletionService<String> service = new ExecutorCompletionService<String>(pool); ~~~ 首先初始化 ExecutorService,在構造 ExecutorCompletionService 時作為參數傳入。其實 CompletionService 對任務的執行其實就是借助于 ExecutorService 來完成的。接下來我們進入它的構造函數: ~~~java public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } ~~~ 構造函數中構造了三個屬性: ~~~java private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; ~~~ executor 就是你傳入的 ExecutorService,用來執行任務。 aes 的作用是創建新的 task。它的初始化過程比較有意思,判斷了是否為 AbstractExecutorService 的實例。至于為什么這么做,我們后面再詳細講解。 completionQueue 是一個存放 Future 的阻塞隊列,并且是無界的。這意味著如果源頭不斷的產生 Future,但是沒有去消費,就會造成內存泄漏。 executor 執行完成的 Future 會被放入 completionQueue 中,take 方法將會從 completionQueue 中取得最新的 future 對象(最近執行完的 task 的結果)。 ### 3.2 CompletionService 的 submit 方法 ~~~java public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } ~~~ 首先將 Callable 類型的 task 轉為 RunnableFuture 類型。RunnableFuture 是個接口,FutureTask 是其一種實現。 然后通過 new QueueingFuture (f),再將 RunnableFuture 包裝為 QueueingFuture 類型的對象。QueueingFuture 的作用就是在 Future 完成時,加入到 completionQueue 中。 我們先看 newTaskFor 的源碼: ~~~java private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } ~~~ 如果 aes 為空,那么直接 new FutureTask。如果不為空則調用 aes 的 newTaskFor 方法。什么情況 aes 會為空呢?我們再看下 aes 初始化的代碼: ~~~java this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; ~~~ 當傳入的 executor 為 AbstractExecutorService 類型時,那么 aes 不為空。否則 aes 為空。這兩處邏輯處理是相關聯的,這么做的原因如下: 1、如果 executor 是 AbstractExecutorService 的子類,有可能會重寫 newTaskFor 方法,所以這里優先使用 executo r 的方法來創建 Task,這樣后面通過 executor 執行 task 才能正確。比如 ForkJoinPool 就對 newTaskFor 方法進行了重寫; 2、如果 executor 不是繼承自 AbstractExecutorService。那么它可能并沒有 newTaskFor 方法。所以需要 CompletionService 自己來創建 FutureTask。 這樣看來 aes 的存在,只是為了盡量使用 executor 提供的 newTaskFor 方法來創建 task,以使后面 excute 方法能夠正常運行。 接下來我們分析 QueueingFuture 方法: ~~~java private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } ~~~ QueueingFuture 是內部靜態類,并且是 FutureTask 的子類。他只是重寫了 done 方法。大家回憶上一節對 Future 的分析,應該還記得 done 方法在任務執行結果返回后被調用,但是留給子類來實。這里就用上了這個特性。done 方法里面做的就是把 task 加入阻塞隊列中。這意味著,先完成的 task 會先把自己的 Future 放入隊列中。那么當然也會被 take 方法先取到。而由于是阻塞隊列,所以 take 方法取不到 task 時,就會阻塞。但由于能被 take 到的 task 肯定已經有了返回值,所以調用 task 的 get 方法時就不會再次阻塞了。也就是說 client 代碼中的下面一行只會在 take 時發生阻塞: ~~~java while((result=service.take().get())!=null){ System.out.println("吃掉"+result); } ~~~ executor 執行任務的代碼就不用再次分析了,這在之前學習 Executor 的時候已經詳細分析過了。submit 方法分析完后我們再來看看 take 方法。 ### 3.3 CompletionService 的 take 方法 相比較 submit 方法,take 方法就更為簡單了,如下: ~~~java public Future<V> take() throws InterruptedException { return completionQueue.take(); } ~~~ 只有一行代碼,就是從 completionQueue 中取得 Futrue 對象。由于 completionQueue 是阻塞隊列,當沒有 Future 時,就會阻塞在此。而 completionQueue 中保存 Future 的順序是完成順序。 ## 4、 總結 CompletionService 給我們提供了一種非阻塞的異步執行方式。讓程序更為高效。他的實現非常的簡單和巧妙,值得我們借鑒。其實我們學習到這里,不知道你是否有這種體會,這些工具實際上就是我們之前學習內容的組合運用,如果前面你掌握的很牢固,學習起來一點也不費勁。如果前面就似懂非懂,那么就會越看越糊涂。其實我們在學習上至少有一半的時間都是在打基礎,但這個過程必不可少,并且受益更為深遠。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看