<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 35拆分你的任務—學習使用Fork/Join框架 > 讀書給人以快樂、給人以光彩、給人以才干。 > ——培根 本節我們學習 Excutor 的另外一種實現 ForkJoinPool。顧名思義,ForkJoinPool 的核心功能有兩個。第一個是 Fork,拆解你的任務。第二個是 Join,合并任務的執行結果。這個場景很常見,比如我們要處理一批數據,由于數據間沒有依賴性,那么我們可以把這一批數據拆解為更小的批次,多線程并行處理。最后再合并處理的結果。 Fork/Join 的核心思想就是分而治之。 ## 1、ForkJoinPool 介紹 ForkJoinPool 自 Java 7 引入。它和 ThreadPoolExecutor 都繼承自 AbstractExecutorService,實現了 ExecutorService 和 Executor 接口。ForkJoinPool 用來把大任務切分為小任務,如果切分完小任務還不夠小(由你設置的閾值決定),那么就繼續向下切分。經過切分后,最后的任務是金字塔形狀,計算完成后向上匯總。如下圖: ![圖片描述](https://img.mukewang.com/5e01ac0f0001bcc116000994.jpg) ForkJoinPool 處理任務的核心思想可以用如下偽代碼表示: ~~~java Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } } ~~~ 如果一個任務足夠小,那么執行任務邏輯。如果不夠小,拆分為兩個獨立的子任務。子任務執行后, 取得兩個子任務的執行結果進行合并。 ForkJoinPool 通過 submit 執行 ForkJoinTask 類型的任務。ForkJoinTask 是抽象類,有著不同的子類實現。比較常用的是如下兩種: 1、RecursiveAction,沒有返回值; 2、RecurisiveTask,有返回值。 此外 submit 方法還可以執行 Callable 和 Runnable 的接口實現。 ForkJoinTask 就是我們為代碼中的 problem。我來舉個例子看具體如何使用。假如讓你計算 1-10000 的和, 我們可以把任務拆解為 100 個,每個任務計算 100 個數字之和。代碼如下. Task 代碼; ~~~java public class Task extends RecursiveTask<Integer> { private static final int THRESHOLD = 100; private int from; private int to; public Task(int from, int to) { super(); this.from = from; this.to = to; } @Override protected Integer compute() { if (THRESHOLD > (to - from)) { return IntStream.range(from, to + 1) .reduce((a, b) -> a + b) .getAsInt(); } else { int forkNumber = (from + to) / 2; Task left = new Task(from, forkNumber); Task right = new Task(forkNumber + 1, to); left.fork(); right.fork(); return left.join() + right.join(); } } } ~~~ Task 繼承自 RecursiveTask。遞歸任務的大小力度為 100。重寫的 compute 方法和文章開頭的偽代碼 solve 是一樣的思路。先判斷任務的大小是否在 THRESHOLD 之內。如果已經拆解到 THRESHOLD 內,那么進行計算。如果任務拆分還沒達到 THRESHOLD,那么繼續拆解任務。fork 操作會把當前任務放入線程池中來執行。最后再通過 join 取得執行結果做合并。 Client 代碼: ~~~java public class Client { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); ForkJoinTask<Integer> result = forkJoinPool.submit(new Task(1, 10000)); System.out.println("計算結果為"+result.get()); forkJoinPool.shutdown(); } } ~~~ 我們首先通過靜態方法 commonPool 聲明一個 ForkJoinPool。commonPool 創建的 ForkJoinPool 滿足絕大多數的應用場景。然后通過 submit 方法提交我們的 Task,計算 1-10000 的和。提交 Task 后,Task 中的 compute 方法最終會被調用,通過對任務的拆解,以及對任務計算結果的合并,匯總到此處的 Task 中。通過 Task 的 get 方法獲取計算結果。最后關閉線程池。 執行結果如下: ~~~ 計算結果為50005000 ~~~ ## 2、ForkJoinPool 原理介紹 ForkJoinPool 中的每個線程都維護自己的工作隊列。這是一個雙端隊列,既可以先進先出,也可以先進后出。簡單來說就是隊列兩端都可以做出隊操作。當每個線程產生新的任務時(比如說調用了 fork 操作),會被加入到隊尾。線程工作的時候會從自己維護的工作隊列的 top 做出隊操作(LIFO),取得任務來執行。線程還會去其它線程任務隊列竊取任務,此時是從其它隊列的 base 取得任務(FIFO)。如下圖所示: ![圖片描述](https://img.mukewang.com/5e01abfb0001f1d315260880.jpg) 下面簡單介紹幾個常用方法: 1、fork 方法中會判斷如果當前線程不是 ForkJoinWorkerThread,則把任務加入 submission queue。否則加入自己的工作隊列中。submission queue 沒有關聯的線程,是所有線程都可以執行的任務隊列。fork 代碼如下: ~~~java public final ForkJoinTask<V> fork() { Thread t; //判斷本線程是否為ForkJoinWorkerThread,是的話,加入到自己的workQueue中,否則調用externalPush if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } ~~~ 2、join 方法中,自己任務沒有執行完,則取的自己任務隊列中的任務執行。如果發現自己的任務已經沒有了,則會去竊取其它線程的任務來執行。Join 代碼如下: ~~~java public final V join() { int s; //取得doJoin后的狀態,位運算后判斷是否正常,不正常的話拋出異常。正常的話返回計算結果 if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } ~~~ 主要邏輯在 doJoin 中,代碼如下: ~~~java private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } ~~~ 如果當前線程不是 ForkJoinWorkerThread,則調用 externalAwaitDone。如果是 ForkJoinWorkerThread 那么先通過 tryUnpush 從自己的 workQueue 的 top 位置取得當前 task,然后調用 doExec 執行。這兩步成功的話返回執行結果 s,否則調用 awaitJoin。這個方法中判斷本任務是否執行完成,完成直接返回,否則會嘗試竊執行取別的線程的任務。 3、submit 方法中,會把任務 push 到 submission queue。 ForkJoinPool 通過任務竊取,使得任務的執行更為高效。 ## 3、總結 ForkJoinPool 為我們拆分大任務再匯總小任務計算結果提供了很好的支持。它很適合執行計算密集型的任務。但是如果你的任務拆分邏輯比計算邏輯還要復雜,ForkJoinPool 并不能為你帶來性能的提升,反而會起到負面作用。因此需要結合自己的場景來選擇使用。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看