## 36 為多線程們安排一位經理—Master/Slave模式詳解
> 沒有引發任何行動的思想都不是思想,而是夢想。
> —— 馬丁
> 前文我們講過 ForkJoinPool 是分而治之的思想。今天我們將要學習的 Master/Slave 也是同樣的思想。其中 Master 負責承接一個大的任務,然后它會根據一定策略把大任務拆散為若干個小任務,然后隨機分發給一組 Slave。每個 Slave 完成任務后上報自己的任務完成情況。當所有 Slave 都完成了自己的任務時,Master 也就完成了自己的任務。Master 就像是 Slave 的經理,把自己的任務分發下去,而 Slave 則在完成工作后向它匯報。
## 1、Master/Slave 模式設計
### 1.1 Master 設計
在 Master/Slave 模式中,一個 Master 持有一組 Slave 的引用。Master 對外暴露一個承接任務的方法 startTask。這是 Master 的主要方法,在內部做了如下事情:
1. 創建 slave
由于創建 Slave 線程并啟動的操作比較重,所以放到提交任務的時候才真正去做;
2. 分發任務
把 Task 進行拆分,然后分發給每個 Slave;
3. 等待處理結果
輪循檢查任務是否全部完成,全部完成結束輪循;
4. 返回處理結果
返回任務執行結果。
可以看到這四個方法邏輯十分的清晰。
### 1.2 Slave 設計
下面我們再看看 Slave 的設計:
Slave 繼承自 Thread。內部通過阻塞隊列 BlockingQueue 保存 Task。這樣在取任務時候如果已經沒有,則會阻塞等待。它有一個 submitTask 用來提交子任務,這個方法在 Master 分發任務時會被調用。此外還有 run 方法從 BlockingQueue 中取得任務執行。執行結束后通知 Master。
以上的設計并不是固定的模式。但 Master 接收任務,分割任務,派發任務這些功能是要有的,此外 Master 要有能力知道所有子任務都被執行完畢。而 Slave 則需要不斷承接子任務,并且執行。執行完畢能夠把執行結果回寫給 Master。設計如下圖:

