# Java線程(十一):Fork/Join-Java并行計算框架
并行計算在處處都有大數據的今天已經不是一個新鮮的詞匯了,現在已經有單機多核甚至多機集群并行計算,注意,這里說的是并行,而不是并發。嚴格的將,**并行是指系統內有多個任務同時執行**,而**并發是指系統內有多個任務同時存在**,不同的任務按時間分片的方式切換執行,由于切換的時間很短,給人的感覺好像是在同時執行。?
Java在JDK7之后加入了并行計算的框架Fork/Join,可以解決我們系統中大數據計算的性能問題。Fork/Join采用的是分治法,Fork是將一個大任務拆分成若干個子任務,子任務分別去計算,而Join是獲取到子任務的計算結果,然后合并,這個是遞歸的過程。子任務被分配到不同的核上執行時,效率最高。偽代碼如下:
~~~
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
~~~
Fork/Join框架的核心類是[ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html),它能夠接收一個[ForkJoinTask](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html),并得到計算結果。ForkJoinTask有兩個子類,[RecursiveTask](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RecursiveTask.html)(有返回值)和[RecursiveAction](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RecursiveAction.html)(無返回結果),我們自己定義任務時,只需選擇這兩個類繼承即可。類圖如下:?
?
下面來看一個實例:計算一個超大數組所有元素的和。代碼如下:
~~~
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* @author: shuang.gao Date: 2015/7/14 Time: 8:16
*/
public class SumTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = -6196480027075657316L;
private static final int THRESHOLD = 500000;
private long[] array;
private int low;
private int high;
public SumTask(long[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
@Override
protected Integer compute() {
int sum = 0;
if (high - low <= THRESHOLD) {
// 小于閾值則直接計算
for (int i = low; i < high; i++) {
sum += array[i];
}
} else {
// 1\. 一個大任務分割成兩個子任務
int mid = (low + high) >>> 1;
SumTask left = new SumTask(array, low, mid);
SumTask right = new SumTask(array, mid + 1, high);
// 2\. 分別計算
left.fork();
right.fork();
// 3\. 合并結果
sum = left.join() + right.join();
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long[] array = genArray(1000000);
System.out.println(Arrays.toString(array));
// 1\. 創建任務
SumTask sumTask = new SumTask(array, 0, array.length - 1);
long begin = System.currentTimeMillis();
// 2\. 創建線程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 3\. 提交任務到線程池
forkJoinPool.submit(sumTask);
// 4\. 獲取結果
Integer result = sumTask.get();
long end = System.currentTimeMillis();
System.out.println(String.format("結果 %s 耗時 %sms", result, end - begin));
}
private static long[] genArray(int size) {
long[] array = new long[size];
for (int i = 0; i < size; i++) {
array[i] = new Random().nextLong();
}
return array;
}
}
~~~
我們通過調整閾值(THRESHOLD),可以發現耗時是不一樣的。實際應用中,如果需要分割的任務大小是固定的,可以經過測試,得到最佳閾值;如果大小不是固定的,就需要設計一個可伸縮的算法,來動態計算出閾值。如果子任務很多,效率并不一定會高。?
未完待續。。。
- 前言
- Java線程(一):線程安全與不安全
- Java線程(二):線程同步synchronized和volatile
- Java線程(三):線程協作-生產者/消費者問題
- Java線程(四):線程中斷、線程讓步、線程睡眠、線程合并
- Java線程(五):Timer和TimerTask
- Java線程(六):線程池
- Java線程(七):Callable和Future
- Java線程(八):鎖對象Lock-同步問題更完美的處理方式
- Java線程(九):Condition-線程通信更高效的方式
- Java線程(十):CAS
- Java線程(十一):Fork/Join-Java并行計算框架
- Java線程(篇外篇):阻塞隊列BlockingQueue
- Java線程(篇外篇):線程本地變量ThreadLocal
- Java線程(篇外篇):線程和鎖