## 9 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
```
不要問你的國家能夠為你做些什么,而要問你可以為國家做些什么。
——林肯
```
前面講了那么多,其實我們一直還沒有體會到多線程帶來好處,甚至還因為使用多線程引來了一些麻煩。不過好在上一節中,我們通過同步操作,解決了抄寫單詞程序的線程安全問題。在本節中,我們通過一個經典的多線程設計模式–生產者 / 消費者,來體驗多線程開發及其帶來的好處。
## 1\. 生產者 / 消費者介紹
生產者 / 消費者是一種經典的設計模式,被廣泛應用于軟件領域。生產者 / 消費者設計模式能夠充分解藕,每一方只需要關注自己的職責。生產者專注生產,消費者專注消費。雙方通過某種機制進行資源共享。舉個例子,就像男人負責賺錢養家,女人負責貌美如花。男人可以看作生產者,不停賺錢存入銀行,而老婆負責花錢買生活用品等。銀行賬戶就是共享金錢的機制。

## 2\. 需求分析
我們在寫代碼前,需要先把需求理解清楚。我們的需求是這樣的:
1. 有兩個角色,老師和學生。但每個角色可能有多個人。
2. 老師負責留抄寫單詞的作業,學生負責完成作業。一項作業是指抄寫指定單詞指定次數。
3. 每個學生領取一項作業,獨立完成。
4. 當沒有作業可領取的時候,學生等待。
5. 當有了新的作業可領取,學生被喚醒,繼續領取作業。
6. 當作業數量達到上限的時候,老師停止布置作業。
7. 當作業小于上限的時候,老師被喚醒,繼續布置作業。
這是一個典型的生產者 / 消費者的場景。老師負責生產作業,學生負責消化作業。當不滿足某些條件的時候兩種角色都可能休眠,而在特定條件下則會被喚醒。
## 3\. Teacher 類代碼
我們先看 Teacher 的全部代碼:
~~~java
public class Teacher extends Thread {
private String name;
private List<String> punishWords = Arrays.asList("internationalization", "hedgehog", "penicillin", "oasis", "nirvana", "miserable");
private LinkedList<Task> tasks;
private int MAX = 10;
public Teacher(String name, LinkedList<Task> tasks) {
//調用Thread構造方法,設置threadName
super(name);
this.name = name;
this.tasks = tasks;
}
public void arrangePunishment() throws InterruptedException {
String threadName = Thread.currentThread().getName();
while (true) {
synchronized (tasks) {
if (tasks.size() < MAX) {
Task task = new Task(new Random().nextInt(3) + 1, getPunishedWord());
tasks.addLast(task);
System.out.println(threadName + "留了作業,抄寫" + task.getWordToCopy() + " " + task.getLeftCopyCount() + "次");
tasks.notifyAll();
} else {
System.out.println(threadName+"開始等待");
tasks.wait();
System.out.println("teacher線程 " + threadName + "線程-" + name + "等待結束");
}
}
}
}
//重寫run方法,完成任務。
@Override
public void run() {
try {
arrangePunishment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private String getPunishedWord() {
return punishWords.get(new Random().nextInt(punishWords.size()));
}
}
~~~
我們看核心的 arrangePunishment () 方法,在這個方法中會進入自旋,當任務數量未達 MAX 值時,teacher 會生成新的 task 加入任務列表,并喚醒所有線程。這是因為可能有學生線程因為無 task 可做已經 wait,所以需要喚醒。而當任務數量超出 MAX,當前老師線程則會進入 wait set。
## 4\. Student 代碼
我們再看學生的全部代碼:
~~~java
public class Student extends Thread {
private String name;
private LinkedList<Task> tasks;
public Student(String name, LinkedList<Task> tasks) {
//調用Thread構造方法,設置threadName
super(name);
this.name = name;
this.tasks = tasks;
}
public void copyWord() throws InterruptedException {
String threadName = Thread.currentThread().getName();
while (true) {
Task task = null;
synchronized (tasks) {
if (tasks.size() > 0) {
task = tasks.removeFirst();
sleep(100);
tasks.notifyAll();
} else {
System.out.println(threadName+"開始等待");
tasks.wait();
System.out.println("學生線程 "+threadName + "線程-" + name + "等待結束");
}
}
if (task != null) {
for (int i = 1; i <= task.getLeftCopyCount(); i++) {
System.out.println(threadName + "線程-" + name + "抄寫" + task.getWordToCopy() + "。已經抄寫了" + i + "次");
}
}
}
}
//重寫run方法,完成任務。
@Override
public void run() {
try {
copyWord();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
~~~
我們也直接看核心方法 copyWord (),在這個方法同樣會進入 while 自旋,如果任務的數量大于 0,則會取出第一個任務,并從任務列表移除,同時會喚醒所有線程。這里 notifyAll 這是因為可能有 teacher 線程由于觸及 MAX 值而進入了 wait set。當此次消費完成后,任務列表中任務數量會小于 MAX 值,所以需要喚醒 teacher 線程繼續生成新的 task。當 task 數量為 0 時,學生線程則會進入等待,因為無 task 可做。
學生的同步代碼塊中讓線程 sleep 了 100 毫秒,這樣是為了讓 teacher 生產 task 的速度更快,以讓 task 數量能夠達到 MAX 值,觸發 teacher 線程的 wait 操作。以便我們看到 wait 和 notifyAll 的效果。
以上程序代碼邏輯,可參考下圖理解:

## 5\. 客戶端代碼
最后我們來看客戶端代碼:
~~~java
public class WaitNotifyClient {
public static void main(String[] args) {
LinkedList<Task> tasks = new LinkedList<>();
Student xiaoming = new Student("小明", tasks);
xiaoming.start();
Student xiaowang = new Student("小王", tasks);
xiaowang.start();
Teacher lilaoshi = new Teacher("李老師", tasks);
lilaoshi.start();
Teacher zhanglaoshi = new Teacher("張老師", tasks);
zhanglaoshi.start();
}
}
~~~
我們聲明了兩個學生線程寫作業,兩個老師線程留作業。
接下來我們運行客戶端代碼,看一下輸出。
我截取了部分輸出如下:
~~~
小明開始等待
小王開始等待
李老師留了作業,抄寫hedgehog 1次
李老師留了作業,抄寫nirvana 2次
李老師留了作業,抄寫miserable 2次
李老師留了作業,抄寫nirvana 2次
李老師留了作業,抄寫nirvana 3次
李老師留了作業,抄寫nirvana 2次
李老師留了作業,抄寫nirvana 3次
李老師留了作業,抄寫nirvana 3次
李老師留了作業,抄寫penicillin 3次
李老師留了作業,抄寫oasis 1次
李老師開始等待
學生線程 小王線程-小王等待結束
學生線程 小明線程-小明等待結束
小王線程-小王抄寫hedgehog。已經抄寫了1次
小明線程-小明抄寫nirvana。已經抄寫了1次
張老師留了作業,抄寫hedgehog 1次
張老師留了作業,抄寫miserable 3次
張老師開始等待
~~~
整個過程如下:
1. 學生線程先啟動后,由于沒有作業,開始等待。
2. 李老師線程啟動后開始留作業,學生線程由于有了 task 被喚醒。
3. 而李老師線程由于 task 數量達到上限,開始等待。
4. 學生消費作業的同時,張老師線程也啟動完成,開始繼續留作業。
5. 此時作業數量又達 MAX 值。老師線程開始等待。
如果你本地運行了以上代碼,還能看到更多有規律的輸出,每當學生消費掉一個 task,那么就有一個老師的線程被喚醒。這里注意由于線程輸出時會有時間差,所以在順序上和我們想象的順序會有所區別,但這并不代表程序運行是錯誤的。
我們對以上代碼做一個總結。學生和老師線程都會操作任務列表 tasks,所以對 tasks 的操作需要加上同步保護。任務達到 MAX 上限時,老師 wait,當任務數量為 0 時,學生休息。無論學生和老師,在執行完自己正常生產和消費邏輯后都會執行 notifyAll,確保如果有 wait 的線程,能被喚醒。
## 6\. 使用多線程的好處
采用生產者 / 消費者的多線程方式帶來很多好處,我列了主要幾條如下:
1. \*\* 更貼近真實世界。\*\* 在真實世界里,每個人都像一個單獨的線程,在并行工作。
2. \*\* 解耦。\*\* 多線程之間不需要互相調用,并且不需要知道其他線程運行的情況。通過共享資源 tasks,讓不同線程維系在一起工作。
3. \*\* 緩沖。\*\*tasks 列表其實就是一個緩沖池,能夠緩解生產和消費的速度不一致。有了 task 列表,我們就不需要運行完一個 task,才能繼續生成下一個 task。本文的例子比較理想化,生成和消費 task 速度是一樣的。但實際情況可能并不是這樣,假如生產 task 速度開始比較快,那么會先把 task 列表裝滿。此時如果生產 task 速度慢下來了,消費者還是可以持續消費 task 列表中已有的 task,而不會停下來等待。這大大提高了程序的整體效率
## 7\. 本章總結
本節是第二章《如何編寫多線程》的最后一節。通過本章學習,我們了解了實現多線程的方式;學習了線程的狀態和 API;講解了線程間是如何通訊的。最后我們通過生產者和消費者的代碼實踐,對多線程開發有了更為直觀的認識。這一章我們重點圍繞著多線程的實現做講解。不過在這個過程中,我們也接觸到了線程安全問題。最后通過同步,我們保證了線程安全。其實在多線程開發的過程中,非常重要的一環就是確保線程安全,否則你的多線程程序是無法正常工作的。下一章我們將會講解多線程開發中可能遇到的各種各樣的問題以及問題產生的原因。只有解決了這些問題,多線程才能為我所用。否則,我們空握多線程這把利器,不但無法發揮出他的威力,還會誤傷自己。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語