在今天這一講中,我來分析一下并發包內部的組成,一起來看看各種同步結構、線程池等,是基于什么原理來設計和實現的。
今天我要問你的問題是,AtomicInteger 底層實現原理是什么?如何在自己的產品代碼中應用 CAS 操作?
## 典型回答
AtomicIntger 是對 int 類型的一個封裝,提供原子性的訪問和更新操作,其原子性操作的實現是基于 CAS([compare-and-swap](https://en.wikipedia.org/wiki/Compare-and-swap))技術。
所謂 CAS,表征的是一些列操作的集合,獲取當前數值,進行一些運算,利用 CAS 指令試圖進行更新。如果當前數值未變,代表沒有其他線程進行并發修改,則成功更新。否則,可能出現不同的選擇,要么進行重試,要么就返回一個成功或者失敗的結果。
從 AtomicInteger 的內部屬性可以看出,它依賴于 Unsafe 提供的一些底層能力,進行底層操作;以 volatile 的 value 字段,記錄數值,以保證可見性。
~~~
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
~~~
具體的原子操作細節,可以參考任意一個原子更新方法,比如下面的 getAndIncrement。
Unsafe 會利用 value 字段的內存地址偏移,直接完成操作。
~~~
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
~~~
因為 getAndIncrement 需要返歸數值,所以需要添加失敗重試邏輯。
~~~
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
~~~
而類似 compareAndSet 這種返回 boolean 類型的函數,因為其返回值表現的就是成功與否,所以不需要重試。
~~~
public final boolean compareAndSet(int expectedValue, int newValue)
~~~
CAS 是 Java 并發中所謂 lock-free 機制的基礎。
## 考點分析
今天的問題有點偏向于 Java 并發機制的底層了,雖然我們在開發中未必會涉及 CAS 的實現層面,但是理解其機制,掌握如何在 Java 中運用該技術,還是十分有必要的,尤其是這也是個并發編程的面試熱點。
有的同學反饋面試官會問 CAS 更加底層是如何實現的,這依賴于 CPU 提供的特定指令,具體根據體系結構的不同還存在著明顯區別。比如,x86 CPU 提供 cmpxchg 指令;而在精簡指令集的體系架構中,則通常是靠一對兒指令(如“load and reserve”和“store conditional”)實現的,在大多數處理器上 CAS 都是個非常輕量級的操作,這也是其優勢所在。
大部分情況下,掌握到這個程度也就夠用了,我認為沒有必要讓每個 Java 工程師都去了解到指令級別,我們進行抽象、分工就是為了讓不同層面的開發者在開發中,可以盡量屏蔽不相關的細節。
如果我作為面試官,很有可能深入考察這些方向:
* 在什么場景下,可以采用 CAS 技術,調用 Unsafe 畢竟不是大多數場景的最好選擇,有沒有更加推薦的方式呢?畢竟我們掌握一個技術,cool 不是目的,更不是為了應付面試,我們還是希望能在實際產品中有價值。
* 對 ReentrantLock、CyclicBarrier 等并發結構底層的實現技術的理解。
## 知識擴展
關于 CAS 的使用,你可以設想這樣一個場景:在數據庫產品中,為保證索引的一致性,一個常見的選擇是,保證只有一個線程能夠排他性地修改一個索引分區,如何在數據庫抽象層面實現呢?
可以考慮為索引分區對象添加一個邏輯上的鎖,例如,以當前獨占的線程 ID 作為鎖的數值,然后通過原子操作設置 lock 數值,來實現加鎖和釋放鎖,偽代碼如下:
~~~
public class AtomicBTreePartition {
private volatile long lock;
public void acquireLock(){}
public void releaseeLock(){}
}
~~~
那么在 Java 代碼中,我們怎么實現鎖操作呢?Unsafe 似乎不是個好的選擇,例如,我就注意到類似 Cassandra 等產品,因為 Java 9 中移除了 Unsafe.moniterEnter()/moniterExit(),導致無法平滑升級到新的 JDK 版本。目前 Java 提供了兩種公共 API,可以實現這種 CAS 操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射機制創建,我們需要保證類型和字段名稱正確。
~~~
private static final AtomicLongFieldUpdater<AtomicBTreePartition> lockFieldUpdater =
AtomicLongFieldUpdater.newUpdater(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!lockFieldUpdater.compareAndSet(this, 0L, t)){
// 等待一會兒,數據庫操作可能比較慢
…
}
}
~~~
[Atomic 包](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/atomic/package-summary.html)提供了最常用的原子性數據類型,甚至是引用、數組等相關原子類型和更新操作工具,是很多線程安全程序的首選。
我在專欄第七講中曾介紹使用原子數據類型和 Atomic\*FieldUpdater,創建更加緊湊的計數器實現,以替代 AtomicLong。優化永遠是針對特定需求、特定目的,我這里的側重點是介紹可能的思路,具體還是要看需求。如果僅僅創建一兩個對象,其實完全沒有必要進行前面的優化,但是如果對象成千上萬或者更多,就要考慮緊湊性的影響了。而 atomic 包提供的[LongAdder](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/atomic/LongAdder.html),在高度競爭環境下,可能就是比 AtomicLong 更佳的選擇,盡管它的本質是空間換時間。
回歸正題,如果是 Java 9 以后,我們完全可以采用另外一種方式實現,也就是 Variable Handle API,這是源自于[JEP 193](http://openjdk.java.net/jeps/193),提供了各種粒度的原子或者有序性的操作等。我將前面的代碼修改為如下實現:
~~~
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)){
// 等待一會兒,數據庫操作可能比較慢
…
}
}
~~~
過程非常直觀,首先,獲取相應的變量句柄,然后直接調用其提供的 CAS 方法。
一般來說,我們進行的類似 CAS 操作,可以并且推薦使用 Variable Handle API 去實現,其提供了精細粒度的公共底層 API。我這里強調公共,是因為其 API 不會像內部 API 那樣,發生不可預測的修改,這一點提供了對于未來產品維護和升級的基礎保障,坦白說,很多額外工作量,都是源于我們使用了 Hack 而非 Solution 的方式解決問題。
CAS 也并不是沒有副作用,試想,其常用的失敗重試機制,隱含著一個假設,即競爭情況是短暫的。大多數應用場景中,確實大部分重試只會發生一次就獲得了成功,但是總是有意外情況,所以在有需要的時候,還是要考慮限制自旋的次數,以免過度消耗 CPU。
另外一個就是著名的[ABA](https://en.wikipedia.org/wiki/ABA_problem)問題,這是通常只在 lock-free 算法下暴露的問題。我前面說過 CAS 是在更新時比較前值,如果對方只是恰好相同,例如期間發生了 A -> B -> A 的更新,僅僅判斷數值是 A,可能導致不合理的修改操作。針對這種情況,Java 提供了 AtomicStampedReference 工具類,通過為引用建立類似版本號(stamp)的方式,來保證 CAS 的正確性,具體用法請參考這里的[介紹](http://tutorials.jenkov.com/java-util-concurrent/atomicstampedreference.html)。
前面介紹了 CAS 的場景與實現,幸運的是,大多數情況下,Java 開發者并不需要直接利用 CAS 代碼去實現線程安全容器等,更多是通過并發包等間接享受到 lock-free 機制在擴展性上的好處。
下面我來介紹一下 AbstractQueuedSynchronizer(AQS),其是 Java 并發包中,實現各種同步結構和部分其他組成單元(如線程池中的 Worker)的基礎。
學習 AQS,如果上來就去看它的一系列方法(下圖所示),很有可能把自己看暈,這種似懂非懂的狀態也沒有太大的實踐意義。
我建議的思路是,盡量簡化一下,理解為什么需要 AQS,如何使用 AQS,**至少**要做什么,再進一步結合 JDK 源代碼中的實踐,理解 AQS 的原理與應用。
[Doug Lea](https://en.wikipedia.org/wiki/Doug_Lea)曾經介紹過 AQS 的設計初衷。從原理上,一種同步結構往往是可以利用其他的結構實現的,例如我在專欄第 19 講中提到過可以使用 Semaphore 實現互斥鎖。但是,對某種同步結構的傾向,會導致復雜、晦澀的實現邏輯,所以,他選擇了將基礎的同步相關操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 為我們構建同步結構提供了范本。
AQS 內部數據和方法,可以簡單拆分為:
* 一個 volatile 的整數成員表征狀態,同時提供了 setState 和 getState 方法
~~~
private volatile int state;
~~~
* 一個先入先出(FIFO)的等待線程隊列,以實現多線程間競爭和等待,這是 AQS 機制的核心之一。
* 各種基于 CAS 的基礎操作方法,以及各種期望具體同步結構去實現的 acquire/release 方法。
利用 AQS 實現一個同步結構,至少要實現兩個基本類型的方法,分別是 acquire 操作,獲取資源的獨占權;還有就是 release 操作,釋放對某個資源的獨占。
以 ReentrantLock 為例,它內部通過擴展 AQS 實現了 Sync 類型,以 AQS 的 state 來反映鎖的持有情況。
~~~
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { …}
~~~
下面是 ReentrantLock 對應 acquire 和 release 操作,如果是 CountDownLatch 則可以看作是 await()/countDown(),具體實現也有區別。
~~~
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
~~~
排除掉一些細節,整體地分析 acquire 方法邏輯,其直接實現是在 AQS 內部,調用了 tryAcquire 和 acquireQueued,這是兩個需要搞清楚的基本部分。
~~~
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
~~~
首先,我們來看看 tryAcquire。在 ReentrantLock 中,tryAcquire 邏輯實現在 NonfairSync 和 FairSync 中,分別提供了進一步的非公平或公平性方法,而 AQS 內部 tryAcquire 僅僅是個接近未實現的方法(直接拋異常),這是留個實現者自己定義的操作。
我們可以看到公平性在 ReentrantLock 構建時如何指定的,具體如下:
~~~
public ReentrantLock() {
sync = new NonfairSync(); // 默認是非公平的
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
~~~
以非公平的 tryAcquire 為例,其內部實現了如何配合狀態與 CAS 獲取鎖,注意,對比公平版本的 tryAcquire,它在鎖無人占有時,并不檢查是否有其他等待者,這里體現了非公平的語義。
~~~
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 獲取當前 AQS 內部狀態量
if (c == 0) { // 0 表示無人占有,則直接用 CAS 修改狀態位,
if (compareAndSetState(0, acquires)) {// 不檢查排隊情況,直接爭搶
setExclusiveOwnerThread(current); // 并設置當前線程獨占鎖
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 即使狀態不是 0,也可能當前線程是鎖持有者,因為這是再入鎖
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
~~~
接下來我再來分析 acquireQueued,如果前面的 tryAcquire 失敗,代表著鎖爭搶失敗,進入排隊競爭階段。這里就是我們所說的,利用 FIFO 隊列,實現線程間對鎖的競爭的部分,算是是 AQS 的核心邏輯。
當前線程會被包裝成為一個排他模式的節點(EXCLUSIVE),通過 addWaiter 方法添加到隊列中。acquireQueued 的邏輯,簡要來說,就是如果當前節點的前面是頭節點,則試圖獲取鎖,一切順利則成為新的頭節點;否則,有必要則等待,具體處理邏輯請參考我添加的注釋。
~~~
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {// 循環
final Node p = node.predecessor();// 獲取前一個節點
if (p == head && tryAcquire(arg)) { // 如果前一個節點是頭結點,表示當前節點合適去 tryAcquire
setHead(node); // acquire 成功,則設置新的頭節點
p.next = null; // 將前面節點對當前節點的引用清空
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 檢查是否失敗后需要 park
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);// 出現異常,取消
if (interrupted)
selfInterrupt();
throw t;
}
}
~~~
到這里線程試圖獲取鎖的過程基本展現出來了,tryAcquire 是按照特定場景需要開發者去實現的部分,而線程間競爭則是 AQS 通過 Waiter 隊列與 acquireQueued 提供的,在 release 方法中,同樣會對隊列進行對應操作。
今天我介紹了 Atomic 數據類型的底層技術 CAS,并通過實例演示了如何在產品代碼中利用 CAS,最后介紹了并發包的基礎技術 AQS,希望對你有所幫助。
## 一課一練
關于今天我們討論的題目你做到心中有數了嗎?今天布置一個源碼閱讀作業,AQS 中 Node 的 waitStatus 有什么作用?
- 前言
- 開篇詞
- 開篇詞 -以面試題為切入點,有效提升你的Java內功
- 模塊一 Java基礎
- 第1講 談談你對Java平臺的理解?
- 第2講 Exception和Error有什么區別?
- 第3講 談談final、finally、 finalize有什么不同?
- 第4講 強引用、軟引用、弱引用、幻象引用有什么區別?
- 第5講 String、StringBuffer、StringBuilder有什么區別?
- 第6講 動態代理是基于什么原理?
- 第7講 int和Integer有什么區別?
- 第8講 對比Vector、ArrayList、LinkedList有何區別?
- 第9講 對比Hashtable、HashMap、TreeMap有什么不同?
- 第10講 如何保證集合是線程安全的? ConcurrentHashMap如何實現高效地線程安全?
- 第11講 Java提供了哪些IO方式? NIO如何實現多路復用?
- 第12講 Java有幾種文件拷貝方式?哪一種最高效?
- 第13講 談談接口和抽象類有什么區別?
- 第14講 談談你知道的設計模式?
- 模塊二 Java進階
- 第15講 synchronized和ReentrantLock有什么區別呢?
- 第16講 synchronized底層如何實現?什么是鎖的升級、降級?
- 第17講 一個線程兩次調用start()方法會出現什么情況?
- 第18講 什么情況下Java程序會產生死鎖?如何定位、修復?
- 第19講 Java并發包提供了哪些并發工具類?
- 第20講 并發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么區別?
- 第21講 Java并發類庫提供的線程池有哪幾種? 分別有什么特點?
- 第22講 AtomicInteger底層實現原理是什么?如何在自己的產品代碼中應用CAS操作?
- 第23講 請介紹類加載過程,什么是雙親委派模型?
- 第24講 有哪些方法可以在運行時動態生成一個Java類?
- 第25講 談談JVM內存區域的劃分,哪些區域可能發生OutOfMemoryError?
- 第26講 如何監控和診斷JVM堆內和堆外內存使用?
- 第27講 Java常見的垃圾收集器有哪些?
- 第28講 談談你的GC調優思路?
- 第29講 Java內存模型中的happen-before是什么?
- 第30講 Java程序運行在Docker等容器環境有哪些新問題?
- 模塊三 Java安全基礎
- 第31講 你了解Java應用開發中的注入攻擊嗎?
- 第32講 如何寫出安全的Java代碼?
- 模塊四 Java性能基礎
- 第33講 后臺服務出現明顯“變慢”,談談你的診斷思路?
- 第34講 有人說“Lambda能讓Java程序慢30倍”,你怎么看?
- 第35講 JVM優化Java代碼時都做了什么?
- 模塊五 Java應用開發擴展
- 第36講 談談MySQL支持的事務隔離級別,以及悲觀鎖和樂觀鎖的原理和應用場景?
- 第37講 談談Spring Bean的生命周期和作用域?
- 第38講 對比Java標準NIO類庫,你知道Netty是如何實現更高性能的嗎?
- 第39講 談談常用的分布式ID的設計方案?Snowflake是否受冬令時切換影響?
- 周末福利
- 周末福利 談談我對Java學習和面試的看法
- 周末福利 一份Java工程師必讀書單
- 結束語
- 結束語 技術沒有終點
- 結課測試 Java核心技術的這些知識,你真的掌握了嗎?