## 35 拆分你的任務—學習使用Fork/Join框架
> 讀書給人以快樂、給人以光彩、給人以才干。
> ——培根
本節我們學習 Excutor 的另外一種實現 ForkJoinPool。顧名思義,ForkJoinPool 的核心功能有兩個。第一個是 Fork,拆解你的任務。第二個是 Join,合并任務的執行結果。這個場景很常見,比如我們要處理一批數據,由于數據間沒有依賴性,那么我們可以把這一批數據拆解為更小的批次,多線程并行處理。最后再合并處理的結果。 Fork/Join 的核心思想就是分而治之。
## 1、ForkJoinPool 介紹
ForkJoinPool 自 Java 7 引入。它和 ThreadPoolExecutor 都繼承自 AbstractExecutorService,實現了 ExecutorService 和 Executor 接口。ForkJoinPool 用來把大任務切分為小任務,如果切分完小任務還不夠小(由你設置的閾值決定),那么就繼續向下切分。經過切分后,最后的任務是金字塔形狀,計算完成后向上匯總。如下圖:

ForkJoinPool 處理任務的核心思想可以用如下偽代碼表示:
~~~java
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
~~~
如果一個任務足夠小,那么執行任務邏輯。如果不夠小,拆分為兩個獨立的子任務。子任務執行后, 取得兩個子任務的執行結果進行合并。
ForkJoinPool 通過 submit 執行 ForkJoinTask 類型的任務。ForkJoinTask 是抽象類,有著不同的子類實現。比較常用的是如下兩種:
1、RecursiveAction,沒有返回值;
2、RecurisiveTask,有返回值。
此外 submit 方法還可以執行 Callable 和 Runnable 的接口實現。
ForkJoinTask 就是我們為代碼中的 problem。我來舉個例子看具體如何使用。假如讓你計算 1-10000 的和,
我們可以把任務拆解為 100 個,每個任務計算 100 個數字之和。代碼如下.
Task 代碼;
~~~java
public class Task extends RecursiveTask<Integer> {
private static final int THRESHOLD = 100;
private int from;
private int to;
public Task(int from, int to) {
super();
this.from = from;
this.to = to;
}
@Override
protected Integer compute() {
if (THRESHOLD > (to - from)) {
return IntStream.range(from, to + 1)
.reduce((a, b) -> a + b)
.getAsInt();
} else {
int forkNumber = (from + to) / 2;
Task left = new Task(from, forkNumber);
Task right = new Task(forkNumber + 1, to);
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
~~~
Task 繼承自 RecursiveTask。遞歸任務的大小力度為 100。重寫的 compute 方法和文章開頭的偽代碼 solve 是一樣的思路。先判斷任務的大小是否在 THRESHOLD 之內。如果已經拆解到 THRESHOLD 內,那么進行計算。如果任務拆分還沒達到 THRESHOLD,那么繼續拆解任務。fork 操作會把當前任務放入線程池中來執行。最后再通過 join 取得執行結果做合并。
Client 代碼:
~~~java
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> result = forkJoinPool.submit(new Task(1, 10000));
System.out.println("計算結果為"+result.get());
forkJoinPool.shutdown();
}
}
~~~
我們首先通過靜態方法 commonPool 聲明一個 ForkJoinPool。commonPool 創建的 ForkJoinPool 滿足絕大多數的應用場景。然后通過 submit 方法提交我們的 Task,計算 1-10000 的和。提交 Task 后,Task 中的 compute 方法最終會被調用,通過對任務的拆解,以及對任務計算結果的合并,匯總到此處的 Task 中。通過 Task 的 get 方法獲取計算結果。最后關閉線程池。
執行結果如下:
~~~
計算結果為50005000
~~~
## 2、ForkJoinPool 原理介紹
ForkJoinPool 中的每個線程都維護自己的工作隊列。這是一個雙端隊列,既可以先進先出,也可以先進后出。簡單來說就是隊列兩端都可以做出隊操作。當每個線程產生新的任務時(比如說調用了 fork 操作),會被加入到隊尾。線程工作的時候會從自己維護的工作隊列的 top 做出隊操作(LIFO),取得任務來執行。線程還會去其它線程任務隊列竊取任務,此時是從其它隊列的 base 取得任務(FIFO)。如下圖所示:

下面簡單介紹幾個常用方法:
1、fork 方法中會判斷如果當前線程不是 ForkJoinWorkerThread,則把任務加入 submission queue。否則加入自己的工作隊列中。submission queue 沒有關聯的線程,是所有線程都可以執行的任務隊列。fork 代碼如下:
~~~java
public final ForkJoinTask<V> fork() {
Thread t;
//判斷本線程是否為ForkJoinWorkerThread,是的話,加入到自己的workQueue中,否則調用externalPush
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
~~~
2、join 方法中,自己任務沒有執行完,則取的自己任務隊列中的任務執行。如果發現自己的任務已經沒有了,則會去竊取其它線程的任務來執行。Join 代碼如下:
~~~java
public final V join() {
int s;
//取得doJoin后的狀態,位運算后判斷是否正常,不正常的話拋出異常。正常的話返回計算結果
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
~~~
主要邏輯在 doJoin 中,代碼如下:
~~~java
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
~~~
如果當前線程不是 ForkJoinWorkerThread,則調用 externalAwaitDone。如果是 ForkJoinWorkerThread 那么先通過 tryUnpush 從自己的 workQueue 的 top 位置取得當前 task,然后調用 doExec 執行。這兩步成功的話返回執行結果 s,否則調用 awaitJoin。這個方法中判斷本任務是否執行完成,完成直接返回,否則會嘗試竊執行取別的線程的任務。
3、submit 方法中,會把任務 push 到 submission queue。
ForkJoinPool 通過任務竊取,使得任務的執行更為高效。
## 3、總結
ForkJoinPool 為我們拆分大任務再匯總小任務計算結果提供了很好的支持。它很適合執行計算密集型的任務。但是如果你的任務拆分邏輯比計算邏輯還要復雜,ForkJoinPool 并不能為你帶來性能的提升,反而會起到負面作用。因此需要結合自己的場景來選擇使用。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語