**一、死鎖的定義**
多線程以及多進程改善了系統資源的利用率并提高了系統 的處理能力。然而,并發執行也帶來了新的問題——死鎖。所謂死鎖是指多個線程因競爭資源而造成的一種僵局(互相等待),若無外力作用,這些進程都將無法向前推進。
所謂死鎖是指兩個或兩個以上的線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。
下面我們通過一些實例來說明死鎖現象。
先看生活中的一個實例,兩個人面對面過獨木橋,甲和乙都已經在橋上走了一段距離,即占用了橋的資源,甲如果想通過獨木橋的話,乙必須退出橋面讓出橋的資源,讓甲通過,但是乙不服,為什么讓我先退出去,我還想先過去呢,于是就僵持不下,導致誰也過不了橋,這就是死鎖。
在計算機系統中也存在類似的情況。例如,某計算機系統中只有一臺打印機和一臺輸入 設備,進程P1正占用輸入設備,同時又提出使用打印機的請求,但此時打印機正被進程P2 所占用,而P2在未釋放打印機之前,又提出請求使用正被P1占用著的輸入設備。這樣兩個進程相互無休止地等待下去,均無法繼續執行,此時兩個進程陷入死鎖狀態。
**二、死鎖產生的原因**
**1、系統資源的競爭**
通常系統中擁有的不可剝奪資源,其數量不足以滿足多個進程運行的需要,使得進程在運行過程中,會因爭奪資源而陷入僵局,如磁帶機、打印機等。只有對不可剝奪資源的競爭才可能產生死鎖,對可剝奪資源的競爭是不會引起死鎖的。
**2、進程推進順序非法**
進程在運行過程中,請求和釋放資源的順序不當,也同樣會導致死鎖。例如,并發進程 P1、P2分別保持了資源R1、R2,而進程P1申請資源R2,進程P2申請資源R1時,兩者都會因為所需資源被占用而阻塞。
Java中死鎖最簡單的情況是,一個線程T1持有鎖L1并且申請獲得鎖L2,而另一個線程T2持有鎖L2并且申請獲得鎖L1,因為默認的鎖申請操作都是阻塞的,所以線程T1和T2永遠被阻塞了。導致了死鎖。這是最容易理解也是最簡單的死鎖的形式。但是實際環境中的死鎖往往比這個復雜的多。可能會有多個線程形成了一個死鎖的環路,比如:線程T1持有鎖L1并且申請獲得鎖L2,而線程T2持有鎖L2并且申請獲得鎖L3,而線程T3持有鎖L3并且申請獲得鎖L1,這樣導致了一個鎖依賴的環路:T1依賴T2的鎖L2,T2依賴T3的鎖L3,而T3依賴T1的鎖L1。從而導致了死鎖。
從上面兩個例子中,我們可以得出結論,產生死鎖可能性的最根本原因是:**線程在獲得一個鎖L1的情況下再去申請另外一個鎖L2,也就是鎖L1想要包含了鎖L2,也就是說在獲得了鎖L1,并且沒有釋放鎖L1的情況下,又去申請獲得鎖L2,這個是產生死鎖的最根本原因**。另一個原因是**默認的鎖申請操作是阻塞的**。
![60\_1.png][60_1.png]
**3、死鎖產生的必要條件:**
產生死鎖必須同時滿足以下四個條件,只要其中任一條件不成立,死鎖就不會發生。
(1)互斥條件:進程要求對所分配的資源(如打印機)進行排他性控制,即在一段時間內某資源僅為一個進程所占有。此時若有其他進程請求該資源,則請求進程只能等待。
(2)不剝奪條件:進程所獲得的資源在未使用完畢之前,不能被其他進程強行奪走,即只能由獲得該資源的進程自己來釋放(只能是主動釋放)。
(3)請求和保持條件:進程已經保持了至少一個資源,但又提出了新的資源請求,而該資源已被其他進程占有,此時請求進程被阻塞,但對自己已獲得的資源保持不放。
(4)循環等待條件:存在一種進程資源的循環等待鏈,鏈中每一個進程已獲得的資源同時被鏈中下一個進程所請求。即存在一個處于等待狀態的進程集合\{Pl, P2, ..., pn\},其中Pi等 待的資源被P(i+1)占有(i=0, 1, ..., n-1),Pn等待的資源被P0占有,如圖1所示。
直觀上看,循環等待條件似乎和死鎖的定義一樣,其實不然。按死鎖定義構成等待環所 要求的條件更嚴,它要求Pi等待的資源必須由P(i+1)來滿足,而循環等待條件則無此限制。 例如,系統中有兩臺輸出設備,P0占有一臺,PK占有另一臺,且K不屬于集合\{0, 1, ..., n\}。
Pn等待一臺輸出設備,它可以從P0獲得,也可能從PK獲得。因此,雖然Pn、P0和其他 一些進程形成了循環等待圈,但PK不在圈內,若PK釋放了輸出設備,則可打破循環等待, 如圖2-16所示。因此循環等待只是死鎖的必要條件。
![60\_2.png][60_2.png]
資源分配圖含圈而系統又不一定有死鎖的原因是同類資源數大于1。但若系統中每類資 源都只有一個資源,則資源分配圖含圈就變成了系統出現死鎖的充分必要條件。
下面再來通俗的解釋一下死鎖發生時的條件:
(1)互斥條件:一個資源每次只能被一個進程使用。獨木橋每次只能通過一個人。
(2)請求與保持條件:一個進程因請求資源而阻塞時,對已獲得的資源保持不放。乙不退出橋面,甲也不退出橋面。
(3)不剝奪條件: 進程已獲得的資源,在未使用完之前,不能強行剝奪。甲不能強制乙退出橋面,乙也不能強制甲退出橋面。
(4)循環等待條件:若干進程之間形成一種頭尾相接的循環等待資源關系。如果乙不退出橋面,甲不能通過,甲不退出橋面,乙不能通過。
**三、死鎖實例**
**例子1:**
```
package com.demo.test;
/**
* 一個簡單的死鎖類
* t1先運行,這個時候flag==true,先鎖定obj1,然后睡眠1秒鐘
* 而t1在睡眠的時候,另一個線程t2啟動,flag==false,先鎖定obj2,然后也睡眠1秒鐘
* t1睡眠結束后需要鎖定obj2才能繼續執行,而此時obj2已被t2鎖定
* t2睡眠結束后需要鎖定obj1才能繼續執行,而此時obj1已被t1鎖定
* t1、t2相互等待,都需要得到對方鎖定的資源才能繼續執行,從而死鎖。
*/
public class DeadLock implements Runnable{
private static Object obj1 = new Object();
private static Object obj2 = new Object();
private boolean flag;
public DeadLock(boolean flag){
this.flag = flag;
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "運行");
if(flag){
synchronized(obj1){
System.out.println(Thread.currentThread().getName() + "已經鎖住obj1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(obj2){
// 執行不到這里
System.out.println("1秒鐘后,"+Thread.currentThread().getName()
+ "鎖住obj2");
}
}
}else{
synchronized(obj2){
System.out.println(Thread.currentThread().getName() + "已經鎖住obj2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(obj1){
// 執行不到這里
System.out.println("1秒鐘后,"+Thread.currentThread().getName()
+ "鎖住obj1");
}
}
}
}
}
```
```
package com.demo.test;
public class DeadLockTest {
public static void main(String[] args) {
Thread t1 = new Thread(new DeadLock(true), "線程1");
Thread t2 = new Thread(new DeadLock(false), "線程2");
t1.start();
t2.start();
}
}
```
運行結果:
```
線程1運行
線程1已經鎖住obj1
線程2運行
線程2已經鎖住obj2
```
線程1鎖住了obj1(甲占有橋的一部分資源),線程2鎖住了obj2(乙占有橋的一部分資源),線程1企圖鎖住obj2(甲讓乙退出橋面,乙不從),進入阻塞,線程2企圖鎖住obj1(乙讓甲退出橋面,甲不從),進入阻塞,死鎖了。
從這個例子也可以反映出,死鎖是因為多線程訪問共享資源,由于訪問的順序不當所造成的,通常是一個線程鎖定了一個資源A,而又想去鎖定資源B;在另一個線程中,鎖定了資源B,而又想去鎖定資源A以完成自身的操作,兩個線程都想得到對方的資源,而不愿釋放自己的資源,造成兩個線程都在等待,而無法執行的情況。
**例子2:**
```
package com.demo.test;
public class SyncThread implements Runnable{
private Object obj1;
private Object obj2;
public SyncThread(Object o1, Object o2){
this.obj1=o1;
this.obj2=o2;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (obj1) {
System.out.println(name + " acquired lock on "+obj1);
work();
synchronized (obj2) {
System.out.println("After, "+name + " acquired lock on "+obj2);
work();
}
System.out.println(name + " released lock on "+obj2);
}
System.out.println(name + " released lock on "+obj1);
System.out.println(name + " finished execution.");
}
private void work() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
```
package com.demo.test;
public class ThreadDeadTest {
public static void main(String[] args) throws InterruptedException {
Object obj1 = new Object();
Object obj2 = new Object();
Object obj3 = new Object();
Thread t1 = new Thread(new SyncThread(obj1, obj2), "t1");
Thread t2 = new Thread(new SyncThread(obj2, obj3), "t2");
Thread t3 = new Thread(new SyncThread(obj3, obj1), "t3");
t1.start();
Thread.sleep(1000);
t2.start();
Thread.sleep(1000);
t3.start();
}
}
```
運行結果:
```
t1 acquired lock on java.lang.Object@5e1077
t2 acquired lock on java.lang.Object@1db05b2
t3 acquired lock on java.lang.Object@181ed9e
```
在這個例子中,形成了一個鎖依賴的環路。以t1為例,它先將第一個對象鎖住,但是當它試著向第二個對象獲取鎖時,它就會進入等待狀態,因為第二個對象已經被另一個線程鎖住了。這樣以此類推,t1依賴t2鎖住的對象obj2,t2依賴t3鎖住的對象obj3,而t3依賴t1鎖住的對象obj1,從而導致了死鎖。在線程引起死鎖的過程中,就形成了一個依賴于資源的循環。
**四、如何避免死鎖**
在有些情況下死鎖是可以避免的。下面介紹三種用于避免死鎖的技術:
* 加鎖順序(線程按照一定的順序加鎖)
* 加鎖時限(線程嘗試獲取鎖的時候加上一定的時限,超過時限則放棄對該鎖的請求,并釋放自己占有的鎖)
* 死鎖檢測
**1、加鎖順序**
當多個線程需要相同的一些鎖,但是按照不同的順序加鎖,死鎖就很容易發生。如果能確保所有的線程都是按照相同的順序獲得鎖,那么死鎖就不會發生。看下面這個例子:
```
Thread 1:
lock A
lock B
Thread 2:
wait for A
lock C (when A locked)
Thread 3:
wait for A
wait for B
wait for C
```
如果一個線程(比如線程3)需要一些鎖,那么它必須按照確定的順序獲取鎖。它只有獲得了從順序上排在前面的鎖之后,才能獲取后面的鎖。
例如,線程2和線程3只有在獲取了鎖A之后才能嘗試獲取鎖C(*獲取鎖A是獲取鎖C的必要條件*)。因為線程1已經擁有了鎖A,所以線程2和3需要一直等到鎖A被釋放。然后在它們嘗試對B或C加鎖之前,必須成功地對A加了鎖。
按照順序加鎖是一種有效的死鎖預防機制。但是,這種方式需要你事先知道所有可能會用到的鎖(*并對這些鎖做適當的排序*),但總有些時候是無法預知的。
**下面對例子1進行改造:**
將
```
Thread t2 = new Thread(new DeadLock(false), "線程2");
```
改為:
```
Thread t2 = new Thread(new DeadLock(true), "線程2");
```
現在應該不會出現死鎖了,因為線程1和線程2都是先對obj1加鎖,然后再對obj2加鎖,當t1啟動后,鎖住了obj1,而t2也啟動后,只有當t1釋放了obj1后t2才會執行,從而有效的避免了死鎖。
運行結果:
```
線程1運行
線程1已經鎖住obj1
線程2運行
1秒鐘后,線程1鎖住obj2
線程2已經鎖住obj1
1秒鐘后,線程2鎖住obj2
```
**例子2改造:**
```
package com.demo.test;
public class SyncThread1 implements Runnable{
private Object obj1;
private Object obj2;
public SyncThread1(Object o1, Object o2){
this.obj1=o1;
this.obj2=o2;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (obj1) {
System.out.println(name + " acquired lock on "+obj1);
work();
}
System.out.println(name + " released lock on "+obj1);
synchronized(obj2){
System.out.println("After, "+ name + " acquired lock on "+obj2);
work();
}
System.out.println(name + " released lock on "+obj2);
System.out.println(name + " finished execution.");
}
private void work() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
```
package com.demo.test;
public class ThreadDeadTest1 {
public static void main(String[] args) throws InterruptedException {
Object obj1 = new Object();
Object obj2 = new Object();
Object obj3 = new Object();
Thread t1 = new Thread(new SyncThread1(obj1, obj2), "t1");
Thread t2 = new Thread(new SyncThread1(obj2, obj3), "t2");
Thread t3 = new Thread(new SyncThread1(obj3, obj1), "t3");
t1.start();
Thread.sleep(1000);
t2.start();
Thread.sleep(1000);
t3.start();
}
}
```
運行結果:
```
t1 acquired lock on java.lang.Object@60e128
t2 acquired lock on java.lang.Object@18b3364
t3 acquired lock on java.lang.Object@76fba0
t1 released lock on java.lang.Object@60e128
t2 released lock on java.lang.Object@18b3364
After, t1 acquired lock on java.lang.Object@18b3364
t3 released lock on java.lang.Object@76fba0
After, t2 acquired lock on java.lang.Object@76fba0
After, t3 acquired lock on java.lang.Object@60e128
t1 released lock on java.lang.Object@18b3364
t1 finished execution.
t2 released lock on java.lang.Object@76fba0
t3 released lock on java.lang.Object@60e128
t3 finished execution.
t2 finished execution.
```
從結果中看,沒有出現死鎖的局面。因為在run()方法中,不存在嵌套封鎖。
**避免嵌套封鎖:**這是死鎖最主要的原因的,如果你已經有一個資源了就要避免封鎖另一個資源。如果你運行時只有一個對象封鎖,那是幾乎不可能出現一個死鎖局面的。
再舉個生活中的例子,比如銀行轉賬的場景下,我們必須同時獲得兩個賬戶上的鎖,才能進行操作,兩個鎖的申請必須發生交叉。這時我們也可以打破死鎖的那個閉環,在涉及到要同時申請兩個鎖的方法中,總是以相同的順序來申請鎖,比如總是先申請 id 大的賬戶上的鎖 ,然后再申請 id 小的賬戶上的鎖,這樣就無法形成導致死鎖的那個閉環。
```
public class Account {
private int id; // 主鍵
private String name;
private double balance;
public void transfer(Account from, Account to, double money){
if(from.getId() > to.getId()){
synchronized(from){
synchronized(to){
// transfer
}
}
}else{
synchronized(to){
synchronized(from){
// transfer
}
}
}
}
public int getId() {
return id;
}
}
```
這樣的話,即使發生了兩個賬戶比如 id=1的和id=100的兩個賬戶相互轉賬,因為不管是哪個線程先獲得了id=100上的鎖,另外一個線程都不會去獲得id=1上的鎖(因為他沒有獲得id=100上的鎖),只能是哪個線程先獲得id=100上的鎖,哪個線程就先進行轉賬。這里除了使用id之外,如果沒有類似id這樣的屬性可以比較,那么也可以使用對象的hashCode()的值來進行比較。
**2、加鎖時限**
另外一個可以避免死鎖的方法是在嘗試獲取鎖的時候加一個超時時間,這也就意味著在嘗試獲取鎖的過程中若超過了這個時限該線程則放棄對該鎖請求。若一個線程沒有在給定的時限內成功獲得所有需要的鎖,則會進行回退并釋放所有已經獲得的鎖,然后等待一段隨機的時間再重試。這段隨機的等待時間讓其它線程有機會嘗試獲取相同的這些鎖,并且讓該應用在沒有獲得鎖的時候可以繼續運行(*加鎖超時后可以先繼續運行干點其它事情,再回頭來重復之前加鎖的邏輯*)。
以下是一個例子,展示了兩個線程以不同的順序嘗試獲取相同的兩個鎖,在發生超時后回退并重試的場景:
```
Thread 1 locks A
Thread 2 locks B
Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked
Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.
Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.
```
在上面的例子中,線程2比線程1早200毫秒進行重試加鎖,因此它可以先成功地獲取到兩個鎖。這時,線程1嘗試獲取鎖A并且處于等待狀態。當線程2結束時,線程1也可以順利的獲得這兩個鎖(除非線程2或者其它線程在線程1成功獲得兩個鎖之前又獲得其中的一些鎖)。
需要注意的是,由于存在鎖的超時,所以我們不能認為這種場景就一定是出現了死鎖。也可能是因為獲得了鎖的線程(導致其它線程超時)需要很長的時間去完成它的任務。此外,如果有非常多的線程同一時間去競爭同一批資源,就算有超時和回退機制,還是可能會導致這些線程重復地嘗試但卻始終得不到鎖。如果只有兩個線程,并且重試的超時時間設定為0到500毫秒之間,這種現象可能不會發生,但是如果是10個或20個線程情況就不同了。因為這些線程等待相等的重試時間的概率就高的多(或者非常接近以至于會出現問題)。(*超時和重試機制是為了避免在同一時間出現的競爭,但是當線程很多時,其中兩個或多個線程的超時時間一樣或者接近的可能性就會很大,因此就算出現競爭而導致超時后,由于超時時間一樣,它們又會同時開始重試,導致新一輪的競爭,帶來了新的問題。*)
這種機制存在一個問題,在Java中不能對synchronized同步塊設置超時時間。你需要創建一個自定義鎖,或使用Java5中java.util.concurrent包下的工具。
**3、死鎖檢測**
死鎖檢測是一個更好的死鎖預防機制,它主要是針對那些不可能實現按序加鎖并且鎖超時也不可行的場景。
每當一個線程獲得了鎖,會在線程和鎖相關的數據結構中(map、graph等等)將其記下。除此之外,每當有線程請求鎖,也需要記錄在這個數據結構中。當一個線程請求鎖失敗時,這個線程可以遍歷鎖的關系圖看看是否有死鎖發生。例如,線程A請求鎖7,但是鎖7這個時候被線程B持有,這時線程A就可以檢查一下線程B是否已經請求了線程A當前所持有的鎖。如果線程B確實有這樣的請求,那么就是發生了死鎖(線程A擁有鎖1,請求鎖7;線程B擁有鎖7,請求鎖1)。
當然,死鎖一般要比兩個線程互相持有對方的鎖這種情況要復雜的多。線程A等待線程B,線程B等待線程C,線程C等待線程D,線程D又在等待線程A。線程A為了檢測死鎖,它需要遞進地檢測所有被B請求的鎖。從線程B所請求的鎖開始,線程A找到了線程C,然后又找到了線程D,發現線程D請求的鎖被線程A自己持有著。這是它就知道發生了死鎖。
下面是一幅關于四個線程(A,B,C和D)之間鎖占有和請求的關系圖。像這樣的數據結構就可以被用來檢測死鎖。
![60\_3.png][60_3.png]
那么當檢測出死鎖時,這些線程該做些什么呢?
一個可行的做法是釋放所有鎖,回退,并且等待一段隨機的時間后重試。這個和簡單的加鎖超時類似,不一樣的是只有死鎖已經發生了才回退,而不會是因為加鎖的請求超時了。雖然有回退和等待,但是如果有大量的線程競爭同一批鎖,它們還是會重復地死鎖(*原因同超時類似,不能從根本上減輕競爭*)。
一個更好的方案是給這些線程設置優先級,讓一個(或幾個)線程回退,剩下的線程就像沒發生死鎖一樣繼續保持著它們需要的鎖。如果賦予這些線程的優先級是固定不變的,同一批線程總是會擁有更高的優先級。為避免這個問題,可以在死鎖發生的時候設置隨機的優先級。
**總結:避免死鎖的方式**
1、讓程序每次至多只能獲得一個鎖。當然,在多線程環境下,這種情況通常并不現實。
2、設計時考慮清楚鎖的順序,盡量減少嵌在的加鎖交互數量。
3、既然死鎖的產生是兩個線程無限等待對方持有的鎖,那么只要等待時間有個上限不就好了。當然synchronized不具備這個功能,但是我們可以使用Lock類中的tryLock方法去嘗試獲取鎖,這個方法可以指定一個超時時限,在等待超過該時限之后便會返回一個失敗信息。
我們可以使用ReentrantLock.tryLock()方法,在一個循環中,如果tryLock()返回失敗,那么就釋放以及獲得的鎖,并睡眠一小段時間。這樣就打破了死鎖的閉環。比如:線程T1持有鎖L1并且申請獲得鎖L2,而線程T2持有鎖L2并且申請獲得鎖L3,而線程T3持有鎖L3并且申請獲得鎖L1。此時如果T3申請鎖L1失敗,那么T3釋放鎖L3,并進行睡眠,那么T2就可以獲得L3了,然后T2執行完之后釋放L2, L3,所以T1也可以獲得L2了執行完然后釋放鎖L1, L2,然后T3睡眠醒來,也可以獲得L1, L3了。打破了死鎖的閉環。
[60_1.png]: https://gitee.com/souyunkutech/souyunku-home/raw/master/images/souyunku-web/2020/1/15/2223/15/2/60_1.png
[60_2.png]: https://gitee.com/souyunkutech/souyunku-home/raw/master/images/souyunku-web/2020/1/15/2223/15/2/60_2.png
[60_3.png]: https://gitee.com/souyunkutech/souyunku-home/raw/master/images/souyunku-web/2020/1/15/2223/15/2/60_3.png
文章永久鏈接:[https://tech.souyunku.com/?p=13590](https://tech.souyunku.com/?p=13590)