其實說這么多,不如直接看代碼。下面我們就通過一個小例子,來感受一下 Master/Slave 模式。
## 2、Master/Slave 代碼示例
### 2.1 Client 代碼
不知道你是否還記得本專欄開始幾節反復用來舉例的單詞抄寫的需求。本節是正文最后一篇,正好我們回到最初的例子,用 Master/Slave 方式來實現它。我們這次先看 Client 的代碼:
~~~java
public class Client {
public static void main(String[] args) throws InterruptedException {
Task task = new Task(123,"internationalization");
Master master = new Master();
master.startTask(task);
master.printResult();
}
}
~~~
特別的簡單,創建一個單詞抄寫的 Task,然后通過 Master 來執行,最后打印執行結果。
### 2.2 Task 代碼
Task 代碼如下,省略了 get 方法 :
~~~java
public class Task {
//要抄寫的次數
private int copyCount;
//抄寫的序號開始
private int from;
//抄寫的序號結束
private int to;
//要抄寫的單詞
private String word;
public Task(int copyCount, int from, int to, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = from;
this.to = to;
}
public Task(int copyCount, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = 1;
this.to = copyCount;
}
}
~~~
接下來我們看看 Master 代碼。
### 2.3 Master 代碼
我們先來看看 Master 有哪些屬性:
~~~java
//保存干活的Slave線程
private List<Slave> slaves;
//slave的數量
private static final int SLAVES_COUNT = 8;
//子任務拆分的力度
private static final int SUB_TASK_SIZE = 4;
//完成的任務數量。各個Slave線程都會更新此數量,所以使用Atomic變量
private AtomicInteger finishedTaskCount = new AtomicInteger(0);
//執行結果,key為線程名字,value為此線程完成的數量
private ConcurrentHashMap<String, Integer> results;
~~~
可以看到 Master 持有一組 slave 線程,用來為它干活。我們的任務是單詞抄寫,每個子任務由 SUB\_TASK\_SIZE 來控制單個小任務的抄寫次數。子線程抄寫完成后會更新 finishedTaskCount 和 results 做任務完成記錄。
Master 對外提供了如下方法:
~~~java
//主方法,用于執行任務
public ConcurrentHashMap<String, Integer> startTask(Task task)
//子方法完成后向Master提交完成記錄
public void subTaskFinished(String slaveName,int finishedSubTaskCount)
//打印執行結果
public void printResult()
~~~
這三個方法里最重要的就是 startTask,Master 主要的執行邏輯都在里面,代碼如下:
~~~java
public ConcurrentHashMap<String, Integer> startTask(Task task) throws InterruptedException {
// 1 創建slave
createSlaves(this);
// 2 分發任務
splitAndAssignTask(task);
// 3 等待結果處理
checkTaskFinished(task);
// 4 返回處理結果
return results;
}
~~~
startTask 內部主要調用三個方法,最后返回執行結果。由于創建線程成本高,所以在構造 Master 時并沒有創建 Slave,而是延遲到 startTask 的時候來創建。splitAndAssignTask 做的事情就是把大的 task 按照拆分邏輯拆開,分發給 slave 去執行。checkTaskFinished 會輪循檢查 task 的執行情況,當全部完成時,執行下面的 return 語句。這幾個方法都很重要,接下來我們一個個看。
#### 2.3.1 createSlaves 方法
~~~java
private void createSlaves(Master master) {
if(slaves.size()==0){
IntStream.range(0, this.SLAVES_COUNT).forEach(count ->
slaves.add(new Slave("slave " + count, master))
);
slaves.forEach(slave -> {
slave.start();
});
}
}
~~~
這個方法比較簡單,就是創建 SLAVES\_COUNT 個 slave,然后啟動起來。
#### 2.3.2 splitAndAssignTask 方法
~~~java
private void splitAndAssignTask(Task task) throws InterruptedException {
int count = task.getCopyCount();
int start = 1;
List<Task> subTasks = new ArrayList<>();
//拆分task
while (start <= count) {
int end = count + 1;
if (start + SUB_TASK_SIZE <= count) {
end = start + SUB_TASK_SIZE;
}
subTasks.add(new Task(end-start, start, end, task.getWord()));
start = end;
}
//分發subTask
for (int i = 0; i < subTasks.size(); i++) {
int slaveIndex = i % SLAVES_COUNT;
slaves.get(slaveIndex).submitTask(subTasks.get(i));
}
}
~~~
這個方法做了兩件事情,一是把 task 拆分為多個 subTask。二是把 subTask 分發給 slave 去執行。subTask 中保存了要 copy 的數量,以及 copy 的 from 序號和 to 序號。當然還有要抄寫的單詞。
#### 2.3.3 checkTaskFinished
這個方法用來檢查 task 是否全部執行完成。
~~~java
private void checkTaskFinished(Task task) throws InterruptedException {
while (true) {
if (task.getCopyCount() == finishedTaskCount.get()) {
finished();
break;
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
~~~
方法中使用的輪循的方式來檢查 task 的 copy 總數和已完成數量 finishedTaskCount 是否一致,如果一致則說明 task 已經全部完成,那么調用 finished 方法工作做收尾,跳出循環。
#### 2.3.4 subTaskFinished
Master 除了這幾個方法還有一個方法用于子線程提交執行結果。代碼如下:
~~~java
public void subTaskFinished(String slaveName,int finishedSubTaskCount) {
Integer count = results.get(slaveName);
if(count==null){
results.put(slaveName,finishedSubTaskCount);
}else{
results.put(slaveName,count+finishedSubTaskCount);
}
finishedTaskCount.getAndAdd(finishedSubTaskCount);
}
~~~
首先把執行結果放入 results,如果已經存在,則進行累計。此外更新 finishedTaskCount。
Master 的主要方法都已經介紹完畢。下面我們來看看 Slave。
### 2.4 Slave 代碼
Slave 是一個工作的線程,它繼承自 Thread 類,
~~~java
public class Slave extends Thread
~~~
我們先看看 Slave 的屬性:
~~~java
//slave的線程名字
private String name;
//持有master引用,因為需要向master提交執行結果
private Master master;
//阻塞隊列來保存task
private BlockingQueue<Task> tasks;
~~~
slave 中提供兩個方法,一個是提交 task 的方法 submitTask,代碼如下:
~~~java
public void submitTask(Task task) throws InterruptedException {
tasks.put(task);
}
~~~
代碼很簡單,只是向阻塞隊列中放入 task。
Slave 執行 task 的邏輯在 run 方法中,Slave 繼承自 Thread,當他啟動后,run 方法就會被調用。代碼如下:
~~~java
@Override
public void run() {
try {
while (true) {
Task task = tasks.take();
IntStream.range(task.getFrom(), task.getTo()).forEach(
count -> System.out.println(String.format("線程%s第%d抄寫單詞%s", name, count, task.getWord()))
);
master.subTaskFinished(name, task.getCopyCount());
}
} catch (InterruptedException e) {
System.out.println(String.format("線程%s被打斷", name));
}
}
~~~
這段代碼不斷的從阻塞隊列中 take 出 task。如果沒有 task,就會阻塞在此。然后根據 task 內容進行輸出。執行完成后調用 master 的 subTaskFinished 方法把自己的執行結果提交給 master。如果阻塞的時候被打斷,則打印出日志。
## 3、執行結果分析
在 Client 的 main 方法中我們聲明了一個 task = new Task (123,“internationalization”),抄寫 internationalization 單詞 123 次。運行后輸出如下:
~~~
線程slave 1第5抄寫單詞internationalization
線程slave 5第21抄寫單詞internationalization
線程slave 4第17抄寫單詞internationalization
線程slave 2第9抄寫單詞internationalization
線程slave 5第22抄寫單詞internationalization
線程slave 3第13抄寫單詞internationalization
線程slave 0第1抄寫單詞internationalization
…………………
線程slave 2第107抄寫單詞internationalization
線程slave 0第100抄寫單詞internationalization
線程slave 2第108抄寫單詞internationalization
任務全部完成!
線程slave 4被打斷
線程slave 0被打斷
線程slave 6被打斷
線程slave 1被打斷
線程slave 7被打斷
線程slave 3被打斷
線程slave 5被打斷
線程slave 2被打斷
線程slave 0,完成了16次抄寫
線程slave 7,完成了12次抄寫
線程slave 5,完成了16次抄寫
線程slave 6,完成了15次抄寫
線程slave 3,完成了16次抄寫
線程slave 4,完成了16次抄寫
線程slave 1,完成了16次抄寫
線程slave 2,完成了16次抄寫
~~~
中間省略了一些輸出。可以看到所有任務完成后 slave 線程都被打斷。最后結果輸出了每個線程抄寫的次數,相加總和為 123。我把上面的 slave 打印日志做了統計,也是打印了 123 條。完全符合我們的預期。
## 4、總結
Master/Slave 模式是常用的多線程設計模式。一般用于大任務的拆分和分發。Master 作為門面對外暴露任務執行的接口,內部則是分發給多個 Slave 線程完成。這一切對于調用者來說是透明的。Master/Slave 模式關鍵點在于任務的分發和結果的匯總。它的實現方式很靈活,本文只是一種方式,也可以通過線程池來實現。子任務的計算結果也可以使用 Future。此外,分布式系統也有 Master/slave 的設計模式,可以借助 ZooKeeper 來實現。在 Akka 中使用 Actor 也能實現 Master/Slave 模式。實際使用中可以根據業務需求來自己實現。我們只需要掌握模式的核心思想,而不用拘泥于某一種具體的實現方式。
### 附完成代碼
Master 代碼:
~~~java
public class Master {
private List<Slave> slaves;
private static final int SLAVES_COUNT = 8;
private static final int SUB_TASK_SIZE = 4;
private AtomicInteger finishedTaskCount = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> results;
public Master() {
results = new ConcurrentHashMap<>();
slaves = new ArrayList<>();
}
public ConcurrentHashMap<String, Integer> startTask(Task task) throws InterruptedException {
// 1 創建slave
createSlaves(this);
// 2 分發任務
splitAndAssignTask(task);
// 3 等待結果處理
checkTaskFinished(task);
// 4 返回處理結果
return results;
}
private void createSlaves(Master master) {
if (slaves.size() == 0) {
IntStream.range(0, this.SLAVES_COUNT).forEach(count ->
slaves.add(new Slave("slave " + count, master))
);
slaves.forEach(slave -> {
slave.start();
});
}
}
private void splitAndAssignTask(Task task) throws InterruptedException {
int count = task.getCopyCount();
int start = 1;
List<Task> subTasks = new ArrayList<>();
while (start <= count) {
int end = count + 1;
if (start + SUB_TASK_SIZE <= count) {
end = start + SUB_TASK_SIZE;
}
subTasks.add(new Task(end - start, start, end, task.getWord()));
start = end;
}
for (int i = 0; i < subTasks.size(); i++) {
int slaveIndex = i % SLAVES_COUNT;
slaves.get(slaveIndex).submitTask(subTasks.get(i));
}
}
public void subTaskFinished(String slaveName, int finishedSubTaskCount) {
Integer count = results.get(slaveName);
if (count == null) {
results.put(slaveName, finishedSubTaskCount);
} else {
results.put(slaveName, count + finishedSubTaskCount);
}
finishedTaskCount.getAndAdd(finishedSubTaskCount);
}
private void checkTaskFinished(Task task) throws InterruptedException {
while (true) {
if (task.getCopyCount() == finishedTaskCount.get()) {
finished();
break;
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
private void finished() {
System.out.println("任務全部完成!");
slaves.forEach(slave -> slave.interrupt());
slaves.clear();
}
public void printResult() {
results.forEach((key, value) ->
System.out.println(String.format("線程%s,完成了%d次抄寫", key, value)));
}
}
~~~
Slave 代碼:
~~~java
public class Slave extends Thread {
private String name;
private Master master;
private BlockingQueue<Task> tasks;
public Slave(String name, Master master) {
this.name = name;
this.master = master;
this.tasks = new ArrayBlockingQueue<Task>(100);
}
public void submitTask(Task task) throws InterruptedException {
tasks.put(task);
}
@Override
public void run() {
try {
while (true) {
Task task = tasks.take();
IntStream.range(task.getFrom(), task.getTo()).forEach(
count -> System.out.println(String.format("線程%s第%d抄寫單詞%s", name, count, task.getWord()))
);
master.subTaskFinished(name, task.getCopyCount());
}
} catch (InterruptedException e) {
System.out.println(String.format("線程%s被打斷", name));
}
}
}
~~~
Task 代碼:
~~~java
public class Task {
private int copyCount;
private int from;
private int to;
private String word;
public Task(int copyCount, int from, int to, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = from;
this.to = to;
}
public Task(int copyCount, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = 1;
this.to = copyCount;
}
public int getCopyCount() {
return copyCount;
}
public String getWord() {
return word;
}
public int getFrom() {
return from;
}
public int getTo() {
return to;
}
}
~~~
Client 代碼:
~~~java
public class Client {
public static void main(String[] args) throws InterruptedException {
Task task = new Task(123,"internationalization");
Master master = new Master();
master.startTask(task);
master.printResult();
}
}
~~~
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語