[TOC]
# 一、概述
談到并發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!
類如其名,抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
以下是本文的目錄大綱:
1. 1. 概述
2. 框架
3. 源碼詳解
4. 簡單應用
# 二、框架

它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:
* getState()
* setState()
* compareAndSetState()
AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器爭用共享資源的方式也不同。**自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可**,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
* isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
* tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
* tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
* tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
* tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。
再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是并行執行的,每個子線程執行完后countDown()一次,state會CAS減1。等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。
# 三、源碼詳解
本節開始講解AQS的源碼實現。依照acquire-release、acquireShared-releaseShared的次序來。
## 3.0 結點狀態waitStatus
? ? ? 這里我們說下Node。Node結點是對每一個等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。變量waitStatus則表示當前Node結點的等待狀態,共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
* **CANCELLED**(1):表示當前結點已取消調度。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態后的結點將不會再變化。
* **SIGNAL**(-1):表示后繼結點在等待當前結點喚醒。后繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。
* **CONDITION**(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將**從等待隊列轉移到同步隊列中**,等待獲取同步鎖。
* **PROPAGATE**(-3):共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點。
* **0**:新結點入隊時的默認狀態。
注意,**負值表示結點處于有效等待狀態,而正值表示結點已被取消。所以源碼中很多地方用>0、<0來判斷結點的狀態是否正常**。
## 3.1 acquire(int)
此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限于lock()。獲取到資源后,線程就可以去執行其臨界區代碼了。下面是acquire()的源碼:
~~~
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
~~~
函數流程如下:
1. 1. tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這里體現了非公平鎖,每個線程獲取鎖時會嘗試直接搶占加塞一次,而CLH隊列中可能還有別的線程在等待);
2. addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
3. acquireQueued()使線程阻塞在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
4. 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
這時單憑這4個抽象的函數來看流程還有點朦朧,不要緊,看完接下來的分析后,你就會明白了。就像《大話西游》里唐僧說的:等你明白了舍生取義的道理,你自然會回來和我唱這首歌的。
### 3.1.1 tryAcquire(int)
此方法嘗試去獲取獨占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,還是那句話,當然不僅僅只限于tryLock()。如下是tryAcquire()的源碼:
~~~
1 protected boolean tryAcquire(int arg) {
2 throw new UnsupportedOperationException();
3 }
~~~
什么?直接throw異常?說好的功能呢?好吧,**還記得概述里講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現嗎?**就是這里了!!!AQS這里只定義了一個接口,具體資源的獲取交由自定義同步器去實現了(通過state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具體的自定義同步器怎么去設計了!!!當然,自定義同步器在進行資源訪問時要考慮線程安全的影響。
這里之所以沒有定義成abstract,是因為獨占模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個模式也要去實現另一模式下的接口。說到底,Doug Lea還是站在咱們開發者的角度,盡量減少不必要的工作量。
### 3.1.2 addWaiter(Node)
此方法用于將當前線程加入到等待隊列的隊尾,并返回當前線程所在的結點。還是上源碼吧:
~~~
1 private Node addWaiter(Node mode) {
2 //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
3 Node node = new Node(Thread.currentThread(), mode);
4
5 //嘗試快速方式直接放到隊尾。
6 Node pred = tail;
7 if (pred != null) {
8 node.prev = pred;
9 if (compareAndSetTail(pred, node)) {
10 pred.next = node;
11 return node;
12 }
13 }
14
15 //上一步失敗則通過enq入隊。
16 enq(node);
17 return node;
18 }
~~~
?不用再說了,直接看注釋吧。
#### 3.1.2.1 enq(Node)
? 此方法用于將node加入隊尾。源碼如下:
~~~
1 private Node enq(final Node node) {
2 //CAS"自旋",直到成功加入隊尾
3 for (;;) {
4 Node t = tail;
5 if (t == null) { // 隊列為空,創建一個空的標志結點作為head結點,并將tail也指向它。
6 if (compareAndSetHead(new Node()))
7 tail = head;
8 } else {//正常流程,放入隊尾
9 node.prev = t;
10 if (compareAndSetTail(t, node)) {
11 t.next = node;
12 return t;
13 }
14 }
15 }
16 }
~~~
如果你看過AtomicInteger.getAndIncrement()函數源碼,那么相信你一眼便看出這段代碼的精華。**CAS自旋volatile變量**,是一種很經典的用法。還不太了解的,自己去百度一下吧。
### 3.1.3?acquireQueued(Node, int)
OK,通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該干什么了吧:**進入等待狀態休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源,然后就可以去干自己想干的事了**。沒錯,就是這樣!是不是跟醫院排隊拿號有點相似~~acquireQueued()就是干這件事:**在等待隊列中排隊拿號(中間沒其它事干可以休息),直到拿到號后再返回**。這個函數非常關鍵,還是上源碼吧:
~~~
1 final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;//標記是否成功拿到資源
3 try {
4 boolean interrupted = false;//標記等待過程中是否被中斷過
5
6 //又是一個“自旋”!
7 for (;;) {
8 final Node p = node.predecessor();//拿到前驅
9 //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
10 if (p == head && tryAcquire(arg)) {
11 setHead(node);//拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。
12 p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味著之前拿完資源的結點出隊了!
13 failed = false; // 成功獲取資源
14 return interrupted;//返回等待過程中是否被中斷過
15 }
16
17 //如果自己可以休息了,就通過park()進入waiting狀態,直到被unpark()。如果不可中斷的情況下被中斷了,那么會從park()中醒過來,發現拿不到資源,從而繼續進入park()等待。
18 if (shouldParkAfterFailedAcquire(p, node) &&
19 parkAndCheckInterrupt())
20 interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true
21 }
22 } finally {
23 if (failed) // 如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結點在隊列中的等待。
24 cancelAcquire(node);
25 }
26 }
~~~
到這里了,我們先不急著總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體干些什么。
#### 3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于檢查狀態,看看自己是否真的可以去休息了(進入waiting狀態,如果線程狀態轉換不熟,可以參考本人上一篇寫的[Thread詳解](http://www.cnblogs.com/waterystone/p/4920007.html)),萬一隊列前邊的線程都放棄了只是瞎站著,那也說不定,對吧!
~~~
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 int ws = pred.waitStatus;//拿到前驅的狀態
3 if (ws == Node.SIGNAL)
4 //如果已經告訴前驅拿完號后通知自己一下,那就可以安心休息了
5 return true;
6 if (ws > 0) {
7 /*
8 * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,并排在它的后邊。
9 * 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!
10 */
11 do {
12 node.prev = pred = pred.prev;
13 } while (pred.waitStatus > 0);
14 pred.next = node;
15 } else {
16 //如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
18 }
19 return false;
20 }
~~~
整個流程中,如果前驅結點的狀態不是SIGNAL,那么自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。
#### 3.1.3.2 parkAndCheckInterrupt()
如果線程找好安全休息點后,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。
~~~
1 private final boolean parkAndCheckInterrupt() {
2 LockSupport.park(this);//調用park()使線程進入waiting狀態
3 return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。
4 }
~~~
? park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。(再說一句,如果線程狀態轉換不熟,可以參考本人寫的[Thread詳解](http://www.cnblogs.com/waterystone/p/4920007.html))。需要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。?
#### 3.1.3.3 小結
OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函數的具體流程:
1. 結點進入隊尾后,檢查狀態,找到安全休息點;
2. 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
3. 被喚醒后,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,并返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
### 3.1.4 小結
OKOK,acquireQueued()分析完之后,我們接下來再回到acquire()!再貼上它的源碼吧:
~~~
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
~~~
再來總結下它的流程吧:
1. 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
2. 沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
3. acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
4. 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
由于此函數是重中之重,我再用流程圖總結一下:

至此,acquire()的流程終于算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個函數就是一條acquire(1)!!!
## 3.2 release(int)
? 上一小節已經把acquire()說完了,這一小節就來講講它的反操作release()吧。此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。這也正是unlock()的語義,當然不僅僅只限于unlock()。下面是release()的源碼:
~~~
1 public final boolean release(int arg) {
2 if (tryRelease(arg)) {
3 Node h = head;//找到頭結點
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);//喚醒等待隊列里的下一個線程
6 return true;
7 }
8 return false;
9 }
~~~
邏輯并不復雜。它調用tryRelease()來釋放資源。有一點需要注意的是,**它是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()的時候要明確這一點!!**
### 3.2.1 tryRelease(int)
此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:
~~~
1 protected boolean tryRelease(int arg) {
2 throw new UnsupportedOperationException();
3 }
~~~
跟tryAcquire()一樣,這個方法是需要獨占模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,**release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!**所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
### 3.2.2 unparkSuccessor(Node)
此方法用于喚醒等待隊列中下一個線程。下面是源碼:
~~~
1 private void unparkSuccessor(Node node) {
2 //這里,node一般為當前線程所在的結點。
3 int ws = node.waitStatus;
4 if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。
5 compareAndSetWaitStatus(node, ws, 0);
6
7 Node s = node.next;//找到下一個需要喚醒的結點s
8 if (s == null || s.waitStatus > 0) {//如果為空或已取消
9 s = null;
10 for (Node t = tail; t != null && t != node; t = t.prev) // 從后向前找。
11 if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。
12 s = t;
13 }
14 if (s != null)
15 LockSupport.unpark(s.thread);//喚醒
16 }
~~~
這個函數并不復雜。一句話概括:**用unpark()喚醒等待隊列中最前邊的那個未放棄線程**,這里我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒后,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這里既然s已經是等待隊列中最前邊的那個未放棄線程了,那么通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然后s把自己設置成head標桿結點,表示自己已經獲取到資源了,acquire()也返回了!!And then, DO what you WANT!
### 3.2.3 小結
release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
? ? ? 74樓的朋友提了一個非常有趣的問題:如果獲取鎖的線程在release時異常了,沒有unpark隊列中的其他結點,這時隊列中的其他結點會怎么辦?是不是沒法再被喚醒了?
? ? ? 答案是**YES**(測試程序詳見76樓)!!!這時,隊列中等待鎖的線程將永遠處于park狀態,無法再被喚醒!!!但是我們再回頭想想,獲取鎖的線程在什么情形下會release拋出異常呢??
1. 線程突然死掉了?可以通過thread.stop來停止線程的執行,但該函數的執行條件要嚴苛的多,而且函數注明是非線程安全的,已經標明Deprecated;
2. 線程被interupt了?線程在運行態是不響應中斷的,所以也不會拋出異常;
3. release代碼有bug,拋出異常了?目前來看,Doug Lea的release方法還是比較健壯的,沒有看出能引發異常的情形(如果有,恐怕早被用戶吐槽了)。**除非自己寫的tryRelease()有bug,那就沒啥說的,自己寫的bug只能自己含著淚去承受了**。
## 3.3 acquireShared(int)
此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。下面是acquireShared()的源碼:
~~~
1 public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)
3 doAcquireShared(arg);
4 }
~~~
這里tryAcquireShared()依然需要自定義同步器去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數表示獲取成功,還有剩余資源,其他線程還可以去獲取。所以這里acquireShared()的流程就是:
1. 1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
2. 失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。
### 3.3.1 doAcquireShared(int)
此方法用于將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。下面是doAcquireShared()的源碼:
~~~
1 private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED);//加入隊列尾部
3 boolean failed = true;//是否成功標志
4 try {
5 boolean interrupted = false;//等待過程中是否被中斷過的標志
6 for (;;) {
7 final Node p = node.predecessor();//前驅
8 if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
9 int r = tryAcquireShared(arg);//嘗試獲取資源
10 if (r >= 0) {//成功
11 setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的線程
12 p.next = null; // help GC
13 if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19
20 //判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
21 if (shouldParkAfterFailedAcquire(p, node) &&
22 parkAndCheckInterrupt())
23 interrupted = true;
24 }
25 } finally {
26 if (failed)
27 cancelAcquire(node);
28 }
29 }
~~~
有木有覺得跟acquireQueued()很相似?對,其實流程并沒有太大區別。只不過這里將補中斷的selfInterrupt()放到doAcquireShared()里了,而獨占模式是放到acquireQueued()之外,其實都一樣,不知道Doug Lea是怎么想的。
跟獨占模式比,還有一點需要注意的是,這里只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩余的話還會喚醒之后的隊友。那么問題就來了,假如老大用完后釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨占模式,同一時刻只有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把后面量小的老三和老四也都卡住了。當然,這并不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了并發)。
#### 3.3.1.1 setHeadAndPropagate(Node, int)
~~~
1 private void setHeadAndPropagate(Node node, int propagate) {
2 Node h = head;
3 setHead(node);//head指向自己
4 //如果還有剩余量,繼續喚醒下一個鄰居線程
5 if (propagate > 0 || h == null || h.waitStatus < 0) {
6 Node s = node.next;
7 if (s == null || s.isShared())
8 doReleaseShared();
9 }
10 }
~~~
此方法在setHead()的基礎上多了一步,就是自己蘇醒的同時,如果條件符合(比如還有剩余資源),還會去喚醒后繼結點,畢竟是共享模式!
doReleaseShared()我們留著下一小節的releaseShared()里來講。
### 3.3.2 小結
OK,至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:
1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
2. 失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個等待過程也是忽略中斷的。
其實跟acquire()的流程大同小異,只不過多了個**自己拿到資源后,還會去喚醒后繼隊友的操作(這才是共享嘛)**。
## 3.4?releaseShared()
上一小節已經把acquireShared()說完了,這一小節就來講講它的反操作releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。下面是releaseShared()的源碼:
~~~
1 public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) {//嘗試釋放資源
3 doReleaseShared();//喚醒后繼結點
4 return true;
5 }
6 return false;
7 }
~~~
此方法的流程也比較簡單,一句話:釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程并發執行,那么擁有資源的線程在釋放掉部分資源時就可以喚醒后繼等待結點。例如,資源總量是13,A(5)和B(7)分別獲取到資源并發運行,C(4)來時只剩1個資源就需要等待。A在運行過程中釋放掉2個資源量,然后tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續等待;隨后B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠自己用了,然后C就可以跟A和B一起運行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據需要決定tryReleaseShared()的返回值。
### 3.4.1?doReleaseShared()
此方法主要用于喚醒后繼。下面是它的源碼:
~~~
1 private void doReleaseShared() {
2 for (;;) {
3 Node h = head;
4 if (h != null && h != tail) {
5 int ws = h.waitStatus;
6 if (ws == Node.SIGNAL) {
7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8 continue;
9 unparkSuccessor(h);//喚醒后繼
10 }
11 else if (ws == 0 &&
12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13 continue;
14 }
15 if (h == head)// head發生變化
16 break;
17 }
18 }
~~~
## 3.5 小結
本節我們詳解了獨占和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認識了。值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,相應的源碼跟acquire()和acquireShared()差不多,這里就不再詳解了。
# 四、簡單應用
通過前邊幾個章節的學習,相信大家已經基本理解AQS的原理了。這里再將“框架”一節中的一段話復制過來:
不同的自定義同步器爭用共享資源的方式也不同。**自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可**,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
* isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
* tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
* tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
* tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
* tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
OK,下面我們就以AQS源碼里的Mutex為例,講一下AQS的簡單應用。
## 4.1 Mutex(互斥鎖)
Mutex是一個不可重入的互斥鎖實現。鎖資源(AQS里的state)只有兩種狀態:0表示未鎖定,1表示鎖定。下邊是Mutex的核心源碼:
~~~
1 class Mutex implements Lock, java.io.Serializable {
2 // 自定義同步器
3 private static class Sync extends AbstractQueuedSynchronizer {
4 // 判斷是否鎖定狀態
5 protected boolean isHeldExclusively() {
6 return getState() == 1;
7 }
8
9 // 嘗試獲取資源,立即返回。成功則返回true,否則false。
10 public boolean tryAcquire(int acquires) {
11 assert acquires == 1; // 這里限定只能為1個量
12 if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!
13 setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨占資源
14 return true;
15 }
16 return false;
17 }
18
19 // 嘗試釋放資源,立即返回。成功則為true,否則false。
20 protected boolean tryRelease(int releases) {
21 assert releases == 1; // 限定為1個量
22 if (getState() == 0)//既然來釋放,那肯定就是已占有狀態了。只是為了保險,多層判斷!
23 throw new IllegalMonitorStateException();
24 setExclusiveOwnerThread(null);
25 setState(0);//釋放資源,放棄占有狀態
26 return true;
27 }
28 }
29
30 // 真正同步類的實現都依賴繼承于AQS的自定義同步器!
31 private final Sync sync = new Sync();
32
33 //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
34 public void lock() {
35 sync.acquire(1);
36 }
37
38 //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
39 public boolean tryLock() {
40 return sync.tryAcquire(1);
41 }
42
43 //unlock<-->release。兩者語文一樣:釋放資源。
44 public void unlock() {
45 sync.release(1);
46 }
47
48 //鎖是否占有狀態
49 public boolean isLocked() {
50 return sync.isHeldExclusively();
51 }
52 }
~~~
同步類在實現時一般都將自定義同步器(sync)定義為內部類,供自己使用;而同步類自己(Mutex)則實現某個接口,對外服務。當然,接口的實現要直接依賴sync,它們在語義上也存在某種對應關系!!而sync只用實現資源state的獲取-釋放方式tryAcquire-tryRelelase,至于線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,我們不用關心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!
OK,至此,整個AQS的講解也要落下帷幕了。希望本文能夠對學習Java并發編程的同學有所借鑒,中間寫的有不對的地方,也歡迎討論和指正~
轉載請標明原文鏈接[http://www.cnblogs.com/waterystone/p/4920797.html](http://www.cnblogs.com/waterystone/p/4920797.html)
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