# 先說點別的,為什么要逐漸學會讀英文書籍
解釋一個名詞:上下文切換、Context switch
多任務系統中,上下文切換是指CPU的控制權由運行任務轉移到另外一個就緒任務時所發生的事件。
When one thread’s execution is suspended and swapped off the processor, and another thread is swapped onto the processor and its execution is resumed, this is called a context switch.
為什么讀英文版的計算機書籍,能理解的透徹和深刻。我們使用的語言,比如Java,是用英語開發的,其JDK中的類的命名、方法的命名,都是英文的,所以,當用英文解釋一個名詞、場景時,基于對這門編程語言的了解,我們馬上就理解了,是非常形象的,簡直就是圖解,而用中文解釋,雖然是我們的母語,但是對于計算機語言而言,卻是外語,卻是抽象的,反而不容易理解。
**推薦書籍:**Java Thread Programming ,但是要注意,這本書挺古老的,JDK1.1、1.2時代的產物,所以書中的思想OK,有些代碼例子,可能得不出想演示的結果。
# 前言
關于多線程的知識,有非常多的資料可以參考。這里稍微總結一下,以求加深記憶。
關于多線程在日常工作中的使用:對于大多數的日常應用系統,比如各種管理系統,可能根本不需要深入了解,僅僅知道Thread/Runnable就夠了;如果是需要很多計算任務的系統,比如推薦系統中各種中間數據的計算,對多線程的使用就較為頻繁,也需要進行一下稍微深入的研究。
**幾篇實戰分析線程問題的好文章:**
[怎樣分析 JAVA 的 Thread Dumps](http://tbstone.iteye.com/blog/2096423)
[各種 Java Thread State 第一分析法則](http://blog.csdn.net/wgw335363240/article/details/21373015)
[數據庫死鎖及解決死鎖問題](http://blog.eastmoney.com/fk/blog_120150478.html)
[全面解決五大數據庫死鎖問題](http://tech.ccidnet.com/zt/sisuo/)
**關于線程池的幾篇文章:**
[http://blog.csdn.net/wangpeng047/article/details/7748457](http://blog.csdn.net/wangpeng047/article/details/7748457)
[http://www.oschina.net/question/12_11255](http://www.oschina.net/question/12_11255)
[http://jamie-wang.iteye.com/blog/1554927](http://jamie-wang.iteye.com/blog/1554927)
# 基本知識
JVM最多支持多少個線程:[http://www.importnew.com/10780.html](http://www.importnew.com/10780.html)
### 線程安全
如果你的代碼所在的進程中有多個線程在同時運行,而這些線程可能會同時運行這段代碼。
如果每次運行結果和單線程運行的結果是一樣的,而且其他的變量的值也和預期的是一樣的,就是線程安全的。
或者這樣說:一個類或者程序所提供的接口對于線程來說是原子操作或者多個線程之間的切換不會導致該接口的執行結果存在二義性,也就是說我們不用考慮同步的問題。
或者我們這樣來簡單理解,同一段程序塊,從某一個時間點同時操作某個數據,對于這個數據來說,分叉了,則這就不是線程安全;如果對這段暑假保護起來,保證順序執行,則就是線程安全。
### 原子操作
原子操作(atomic operation):是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。
原子操作(atomic operation):如果一個操作所處的層(layer)的更高層不能發現其內部實現與結構,則這個操作就是原子的。
原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。
在多進程(線程)訪問資源時,原子操作能夠確保所有其他的進程(線程)都不在同一時間內訪問相同的資源。
原子操作時不需要synchronized,這是Java多線程編程的老生常談,但是,這是真的嗎?我們通過測試發現(return i),當對象處于不穩定狀態時,仍舊很有可能使用原子操作來訪問他們,所以,對于java中的多線程,要遵循兩個原則:
a、Brian Goetz的同步規則,如果你正在寫一個變量,它可能接下來將被另一個線程讀取,或者正在讀取一個上一次已經被另一個線程寫過的變量,那么你必須使用同步,并且,讀寫線程都必須用相同的監視器鎖同步;
b、Brain Goetz測試:如果你可以編寫用于現代微處理器的高性能JVM,那么就有資格去考慮是否可以避免使用同步
通常所說的原子操作包括對非long和double型的primitive進行賦值,以及返回這兩者之外的primitive。之所以要把它們排除在外是因為它們都比較大,而JVM的設計規范又沒有要求讀操作和賦值操作必須是原子操作(JVM可以試著去這么作,但并不保證)。
### 錯誤理解
~~~
import java.util.Hashtable;
class Test
{
public static void main(String[] args) throws Exception {
final Hashtable<String,Integer> h = new Hashtable<String,Integer>();
long l1 = System.currentTimeMillis();
for(int i=0;i<10000;i++) {
new Thread(new Runnable(){
@Override
public void run() {
h.put("test",1);
Integer i1 = h.get("test");
h.put("test",2);
Integer i2 = h.get("test");
if(i1 == i2) {
System.out.println(i1 + ":" + i2);
}
}
}).start();
}
long l2 = System.currentTimeMillis();
System.out.println((l2-l1)/1000);
}
}
~~~
有人覺得:既然Hashtable是線程安全的,那么以上代碼的run()方法中的代碼應該是線程安全的,這是錯誤理解。
線程安全的對象,指的是其內部操作是線程安全的,其外部操作還是需要自己來保證其同步的,針對以上代碼,在run方法的內部使用synchronized就可以了。
### 互斥鎖和自旋鎖
自旋鎖:不睡覺,循環等待獲取鎖的方式成為自旋鎖,在ConcurrentHashMap的實現中使用了自旋鎖,一般自旋鎖實現會有一個參數限定最多持續嘗試次數,超出后,自旋鎖放棄當前time slice,等下一次機會,自旋鎖比較適用于鎖使用者保持鎖時間比較短的情況。
正是由于自旋鎖使用者一般保持鎖時間非常短,因此選擇自旋而不是睡眠是非常必要的,自旋鎖的效率遠高于互斥鎖。
CAS樂觀鎖適用的場景:[http://www.tuicool.com/articles/zuui6z](http://www.tuicool.com/articles/zuui6z)
# ThreadLocal與synchronized
### 區別ThreadLocal 與 synchronized
ThreadLocal是一個線程隔離(或者說是線程安全)的變量存儲的管理實體(注意:不是存儲用的),它以Java類方式表現;?
synchronized是Java的一個保留字,只是一個代碼標識符,它依靠JVM的鎖機制來實現臨界區的函數、變量在CPU運行訪問中的原子性。?
兩者的性質、表現及設計初衷不同,因此沒有可比較性。
synchronized對塊使用,用的是Object對象鎖,對于方法使用,用的是this鎖,對于靜態方法使用,用的是Class對象的鎖,只有使用同一個鎖的代碼,才是同步的。
### 理解ThreadLocal中提到的變量副本
事實上,我們向ThreadLocal中set的變量不是由ThreadLocal來存儲的,而是Thread線程對象自身保存。
當用戶調用ThreadLocal對象的set(Object o)時,該方法則通過Thread.currentThread()獲取當前線程,將變量存入Thread中的一個Map內,而Map的Key就是當前的ThreadLocal實例。
Runnable與Thread
實現多線程,Runnable接口和Thread類是最常用的了,實現Runnable接口比繼承Thread類會更有優勢:
- 適合多個相同的程序代碼的線程去處理同一個資源
- 可以避免java中的單繼承的限制
- 增加程序的健壯性,代碼可以被多個線程共享,代碼和數據獨立。
# Java線程互斥和協作
阻塞指的是暫停一個線程的執行以等待某個條件發生(如某資源就緒)。Java 提供了大量方法來支持阻塞,下面讓對它們逐一分析。
1、sleep()方法:sleep()允許指定以毫秒為單位的一段時間作為參數,它使得線程在指定的時間內進入阻塞狀態,不能得到CPU 時間,指定的時間一過,線程重新進入可執行狀態。
典型地,sleep() 被用在等待某個資源就緒的情形:測試發現條件不滿足后,讓線程阻塞一段時間后重新測試,直到條件滿足為止。
2、(Java 5已經不推薦使用,易造成死鎖!!) suspend()和resume()方法:兩個方法配套使用,suspend()使得線程進入阻塞狀態,并且不會自動恢復,必須其對應的 resume() 被調用,才能使得線程重新進入可執行狀態。典型地,suspend() 和 resume() 被用在等待另一個線程產生的結果的情形:測試發現結果還沒有產生后,讓線程阻塞,另一個線程產生了結果后,調用resume()使其恢復。
stop()方法,原用于停止線程,也已經不推薦使用,因為stop時會解鎖,可能造成不可預料的后果;推薦設置一個flag標記變量,結合interrupt()方法來讓線程終止。
3.、yield() 方法:yield() 使得線程放棄當前分得的 CPU 時間,但是不使線程阻塞,即線程仍處于可執行狀態,隨時可能再次分得 CPU 時間。調用 yield() 的效果等價于調度程序認為該線程已執行了足夠的時間從而轉到另一個線程。
4.、wait() 和 notify() 方法:兩個方法配套使用,wait() 使得線程進入阻塞狀態,它有兩種形式,一種允許指定以毫秒為單位的一段時間作為參數,另一種沒有參數,前者當對應的 notify() 被調用或者超出指定時間時線程重新進入可執行狀態,后者則必須對應的 notify() 被調用。
2和4區別的核心在于,前面敘述的所有方法,阻塞時都不會釋放占用的鎖(如果占用了的話),而這一對方法則相反。上述的核心區別導致了一系列的細節上的區別。
首先,前面敘述的所有方法都隸屬于Thread 類,但是這一對卻直接隸屬于 Object 類,也就是說,所有對象都擁有這一對方法。因為這一對方法阻塞時要釋放占用的鎖,而鎖是任何對象都具有的,調用任意對象的 wait() 方法導致線程阻塞,并且該對象上的鎖被釋放。而調用任意對象的notify()方法則導致因調用該對象的 wait() 方法而阻塞的線程中隨機選擇的一個解除阻塞(但要等到獲得鎖后才真正可執行)。
其次,前面敘述的所有方法都可在任何位置調用,但是這一對方法卻必須在 synchronized 方法或塊中調用,理由也很簡單,只有在synchronized 方法或塊中當前線程才占有鎖,才有鎖可以釋放。同樣的道理,調用這一對方法的對象上的鎖必須為當前線程所擁有,這樣才有鎖可以釋放。因此,這一對方法調用必須放置在這樣的 synchronized 方法或塊中,該方法或塊的上鎖對象就是調用這一對方法的對象。若不滿足這一條件,則程序雖然仍能編譯,但在運行時會出現 IllegalMonitorStateException 異常。
wait() 和 notify() 方法的上述特性決定了它們經常和synchronized 方法或塊一起使用,將它們和操作系統的進程間通信機制作一個比較就會發現它們的相似性:synchronized方法或塊提供了類似于操作系統原語的功能,它們的結合用于解決各種復雜的線程間通信問題。
關于 wait() 和 notify() 方法最后再說明三點:
第一:調用 notify() 方法導致解除阻塞的線程是從因調用該對象的 wait() 方法而阻塞的線程中隨機選取的,我們無法預料哪一個線程將會被選擇,所以編程時要特別小心,避免因這種不確定性而產生問題。
第二:除了 notify(),還有一個方法 notifyAll() 也可起到類似作用,唯一的區別在于,調用 notifyAll() 方法將把因調用該對象的wait()方法而阻塞的所有線程一次性全部解除阻塞。當然,只有獲得鎖的那一個線程才能進入可執行狀態。
**第三:wait/notify,是在作為監視器鎖的對象上執行的,如果鎖是a,執行b的wait,則會報java.lang.IllegalMonitorStateException。**
**關于interrupted,很容易理解錯誤,看兩篇文章,如下:**
[http://www.blogjava.net/fhtdy2004/archive/2009/06/08/280728.html](http://www.blogjava.net/fhtdy2004/archive/2009/06/08/280728.html)
[http://www.blogjava.net/fhtdy2004/archive/2009/08/22/292181.html](http://www.blogjava.net/fhtdy2004/archive/2009/08/22/292181.html)
### 演示線程間協作機制,wait/notify/condition
代碼示例1(wait/notify):
必須說這個代碼是有缺陷的,會錯失信號,想一想問題出在哪里,應該怎么完善?
~~~
/*
* 線程之間協作問題:兩個線程,一個打印奇數,一個打印偶數
* 在調用wait方法時,都是用while判斷條件的,而不是if,
* 在wait方法說明中,也推薦使用while,因為在某些特定的情況下,線程有可能被假喚醒,使用while會循環檢測更穩妥
* */
public class OddAndEven {
static int[] num = new int[]{1,2,3,4,5,6,7,8,9,10};
static int index = 0;
public static void main(String[] args) throws Exception {
OddAndEven oae = new OddAndEven();
//這里如果起超過2個線程,則可能出現所有的線程都處于wait狀態的情況(網上很多代碼都有這個Bug,要注意,這其實是一個錯失信號產生的死鎖問題,如果用notifyAll就不會有這個問題)
new Thread(new ThreadOdd(oae)).start();
new Thread(new ThreadEven(oae)).start();
}
static class ThreadOdd implements Runnable {
private OddAndEven oae;
public ThreadOdd(OddAndEven oae) {
this.oae = oae;
}
@Override
public void run() {
while(index < 10) {
oae.odd();
}
}
}
static class ThreadEven implements Runnable {
private OddAndEven oae;
public ThreadEven(OddAndEven oae) {
this.oae = oae;
}
@Override
public void run() {
while(index < 10) {
oae.even();
}
}
}
//奇數
public synchronized void odd() {
while(index<10 && num[index] % 2 == 0) {
try {
wait(); //阻塞偶數
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(index >= 10) return;
System.out.println(Thread.currentThread().getName() + " 打印奇數 : " + num[index]);
index++;
notify(); //喚醒偶數線程
}
//偶數
public synchronized void even() {
while(index<10 && num[index] % 2 == 1) {
try {
wait(); //阻塞奇數
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(index >= 10) return;
System.out.println(Thread.currentThread().getName() + " 打印偶數 : " + num[index]);
index++;
notify(); //喚醒奇數線程
}
}
~~~
代碼示例2:
~~~
public class Test implements Runnable {
private String name;
private Object prev;
private Object self;
private Test(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {
synchronized (prev) {
synchronized (self) {
System.out.print(name);
count--;
try{
Thread.sleep(1);
}
catch (InterruptedException e){
e.printStackTrace();
}
self.notify();
}
try {
prev.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
Test pa = new Test("A", c, a);
Test pb = new Test("B", a, b);
Test pc = new Test("C", b, c);
new Thread(pa).start();
Thread.sleep(10);
new Thread(pb).start();
Thread.sleep(10);
new Thread(pc).start();
Thread.sleep(10);
}
}
~~~
代碼示例3(Condition實現生產者消費者模式):
~~~
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionTest test = new ConditionTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while(true){
lock.lock();
try {
while(queue.size() == 0){
try {
System.out.println("隊列空,等待數據");
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll(); //每次移走隊首元素
notFull.signal();
System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
} finally{
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while(true){
lock.lock();
try {
while(queue.size() == queueSize){
try {
System.out.println("隊列滿,等待有空余空間");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1); //每次插入一個元素
notEmpty.signal();
System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
} finally{
lock.unlock();
}
}
}
}
}
~~~
代碼示例4(兩把鎖的生產者消費者模式):
~~~
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 程序是Catch住InterruptedException,還是走Thread.interrupted(),實際是不確定的
*/
public class Restaurant {
public Meal meal;
public ExecutorService exec = Executors.newCachedThreadPool();
public WaitPerson waitPerson = new WaitPerson(this);
public Chef chef = new Chef(this);
public Restaurant() {
exec.execute(waitPerson);
exec.execute(chef);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal {
private final int orderNum;
public Meal(int orderNum) {this.orderNum = orderNum;}
public String toString() {return "Meal " + orderNum;}
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant restaurant) {this.restaurant = restaurant;}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal == null) {
wait();
}
}
System.out.println("WaitPerson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
}
catch(InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + e.toString());
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant) {this.restaurant = restaurant;}
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
while(restaurant.meal != null) {
wait();
}
}
if(++count == 10) {
restaurant.exec.shutdownNow();
}
System.out.println("Order up!");
synchronized(restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
}
}
catch(InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + e.toString());
}
}
}
~~~
### 反面教材代碼演示:
~~~
class SuspendAndResume {
private final static Object object = new Object();
static class ThreadA extends Thread {
public void run() {
synchronized(object) {
System.out.println("start...");
Thread.currentThread().suspend();
System.out.println("thread end");
}
}
}
public static void main(String[] args) throws InterruptedException {
ThreadA t1 = new ThreadA();
ThreadA t2 = new ThreadA();
t1.start();
t2.start();
Thread.sleep(100);
System.out.println(t1.getState());
System.out.println(t2.getState());
t1.resume();
t2.resume();
}
}
~~~
程序輸出結果如下:
~~~
localhost:test puma$ sudo java SuspendAndResume
start...
RUNNABLE
BLOCKED
thread end
start...
~~~
關于suspend()/resume()這兩個方法,類似于wait()/notify(),但是它們不是等待和喚醒線程。suspend()后的線程處于RUNNING狀態,而不是WAITING狀態,但是線程本身在這里已經掛起了,線程本身餓狀態就開始對不上號了。
以上的例子解釋如下:
首先t1.start()/t2.start(),main睡sleep10秒,讓兩個子線程都進入運行的區域;
打印狀態,t1運行,t2被synchronized阻塞;
t1.resume(),此時t1打印thread end,馬上執行t2.resume(),此時由于t1的synchronized還沒來得及釋放鎖,所以這段代碼是在t2的synchronized外執行的,也就是在t2.suspend()之前執行的,所以是無效的;而當t2線程被掛起時,輸出start,但是由于t2.suspend()已經被執行完了,所以t2就會一直處于掛起狀態,一直持有鎖不釋放,這些信息的不一致就導致了各種資源無法釋放的問題。
對于這個程序,如果在t1.resume()和t2.resume()之間增加一個Thread.sleep(),可以看到又正常執行了。
總得來說,問題應當出在線程狀態對外看到的是RUNNING狀態,外部程序并不知道這個對象掛起了需要去做resume()操作。另外,它并不是基于對象來完成這個動作的,因此suspend()和wait()相關的順序性很難保證。所以suspend()和resume()不推薦使用了。
反過來想,這也更加說明了wait()和notify()為什么要基于對象(而不是線程本身)來做數據結構,因為要控制生產者和消費者之間的關系,它需要一個臨界區來控制它們之間的平衡。它不是隨意地在線程上做操作來控制資源的,而是由資源反過來控制線程狀態的。當然wait()和notify()并非不會導致死鎖,只是它們的死鎖通常是程序設計不當導致的,并且在通常情況下是可以通過優化來解決的。
### 同步隊列
wait和notify以一種非常低級的方式解決了任務互操作問題,即每次交互時都握手。在許多情況下,可以瞄向更高的抽象級別,使用同步隊列來解決任務協作問題,同步隊列在任何時刻都只允許一個操作插入或移除元素。
如果消費者任務試圖從隊列中獲取對象,而該隊列此時為空,那么這些隊列還可以掛起消費者任務,并且當有更多的元素可用時恢復消費者任務。阻塞隊列可以解決非常大量的問題,而其方式與wait和notify相比,則簡單并可靠的多。
代碼示例1(將多個LiftOff的執行串行化,消費者是LiftOffRunner,將每個LiftOff對象從BlockingQueue中推出并直接運行):
~~~
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class TestBlockingQueues {
public static void main(String[] args) {
test("LinkedBlockingQueue",new LinkedBlockingQueue<LiftOff>()); //Unlimited size
test("ArrayBlockingQueue",new ArrayBlockingQueue<LiftOff>(3)); //Fixed size
test("SynchronousQueue",new SynchronousQueue<LiftOff>()); //Size of 1
}
private static void test(String msg,BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for(int i = 0; i < 5; i++) {
runner.add(new LiftOff(i));
}
getKey("Press 'Enter' (" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}
private static void getKey() {
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void getKey(String message) {
System.out.println(message);
getKey();
}
}
class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {rockets = queue;}
public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try
{
while(!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run();
}
} catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.print("Exiting LiftOffRunner");
}
}
class LiftOff {
private int num;
public LiftOff(int num) {this.num = num;}
public void run() {
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
~~~
代碼示例2:
線程間通過BlockingQueue協作,3個任務,一個做面包,一個對面包抹黃油,一個對抹過黃油的面包抹果醬。整個代碼沒有顯示的使用同步,任務之間完成了很好地協作;因為同步由隊列(其內部是同步的)和系統的設計隱式的管理了。每片Toast在任何時刻都只有一個任務在操作。因為隊列的阻塞,使得處理過程將被自動的掛起和恢復。
我們可以看到通過使用BlockingQueue帶來的簡化十分明顯,在使用顯式的wait/notify時存在的類和類之間的耦合被消除了,因為每個類都只和他得BlockingQueue通信。
~~~
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue,butteredQueue));
exec.execute(new Jammer(butteredQueue,finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Toast {
public enum Status {DRY,BUTTERED,JAMMED};
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {id = idn;}
public void butter() {status = Status.BUTTERED;}
public void jam() {status = Status.JAMMED;}
public Status getStatus() {return status;}
public int getId() {return id;}
public String toString() {return "Toast " + id + " : " + status;}
}
@SuppressWarnings("serial")
class ToastQueue extends LinkedBlockingQueue<Toast> {}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {toastQueue = tq;}
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MICROSECONDS.sleep(100 + rand.nextInt(500));
//Make toast
Toast t = new Toast(count++);
System.out.println(t);
//Insert into queue
toastQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
//Apply butter to toast
class Butterer implements Runnable {
private ToastQueue dryQueue,butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
//Apply jam to buttered toast
class Jammer implements Runnable {
private ToastQueue butteredQueue,finishedQueue;
public Jammer(ToastQueue buttered,ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);;
}
} catch(InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
//Consume the toast
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
//Blocks until next piece of toast is available
Toast t = finishedQueue.take();
//Verify that the toast is coming in order, and that all pieces are getting jammed
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error : " + t);
System.exit(1);
} else {
System.out.println("Chomp! " + t);
}
}
} catch(InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
~~~
### 任務間使用管道進行輸入輸出
Java輸入輸出類庫中的PipedWriter(允許任務向管道寫),PipedReader(允許不同任務從同一個管道中讀取),這個模型可以看成“生產者-消費者”問題的變體,這里的管道就是一個封裝好的解決方案。
管道基本上是一個阻塞隊列,存在于多個引入BlockingQueue之前的Java版本中。
代碼示例1:
當Receiver調用read時,如果沒有更多地數據,管道將自動阻塞。
Sender和Receiver是在main中啟動的,即對象構造徹底完成之后。如果啟動了一個沒有徹底構造完畢的對象,在不同的平臺上,管道可能產生不一致的行為。相比較而言,BlockingQueue使用起來更加健壯而容易。
shutdownNow被調用時,可以看到PipedReader和普通IO之間最重要的差異,PipedReader是可以中斷的。
~~~
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
try {
while(true) {
for(char c = 'A'; c <= 'Z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(50));
}
}
} catch(IOException e) {
System.out.println(e + " Sender write exception");
} catch(InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
//Blocks until characters are there
System.out.println("Read: " + (char)in.read() + ", ");
}
} catch(IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}
~~~
# 線程是JVM級別的
我們知道靜態變量是ClassLoader級別的,如果Web應用程序停止,這些靜態變量也會從JVM中清除。
但是線程則是JVM級別的,如果用戶在Web應用中啟動一個線程,這個線程的生命周期并不會和Web應用程序保持同步。
也就是說,即使停止了Web應用,這個線程依舊是活躍的。
正是因為這個很隱晦的問題,所以很多有經驗的開發者不太贊成在Web應用中私自啟動線程。
# 獲取異步線程的返回結果
通過java.util.concurrent包種的相關類,實現異步線程返回結果的獲取。代碼演示例子如下:
~~~
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureTest {
public static class TaskRunnable implements Runnable {
@Override
public void run() {
System.out.println("runnable");
}
}
public static class TaskCallable implements Callable<String> {
private String s;
public TaskCallable(String s) {
this.s = s;
}
@Override
public String call() throws Exception {
System.out.println("callable");
return s;
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
es.submit(new TaskRunnable());
System.out.println(i);
}
List<Future<String>> futList = new LinkedList<Future<String>>();
for (int i = 0; i < 100; i++) {
futList.add(es.submit(new TaskCallable(String.valueOf(i))));
}
for (Future<String> fut : futList) {
try {
System.out.println(fut.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
~~~
# ReentrantLock和synchronized兩種鎖定機制的對比
~~~
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
public static int I;
public static Object oLock = new Object();
public static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
long b = System.currentTimeMillis();
List<Thread> t1= new ArrayList<Thread>();
for(int j=0;j<30;j++)
{
t1.add(new Thread(new R1()));
}
for(Thread t: t1) t.start();
for(Thread t:t1) {
t.join();
}
long e = System.currentTimeMillis();
System.out.println(Test.I + " | " + (e-b));
Test.I = 0;
b = System.currentTimeMillis();
List<Thread> t2= new ArrayList<Thread>();
for(int j=0;j<30;j++)
{
t2.add(new Thread(new R2()));
}
for(Thread t: t2) t.start();
for(Thread t:t2) {
t.join();
}
e = System.currentTimeMillis();
System.out.println(Test.I + " | " + (e-b));
}
}
class R1 implements Runnable {
@Override
public void run() {
for(int i=0;i<1000000;i++)
{
Test.lock.lock();
Test.I++;
Test.lock.unlock();
}
}
}
class R2 implements Runnable {
@Override
public void run() {
for(int i=0;i<1000000;i++)
{
synchronized("") {
Test.I++;
}
}
}
}
~~~
經過測試,輸出結果分別為:
Windows7(2核,2G內存),結果:3000000 ?| ?2890,和3000000 ?| ?8198,性能差距還是比較明顯;
Mac10.7(4核,4G內存),結果:3億次計算,ReentrantLock用8秒,synchronized反而只用4秒,結果反過來了;
RHEL6.1(24核,64G內存),結果:3億次計算,二者相差不多,都是20-50之間,但是ReentrantLock表現更好一些。
ReentrantLock利用的是“非阻塞同步算法與CAS(Compare and Swap)無鎖算法”,是CPU級別的,參考網址:
[http://www.cnblogs.com/Mainz/p/3556430.html](http://www.cnblogs.com/Mainz/p/3556430.html)
關于兩種鎖機制的更多比較,請參閱:[http://www.ibm.com/developerworks/cn/java/j-jtp10264/index.html](http://www.ibm.com/developerworks/cn/java/j-jtp10264/index.html)
# 關于線程安全的N種實現場景
(1)synchronized
(2)immutable對象是自動線程安全的
(3)volatile,被volatile修飾的屬性,對于讀操作是線程安全的
(4)ConcurrentHashMap之類,java.util.concurrent包中的一些并發操作類,是線程安全的,但是沒有使用synchronized關鍵字,實現巧妙,利用的基本特性是:volatile、Compare And Swap、分段,對于讀不加鎖(volatile保證線程安全),對于寫,對于相關的segment通過ReentrantLock加鎖
# Java線程死鎖
代碼示例:
~~~
public class Test {
public static void main(String[] args) {
Runnable t1 = new DeadLock(true);
Runnable t2 = new DeadLock(false);
new Thread(t1).start();
new Thread(t2).start();
}
}
class DeadLock implements Runnable {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
private boolean flag;
public DeadLock(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
if(flag) {
synchronized(lock1) {
try {
Thread.sleep(1000); //保證晚于另一個線程鎖lock2,目的是產生死鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(lock2) {
System.out.println("flag=true:死鎖了,還能print的出來嗎?");
}
}
} else {
synchronized(lock2) {
try {
Thread.sleep(1000); //保證晚于另一個線程鎖lock1,目的是產生死鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(lock1) {
System.out.println("flag=false:死鎖了,還能print的出來嗎?");
}
}
}
}
}
~~~
死鎖可以這樣比喻:兩個人吃飯,需要刀子和叉子,其中一人拿了刀子,等待叉子;另一個人拿了叉子,等待刀子;就死鎖了。
Java線程死鎖是一個經典的多線程問題,因為不同的線程都在等待那些根本不可能被釋放的鎖,導致所有的工作都無法完成;
導致死鎖的根源在于不適當地運用“synchronized”關鍵詞來管理線程對特定對象的訪問。
**如何避免死鎖的設計規則:**
(1)讓所有的線程按照同樣地順序獲得一組鎖。這種方法消除了X和Y的擁有者分別等待對方的資源的問題。這也是避免死鎖的一個通用的經驗法則是:當幾個線程都要訪問共享資源A、B、C時,保證使每個線程都按照同樣的順序去訪問它們,比如都先訪問A,在訪問B和C。
(2)將多個鎖組成一組并放到同一個鎖下。比如,把刀子和叉子,都放在一個新創建的(銀器對象)下面,要獲得子鎖,先獲得父鎖。
# 更多
微博設計框架:[http://mars914.iteye.com/blog/1218492](http://mars914.iteye.com/blog/1218492)??[http://timyang.net/](http://timyang.net/)
避免多線程時,開銷分布在調度上,可以采取的策略:減少線程到合適的程度、避免線程內IO、采用合適的優先級。
關于IO和多線程,有個案例,Redis是單線程的,支持10萬的QPS;MemberCache是多線程的,性能反而不如Redis;也可以佐證,對于IO非常多的操作,多線程未必能提高更好的性能,即使是內存IO。
另外,聽百分點公司的講座時,他們分享了一個案例,當計算的性能瓶頸在硬盤時,把硬盤換成SSD,可以性能翻倍,所以,應該把SSD當做是便宜的內存來使用,而不應該是當做昂貴的硬盤來使用。
在java.util.concurrent中,還提供了很多有用的線程寫作類,比如:
CountDownLatch:倒計時鎖、CyclicBarrier:循環柵欄、DelayQueue:延遲隊列、PriorityBlockingQueue:優先級隊列、ScheduledThreadPoolExecutor:定時任務、Semaphore:信號量、Exchanger:交互柵欄。
- 前言
- Java之旅--如何從草根成為技術專家
- 《深入理解Java虛擬機》學習筆記
- 《Spring3.X企業應用開發實戰》學習筆記--IoC和AOP
- 《Tomcat權威指南》第二版學習筆記
- Java之旅--多線程進階
- Java之旅--Web.xml解析
- 《Spring3.X企業應用開發實戰》學習筆記--DAO和事務
- 《Spring3.X企業應用開發實戰》學習筆記--SpringMVC
- Java之旅--定時任務(Timer、Quartz、Spring、LinuxCron)
- Spring實用功能--Profile、WebService、緩存、消息、ORM
- JDK框架簡析--java.lang包中的基礎類庫、基礎數據類型
- JDK框架簡析--java.util包中的工具類庫
- JDK框架簡析--java.io包中的輸入輸出類庫
- Java之旅--通訊
- Java之旅--XML/JSON
- Java之旅--Linux&amp;java進階(看清操作系統層面的事)
- Java之旅--硬件和Java并發(神之本源)
- Java之旅--設計模式
- jetty