# Java線程(三):線程協作-生產者/消費者問題
上一篇講述了線程的互斥(同步),但是在很多情況下,僅僅同步是不夠的,還需要線程與線程協作(通信),生產者/消費者問題是一個經典的線程同步以及通信的案例。該問題描述了兩個共享固定大小緩沖區的線程,即所謂的“生產者”和“消費者”在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。要解決該問題,就必須讓生產者在緩沖區滿時休眠(要么干脆就放棄數據),等到下次消費者消耗緩沖區中的數據的時候,生產者才能被喚醒,開始往緩沖區添加數據。同樣,也可以讓消費者在緩沖區空時進入休眠,等到生產者往緩沖區添加數據之后,再喚醒消費者,通常采用線程間通信的方法解決該問題。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。本文講述了JDK5之前傳統線程的通信方式,更高級的通信方式可參見[Java線程(九):Condition-線程通信更高效的方式](http://blog.csdn.net/ghsau/article/details/7481142)和[Java線程(篇外篇):阻塞隊列BlockingQueue](http://blog.csdn.net/ghsau/article/details/8108292)。
假設有這樣一種情況,有一個盤子,盤子里只能放一個雞蛋,A線程專門往盤子里放雞蛋,如果盤子里有雞蛋,則一直等到盤子里沒雞蛋,B線程專門從盤子里取雞蛋,如果盤子里沒雞蛋,則一直等到盤子里有雞蛋。這里盤子是一個互斥區,每次放雞蛋是互斥的,每次取雞蛋也是互斥的,A線程放雞蛋,如果這時B線程要取雞蛋,由于A沒有釋放鎖,B線程處于等待狀態,進入阻塞隊列,放雞蛋之后,要通知B線程取雞蛋,B線程進入就緒隊列,反過來,B線程取雞蛋,如果A線程要放雞蛋,由于B線程沒有釋放鎖,A線程處于等待狀態,進入阻塞隊列,取雞蛋之后,要通知A線程放雞蛋,A線程進入就緒隊列。我們希望當盤子里有雞蛋時,A線程阻塞,B線程就緒,盤子里沒雞蛋時,A線程就緒,B線程阻塞,代碼如下:
~~~
import?java.util.ArrayList;??
import?java.util.List;??
/**?定義一個盤子類,可以放雞蛋和取雞蛋?*/??
public?class?Plate?{??
????/**?裝雞蛋的盤子?*/??
????List?eggs?=?new?ArrayList();??
????/**?取雞蛋?*/??
????public?synchronized?Object?getEgg()?{??
????????while?(eggs.size()?==?0)?{??
????????????try?{??
????????????????wait();??
????????????}?catch?(InterruptedException?e)?{??
????????????????e.printStackTrace();??
????????????}??
????????}??
????????Object?egg?=?eggs.get(0);??
????????eggs.clear();//?清空盤子??
????????notify();//?喚醒阻塞隊列的某線程到就緒隊列??
????????System.out.println("拿到雞蛋");??
????????return?egg;??
????}??
????/**?放雞蛋?*/??
????public?synchronized?void?putEgg(Object?egg)?{??
????????while?(eggs.size()?>?0)?{??
????????????try?{??
????????????????wait();??
????????????}?catch?(InterruptedException?e)?{??
????????????????e.printStackTrace();??
????????????}??
????????}??
????????eggs.add(egg);//?往盤子里放雞蛋??
????????notify();//?喚醒阻塞隊列的某線程到就緒隊列??
????????System.out.println("放入雞蛋");??
????}??
????static?class?AddThread?implements?Runnable??{??
????????private?Plate?plate;??
????????private?Object?egg?=?new?Object();??
????????public?AddThread(Plate?plate)?{??
????????????this.plate?=?plate;??
????????}??
????????public?void?run()?{??
????????????plate.putEgg(egg);??
????????}??
????}??
????static?class?GetThread?implements?Runnable??{??
????????private?Plate?plate;??
????????public?GetThread(Plate?plate)?{??
????????????this.plate?=?plate;??
????????}??
????????public?void?run()?{??
????????????plate.getEgg();??
????????}??
????}??
????public?static?void?main(String?args[])?{??
????????Plate?plate?=?new?Plate();??
????????for(int?i?=?0;?i?10;?i++)?{??
????????????new?Thread(new?AddThread(plate)).start();??
????????????new?Thread(new?GetThread(plate)).start();??
????????}??
????}??
}??
~~~
輸出結果:
~~~
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
放入雞蛋??
拿到雞蛋??
~~~
程序開始,A線程判斷盤子是否為空,放入一個雞蛋,并且喚醒在阻塞隊列的一個線程,阻塞隊列為空;假設CPU又調度了一個A線程,盤子非空,執行等待,這個A線程進入阻塞隊列;然后一個B線程執行,盤子非空,取走雞蛋,并喚醒阻塞隊列的A線程,A線程進入就緒隊列,此時就緒隊列就一個A線程,馬上執行,放入雞蛋;如果再來A線程重復第一步,在來B線程重復第二步,整個過程就是生產者(A線程)生產雞蛋,消費者(B線程)消費雞蛋。
前段時間看了張孝祥老師線程的視頻,講述了一個其學員的面試題,也是線程通信的,在此也分享一下。
題目:子線程循環10次,主線程循環100次,如此循環100次,好像是空中網的筆試題。
~~~
public?class?ThreadTest2?{??
????public?static?void?main(String[]?args)?{??
????????final?Business?business?=?new?Business();??
????????new?Thread(new?Runnable()?{??
????????????@Override??
????????????public?void?run()?{??
????????????????threadExecute(business,?"sub");??
????????????}??
????????}).start();??
????????threadExecute(business,?"main");??
????}?????
????public?static?void?threadExecute(Business?business,?String?threadType)?{??
????????for(int?i?=?0;?i?100;?i++)?{??
????????????try?{??
????????????????if("main".equals(threadType))?{??
????????????????????business.main(i);??
????????????????}?else?{??
????????????????????business.sub(i);??
????????????????}??
????????????}?catch?(InterruptedException?e)?{??
????????????????e.printStackTrace();??
????????????}??
????????}??
????}??
}??
class?Business?{??
????private?boolean?bool?=?true;??
????public?synchronized?void?main(int?loop)?throws?InterruptedException?{??
????????while(bool)?{??
????????????this.wait();??
????????}??
????????for(int?i?=?0;?i?100;?i++)?{??
????????????System.out.println("main?thread?seq?of?"?+?i?+?",?loop?of?"?+?loop);??
????????}??
????????bool?=?true;??
????????this.notify();??
????}?????
????public?synchronized?void?sub(int?loop)?throws?InterruptedException?{??
????????while(!bool)?{??
????????????this.wait();??
????????}??
????????for(int?i?=?0;?i?10;?i++)?{??
????????????System.out.println("sub?thread?seq?of?"?+?i?+?",?loop?of?"?+?loop);??
????????}??
????????bool?=?false;??
????????this.notify();??
????}??
}??
~~~
大家注意到沒有,在調用wait方法時,都是用while判斷條件的,而不是if,在wait方法說明中,也推薦使用while,因為在某些特定的情況下,線程有可能被假喚醒,使用while會循環檢測更穩妥。wait和notify方法必須工作于synchronized內部,且這兩個方法只能由鎖對象來調用。另附這兩種方法的JavaDoc說明:
### notify
~~~
public final void notify()
~~~
喚醒在此對象監視器上等待的單個線程。如果所有線程都在此對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的,并在對實現做出決定時發生。線程通過調用其中一個`wait`?方法,在對象的監視器上等待。
直到當前的線程放棄此對象上的鎖定,才能繼續執行被喚醒的線程。被喚醒的線程將以常規方式與在該對象上主動同步的其他所有線程進行競爭;例如,喚醒的線程在作為鎖定此對象的下一個線程方面沒有可靠的特權或劣勢。
此方法只應由作為此對象監視器的所有者的線程來調用。通過以下三種方法之一,線程可以成為此對象監視器的所有者:
* 通過執行此對象的同步 (Sychronized) 實例方法。
* 通過執行在此對象上進行同步的?`synchronized`?語句的正文。
* 對于?`Class`?類型的對象,可以通過執行該類的同步靜態方法。
一次只能有一個線程擁有對象的監視器。
**拋出:**
`[IllegalMonitorStateException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果當前的線程不是此對象監視器的所有者。
### notifyAll
~~~
public final void notifyAll()
~~~
喚醒在此對象監視器上等待的所有線程。線程通過調用其中一個?`wait`?方法,在對象的監視器上等待。
直到當前的線程放棄此對象上的鎖定,才能繼續執行被喚醒的線程。被喚醒的線程將以常規方式與在該對象上主動同步的其他所有線程進行競爭;例如,喚醒的線程在作為鎖定此對象的下一個線程方面沒有可靠的特權或劣勢。
此方法只應由作為此對象監視器的所有者的線程來調用。請參閱?`notify`?方法,了解線程能夠成為監視器所有者的方法的描述。
**拋出:**
[IllegalMonitorStateException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類") 如果當前的線程不是此對象監視器的所有者。
### wait
~~~
public final void wait(long?timeout)
throws InterruptedException
~~~
導致當前的線程等待,直到其他線程調用此對象的?[`notify()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法或[`notifyAll()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法,或者超過指定的時間量。
當前的線程必須擁有此對象監視器。
此方法導致當前線程(稱之為?T)將其自身放置在對象的等待集中,然后放棄此對象上的所有同步要求。出于線程調度目的,線程?T?被禁用,且處于休眠狀態,直到發生以下四種情況之一:
* 其他某個線程調用此對象的?notify?方法,并且線程?T?碰巧被任選為被喚醒的線程。
* 其他某個線程調用此對象的?notifyAll?方法。
* 其他某個線程[`中斷`](http://blog.csdn.net/ghsau/article/details/7433673)線程?T。
* 已經到達指定的實際時間。但是,如果?timeout?為零,則不考慮實際時間,該線程將一直等待,直到獲得通知。
然后,從對象的等待集中刪除線程?T,并重新進行線程調度。然后,該線程以常規方式與其他線程競爭,以獲得在該對象上同步的權利;一旦獲得對該對象的控制權,該對象上的所有其同步聲明都將被還原到以前的狀態 - 這就是調用wait?方法時的情況。然后,線程T?從wait方法的調用中返回。所以,從wait?方法返回時,該對象和線程T?的同步狀態與調用wait?方法時的情況完全相同。
在沒有被通知、中斷或超時的情況下,線程還可以喚醒一個所謂的*虛假喚醒*?(spurious wakeup)。雖然這種情況在實踐中很少發生,但是應用程序必須通過以下方式防止其發生,即對應該導致該線程被提醒的條件進行測試,如果不滿足該條件,則繼續等待。換句話說,等待應總是發生在循環中,如下面的示例:
~~~
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
}
~~~
(有關這一主題的更多信息,請參閱 Doug Lea 撰寫的《Concurrent Programming in Java (Second Edition)》(Addison-Wesley, 2000) 中的第 3.2.3 節或 Joshua Bloch 撰寫的《Effective Java Programming Language Guide》(Addison-Wesley, 2001) 中的第 50 項。
如果當前線程在等待時被其他線程[`中斷`](http://blog.csdn.net/ghsau/article/details/7433673),則會拋出InterruptedException。在按上述形式恢復此對象的鎖定狀態時才會拋出此異常。
注意,由于?wait?方法將當前的線程放入了對象的等待集中,所以它只能解除此對象的鎖定;可以同步當前線程的任何其他對象在線程等待時仍處于鎖定狀態。
此方法只應由作為此對象監視器的所有者的線程來調用。請參閱?`notify`?方法,了解線程能夠成為監視器所有者的方法的描述。
**參數:**
`timeout`?- 要等待的最長時間(以毫秒為單位)。
**拋出:**
`[IllegalArgumentException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果超時值為負。
`[IllegalMonitorStateException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果當前的線程不是此對象監視器的所有者。
`[InterruptedException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果在當前線程等待通知之前或者正在等待通知時,另一個線程中斷了當前線程。在拋出此異常時,當前線程的*中斷狀態*?被清除。
* * *
### wait
~~~
public final void wait(long?timeout,
int?nanos)
throws InterruptedException
~~~
導致當前的線程等待,直到其他線程調用此對象的?[`notify()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法或[`notifyAll()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法,或者其他某個線程中斷當前線程,或者已超過某個實際時間量。
此方法類似于一個參數的?`wait`?方法,但它允許更好地控制在放棄之前等待通知的時間量。用毫微秒度量的實際時間量可以通過以下公式計算出來:
> ~~~
> 1000000*timeout+nanos
> ~~~
在其他所有方面,此方法執行的操作與帶有一個參數的?[`wait(long)`](http://blog.csdn.net/ghsau/article/details/7433673)?方法相同。需要特別指出的是,wait(0, 0)?與wait(0)?相同。
當前的線程必須擁有此對象監視器。該線程發布對此監視器的所有權,并等待下面兩個條件之一發生:
* 其他線程通過調用?`notify`?方法,或?`notifyAll`?方法通知在此對象的監視器上等待的線程醒來。
* `timeout`?毫秒值與?`nanos`?毫微秒參數值之和指定的超時時間已用完。
然后,該線程等到重新獲得對監視器的所有權后才能繼續執行。
對于某一個參數的版本,實現中斷和虛假喚醒是有可能的,并且此方法應始終在循環中使用:
~~~
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout, nanos);
... // Perform action appropriate to condition
}
~~~
此方法只應由作為此對象監視器的所有者的線程來調用。請參閱?`notify`?方法,了解線程能夠成為監視器所有者的方法的描述。
**參數:**
`timeout`?- 要等待的最長時間(以毫秒為單位)。
`nanos`?- 額外時間(以毫微秒為單位,范圍是 0-999999)。
**拋出:**
`[IllegalArgumentException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果超時值是負數,或者毫微秒值不在 0-999999 范圍內。
`[IllegalMonitorStateException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果當前線程不是此對象監視器的所有者。
`[InterruptedException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果在當前線程等待通知之前或者正在等待通知時,其他線程中斷了當前線程。在拋出此異常時,當前線程的*中斷狀態*?被清除。
### wait
~~~
public final void wait()
throws InterruptedException
~~~
導致當前的線程等待,直到其他線程調用此對象的?[`notify()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法或[`notifyAll()`](http://blog.csdn.net/ghsau/article/details/7433673)?方法。換句話說,此方法的行為就好像它僅執行wait(0)調用一樣。
當前的線程必須擁有此對象監視器。該線程發布對此監視器的所有權并等待,直到其他線程通過調用?`notify`?方法,或?`notifyAll`?方法通知在此對象的監視器上等待的線程醒來。然后該線程將等到重新獲得對監視器的所有權后才能繼續執行。
對于某一個參數的版本,實現中斷和虛假喚醒是可能的,而且此方法應始終在循環中使用:
~~~
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}
~~~
此方法只應由作為此對象監視器的所有者的線程來調用。請參閱?`notify`?方法,了解線程能夠成為監視器所有者的方法的描述。
**拋出:**
`[IllegalMonitorStateException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果當前的線程不是此對象監視器的所有者。
`[InterruptedException](http://blog.csdn.net/ghsau/article/details/7433673 "java.lang 中的類")`?- 如果在當前線程等待通知之前或者正在等待通知時,另一個線程中斷了當前線程。在拋出此異常時,當前線程的*中斷狀態*?被清除。?
- 前言
- Java線程(一):線程安全與不安全
- Java線程(二):線程同步synchronized和volatile
- Java線程(三):線程協作-生產者/消費者問題
- Java線程(四):線程中斷、線程讓步、線程睡眠、線程合并
- Java線程(五):Timer和TimerTask
- Java線程(六):線程池
- Java線程(七):Callable和Future
- Java線程(八):鎖對象Lock-同步問題更完美的處理方式
- Java線程(九):Condition-線程通信更高效的方式
- Java線程(十):CAS
- Java線程(十一):Fork/Join-Java并行計算框架
- Java線程(篇外篇):阻塞隊列BlockingQueue
- Java線程(篇外篇):線程本地變量ThreadLocal
- Java線程(篇外篇):線程和鎖