### AQS(AbstractQueuedSynchronizer)
AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于實現基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。如果你有看過類似 CountDownLatch 類的源碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync;可見CountDownLatch 是基于AQS框架來實現的一個同步器
如JDK的文檔中所說,使用AQS來實現一個同步器需要覆蓋實現如下幾個方法,并且使用getState,setState,compareAndSetState這幾個方法來設置獲取狀態
基于AQS構建的同步器:
* ReentrantLock
* ReadWriteLock
* Semaphore
* ReentrantReadWriteLock
* CountDownLatch
* SynchronousQueue
* FutureTask
```
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
//等待隊列的頭節點
private transient volatile Node head;
//等待隊列的尾節點
private transient volatile Node tail;
//同步狀態
private volatile int state;
protected final int getState() { return state;}
protected final void setState(int newState) { state = newState;}
...
}
```
隊列同步器AQS是用來構建鎖或其他同步組件的基礎框架,內部使用一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,其中內部狀態state,等待隊列的頭節點head和尾節點head,都是通過volatile修飾,保證了多線程之間的可見
### 實現原理
AQS原理:AQS就是一個同步器,要做的事情就相當于一個鎖,所以就會有兩個動作:一個是獲取,一個是釋放。獲取釋放的時候該有一個東西來記住他是被用還是沒被用,這個東西就是一個狀態。如果鎖被獲取了,也就是被用了,還有很多其他的要來獲取鎖,總不能給全部拒絕了,這時候就需要他們排隊,這里就需要一個隊列。這大概就清楚了AQS的主要構成了:
* 獲取和釋放兩個動作
* 同步狀態(原子操作)
* 阻塞隊列
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程如果獲取同步狀態失敗時,AQS則會將當前線程等待狀態等信息構造成一個節點(Node)并將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態,在CLH同步隊列中,一個節點表示一個線程,它保存著線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、后繼節點(next),其定義如下:
```
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 獨占 */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
/** 前驅節點 */
volatile Node prev;
/** 后繼節點 */
volatile Node next;
/** 獲取同步狀態的線程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
```
結構圖:

黃色節點是默認head節點,其實是一個空節點,我覺得可以理解成代表當前持有鎖的線程,每當有線程競爭失敗,都是插入到隊列的尾節點,tail節點始終指向隊列中的最后一個元素。
每個節點中, 除了存儲了當前線程,前后節點的引用以外,還有一個waitStatus變量,用于描述節點當前的狀態。多線程并發執行時,隊列中會有多個節點存在,這個waitStatus其實代表對應線程的狀態:有的線程可能獲取鎖因為某些原因放棄競爭;有的線程在等待滿足條件,滿足之后才能執行等等。一共有4中狀態:
* CANCELLED 取消狀態
* SIGNAL 等待觸發狀態
* CONDITION 等待條件狀態
* PROPAGATE 狀態需要向后傳播
等待隊列是FIFO先進先出,只有前一個節點的狀態為SIGNAL時,當前節點的線程才能被掛起;
### 線程獲取鎖過程
下列步驟中線程A和B進行競爭:
1.線程A執行CAS執行成功,state值被修改并返回true,線程A繼續執行。
2.線程A執行CAS指令失敗,說明線程B也在執行CAS指令且成功,這種情況下線程A會執行步驟3。
3.生成新Node節點node,并通過CAS指令插入到等待隊列的隊尾(同一時刻可能會有多個Node節點插入到等待隊列中),如果tail節點為空,則將head節點指向一個空節點(代表線程B),具體實現如下:
```
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
4.node插入到隊尾后,該線程不會立馬掛起,會進行自旋操作。因為在node的插入過程,線程B(即之前沒有阻塞的線程)可能已經執行完成,所以要判斷該node的前一個節點pred是否為head節點(代表線程B),如果pred == head,表明當前節點是隊列中第一個“有效的”節點,因此再次嘗試tryAcquire獲取鎖:
a)如果成功獲取到鎖,表明線程B已經執行完成,線程A不需要掛起;
b)如果獲取失敗,表示線程B還未完成,至少還未修改state值。跳到步驟5
```
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
```
5.前面我們已經說過只有前一個節點pred的線程狀態為SIGNAL時,當前節點的線程才能被掛起;
a)如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL;
b)如果pred的waitStatus > 0,表明pred的線程狀態CANCELLED,需從隊列中刪除;
c)如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park\(\)方法把線程A掛起,并等待被喚醒,被喚醒后進入步驟6;
```
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
```
6.線程每次被喚醒時,都要進行中斷檢測,如果發現當前線程被中斷,那么拋出InterruptedException并退出循環。從無限循環的代碼可以看出,并不是被喚醒的線程一定能獲得鎖,必須調用tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導致剛被喚醒的線程再次被阻塞,這個細節充分體現了“非公平”的精髓;
### 線程釋放鎖過程
1. 如果頭結點head的waitStatus值為-1,則用CAS指令重置為0;
2. 找到waitStatus值小于0的節點s,通過LockSupport.unpark\(s.thread\)喚醒線程。
```
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
```
【參考資料】
[http://cxis.me/2017/03/23/JUC中AQS簡介/](http://cxis.me/2017/03/23/JUC中AQS簡介/)
https://www.jianshu.com/p/d8eeb31bee5c
- 簡介
- 概述
- 進程vs線程
- 資源限制
- 有關并行的兩個定律
- 線程同步和阻塞
- 線程阻塞
- 線程的特性
- 守護線程
- 線程異常
- Thread
- 線程狀態
- 線程中斷
- wait¬ify
- suspend&resume
- join&yield
- notify¬ifyAll
- Thread.sleep
- 線程任務
- Runnable
- Callable
- Future模式
- FutureTask
- 線程實現方式
- 內核線程實現
- 用戶線程實現
- 混合實現
- Java線程的實現
- java與協程
- 纖程-Fiber
- 線程調度
- 多線程協作方式
- 阻塞
- 放棄
- 休眠
- 連接線程
- 線程估算公式
- 線程活躍性
- 死鎖
- 線程安全性
- 對象的發布與逸出
- 構造方法溢出
- 線程封閉
- 對象的可變性
- 原子性
- 原子操作
- CPU原子操作原理
- 總線鎖
- 緩存鎖
- JAVA如何實現原子操作
- long和double讀寫操作原子性
- Adder和Accumulator
- 線程性能
- 同步工具類
- 閉鎖
- CountDownLatch
- FutureTask
- 信號量
- 柵欄
- CyclicBarrier
- Exchanger
- 并發編程
- volatile
- synchronized
- 無鎖
- 偏向鎖
- 輕量級鎖
- 鎖的優缺點對比
- 鎖升級
- 鎖消除
- Monitor
- synchronized語法
- Mutex Lock
- synchronized實踐問題
- synchronized&ReentrantLock
- Lock
- ReentrantLock
- Condition
- 讀寫鎖
- ReadWriteLock
- StampedLock
- 線程池
- Executor
- ExecutorService
- Executors
- ThreadPoolExecutor
- RejectedExecutionHandler
- ThreadFactory
- 線程池大小公式
- 動態調整線程池大小
- Fork/Join框架
- ForkJoinPool
- CompletableFuture
- JUC并發工具包
- LockSupport
- 延時任務與周期任務
- Timer
- TimerTask
- 異構任務并行化
- CompletionService
- volatile和synchronized比較
- 鎖優化
- 鎖相關概念
- 悲觀鎖(排它鎖)
- 樂觀鎖
- 自旋鎖
- 樂觀鎖vs悲觀鎖
- JVM鎖優化-鎖消除
- ThreadLocal
- InheritableThreadLocal
- TransmittableThreadLocal
- ThreadLocalRandom
- 無鎖
- AtomicInteger
- Unsafe
- AtomicReference
- AtomicStampedReference
- AtomicIntegerArray
- AtomicIntegerFieldUpdater
- 無鎖Vector
- LongAdder
- LongAccumulator
- 常見鎖類型
- 悲觀鎖&獨占鎖
- 樂觀鎖
- 樂觀鎖vs悲觀鎖
- 自旋鎖vs適應性自旋鎖
- 公平鎖vs非公平鎖
- 可重入鎖vs非可重入鎖
- 獨享鎖vs共享鎖
- 互斥鎖
- CAS
- AQS介紹
- AQS深入剖析
- AQS框架
- AQS核心思想
- AQS數據結構
- 同步狀態State
- ReentrantLock vs AQS
- AQS與ReentrantLock的關聯
- ReentrantLock具體實現
- 線程加入等待隊列
- 等待隊列中線程出隊列時機
- 如何解鎖
- 中斷恢復后的執行流程
- ReentrantLock的可重入應用
- JUC中的應用場景
- 自定義同步工具
- CLH鎖
- 并發框架
- Akka
- Disruptor-無鎖緩存框架
- 常見面試題
- 兩個線程交替打印A和B
- 附錄