我在之前兩講介紹了 Java 集合框架的典型容器類,它們絕大部分都不是線程安全的,僅有的線程安全實現,比如 Vector、Stack,在性能方面也遠不盡如人意。幸好 Java 語言提供了并發包(java.util.concurrent),為高度并發需求提供了更加全面的工具支持。
今天我要問你的問題是,如何保證容器是線程安全的?ConcurrentHashMap 如何實現高效地線程安全?
## 典型回答
Java 提供了不同層面的線程安全支持。在傳統集合框架內部,除了 Hashtable 等同步容器,還提供了所謂的同步包裝器(Synchronized Wrapper),我們可以調用 Collections 工具類提供的包裝方法,來獲取一個同步的包裝容器(如 Collections.synchronizedMap),但是它們都是利用非常粗粒度的同步方式,在高并發情況下,性能比較低下。
另外,更加普遍的選擇是利用并發包提供的線程安全容器類,它提供了:
* 各種并發容器,比如 ConcurrentHashMap、CopyOnWriteArrayList。
* 各種線程安全隊列(Queue/Deque),如 ArrayBlockingQueue、SynchronousQueue。
* 各種有序容器的線程安全版本等。
具體保證線程安全的方式,包括有從簡單的 synchronize 方式,到基于更加精細化的,比如基于分離鎖實現的 ConcurrentHashMap 等并發實現等。具體選擇要看開發的場景需求,總體來說,并發包內提供的容器通用場景,遠優于早期的簡單同步實現。
## 考點分析
談到線程安全和并發,可以說是 Java 面試中必考的考點,我上面給出的回答是一個相對寬泛的總結,而且 ConcurrentHashMap 等并發容器實現也在不斷演進,不能一概而論。
如果要深入思考并回答這個問題及其擴展方面,至少需要:
* 理解基本的線程安全工具。
* 理解傳統集合框架并發編程中 Map 存在的問題,清楚簡單同步方式的不足。
* 梳理并發包內,尤其是 ConcurrentHashMap 采取了哪些方法來提高并發表現。
* 最好能夠掌握 ConcurrentHashMap 自身的演進,目前的很多分析資料還是基于其早期版本。
今天我主要是延續專欄之前兩講的內容,重點解讀經常被同時考察的 HashMap 和 ConcurrentHashMap。今天這一講并不是對并發方面的全面梳理,畢竟這也不是專欄一講可以介紹完整的,算是個開胃菜吧,類似 CAS 等更加底層的機制,后面會在 Java 進階模塊中的并發主題有更加系統的介紹。
## 知識擴展
1\. 為什么需要 ConcurrentHashMap?
Hashtable 本身比較低效,因為它的實現基本就是將 put、get、size 等各種方法加上“synchronized”。簡單來說,這就導致了所有并發操作都要競爭同一把鎖,一個線程在進行同步操作時,其他線程只能等待,大大降低了并發操作的效率。
前面已經提過 HashMap 不是線程安全的,并發情況會導致類似 CPU 占用 100% 等一些問題,那么能不能利用 Collections 提供的同步包裝器來解決問題呢?
看看下面的代碼片段,我們發現同步包裝器只是利用輸入 Map 構造了另一個同步版本,所有操作雖然不再聲明成為 synchronized 方法,但是還是利用了“this”作為互斥的 mutex,沒有真正意義上的改進!
~~~
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
// …
public int size() {
synchronized (mutex) {return m.size();}
}
// …
}
~~~
所以,Hashtable 或者同步包裝版本,都只是適合在非高度并發的場景下。
2.ConcurrentHashMap 分析
我們再來看看 ConcurrentHashMap 是如何設計實現的,為什么它能大大提高并發效率。
首先,我這里強調,**ConcurrentHashMap 的設計實現其實一直在演化**,比如在 Java 8 中就發生了非常大的變化(Java 7 其實也有不少更新),所以,我這里將比較分析結構、實現機制等方面,對比不同版本的主要區別。
早期 ConcurrentHashMap,其實現是基于:
* 分離鎖,也就是將內部進行分段(Segment),里面則是 HashEntry 的數組,和 HashMap 類似,哈希相同的條目也是以鏈表形式存放。
* HashEntry 內部使用 volatile 的 value 字段來保證可見性,也利用了不可變對象的機制以改進利用 Unsafe 提供的底層能力,比如 volatile access,去直接完成部分操作,以最優化性能,畢竟 Unsafe 中的很多操作都是 JVM intrinsic 優化過的。
你可以參考下面這個早期 ConcurrentHashMap 內部結構的示意圖,其核心是利用分段設計,在進行并發操作的時候,只需要鎖定相應段,這樣就有效避免了類似 Hashtable 整體同步的問題,大大提高了性能。

在構造的時候,Segment 的數量由所謂的 concurrentcyLevel 決定,默認是 16,也可以在相應構造函數直接指定。注意,Java 需要它是 2 的冪數值,如果輸入是類似 15 這種非冪值,會被自動調整到 16 之類 2 的冪數值。
具體情況,我們一起看看一些 Map 基本操作的[源碼](http://hg.openjdk.java.net/jdk7/jdk7/jdk/file/9b8c96f96a0f/src/share/classes/java/util/concurrent/ConcurrentHashMap.java),這是 JDK 7 比較新的 get 代碼。針對具體的優化部分,為方便理解,我直接注釋在代碼段里,get 操作需要保證的是可見性,所以并沒有什么同步邏輯。
~~~
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key.hashCode());
// 利用位操作替換普通數學運算
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 以 Segment 為單位,進行定位
// 利用 Unsafe 直接進行 volatile access
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 省略
}
return null;
}
~~~
而對于 put 操作,首先是通過二次哈希避免哈希沖突,然后以 Unsafe 調用方式,直接獲取相應的 Segment,然后進行線程安全的 put 操作:
~~~
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保證數據的分散性,避免哈希沖突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
~~~
其核心邏輯實現在下面的內部方法中:
~~~
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// scanAndLockForPut 會去查找是否有 key 相同 Node
// 無論如何,確保獲取鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 更新已有 value...
}
else {
// 放置 HashEntry 到特定位置,如果超過閾值,進行 rehash
// ...
}
}
} finally {
unlock();
}
return oldValue;
}
~~~
所以,從上面的源碼清晰的看出,在進行并發寫操作時:
* ConcurrentHashMap 會獲取再入鎖,以保證數據一致性,Segment 本身就是基于 ReentrantLock 的擴展實現,所以,在并發修改期間,相應 Segment 是被鎖定的。
* 在最初階段,進行重復性的掃描,以確定相應 key 值是否已經在數組里面,進而決定是更新還是放置操作,你可以在代碼里看到相應的注釋。重復掃描、檢測沖突是 ConcurrentHashMap 的常見技巧。
* 我在專欄上一講介紹 HashMap 時,提到了可能發生的擴容問題,在 ConcurrentHashMap 中同樣存在。不過有一個明顯區別,就是它進行的不是整體的擴容,而是單獨對 Segment 進行擴容,細節就不介紹了。
另外一個 Map 的 size 方法同樣需要關注,它的實現涉及分離鎖的一個副作用。
試想,如果不進行同步,簡單的計算所有 Segment 的總值,可能會因為并發 put,導致結果不準確,但是直接鎖定所有 Segment 進行計算,就會變得非常昂貴。其實,分離鎖也限制了 Map 的初始化等操作。
所以,ConcurrentHashMap 的實現是通過重試機制(RETRIES\_BEFORE\_LOCK,指定重試次數 2),來試圖獲得可靠值。如果沒有監控到發生變化(通過對比 Segment.modCount),就直接返回,否則獲取鎖進行操作。
下面我來對比一下,**在 Java 8 和之后的版本中,ConcurrentHashMap 發生了哪些變化呢?**
* 總體結構上,它的內部存儲變得和我在專欄上一講介紹的 HashMap 結構非常相似,同樣是大的桶(bucket)數組,然后內部也是一個個所謂的鏈表結構(bin),同步的粒度要更細致一些。
* 其內部仍然有 Segment 定義,但僅僅是為了保證序列化時的兼容性而已,不再有任何結構上的用處。
* 因為不再使用 Segment,初始化操作大大簡化,修改為 lazy-load 形式,這樣可以有效避免初始開銷,解決了老版本很多人抱怨的這一點。
* 數據存儲利用 volatile 來保證可見性。
* 使用 CAS 等操作,在特定場景進行無鎖并發操作。
* 使用 Unsafe、LongAdder 之類底層手段,進行極端情況的優化。
先看看現在的數據存儲內部實現,我們可以發現 Key 是 final 的,因為在生命周期中,一個條目的 Key 發生變化是不可能的;與此同時 val,則聲明為 volatile,以保證可見性。
~~~
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// …
}
~~~
我這里就不再介紹 get 方法和構造函數了,相對比較簡單,直接看并發的 put 是如何實現的。
~~~
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用 CAS 去進行無鎖線程安全操作,如果 bin 是空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // 不加鎖,進行檢查
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
// 細粒度的同步修改操作...
}
}
// Bin 超過閾值,進行樹化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
~~~
初始化操作實現在 initTable 里面,這是一個典型的 CAS 使用場景,利用 volatile 的 sizeCtl 作為互斥手段:如果發現競爭性的初始化,就 spin 在那里,等待條件恢復;否則利用 CAS 設置排他標志。如果成功則進行初始化;否則重試。
請參考下面代碼:
~~~
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果發現沖突,進行 spin 等待
if ((sc = sizeCtl) < 0)
Thread.yield();
// CAS 成功返回 true,則進入真正的初始化邏輯
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
~~~
當 bin 為空時,同樣是沒有必要鎖定,也是以 CAS 操作去放置。
你有沒有注意到,在同步邏輯上,它使用的是 synchronized,而不是通常建議的 ReentrantLock 之類,這是為什么呢?現代 JDK 中,synchronized 已經被不斷優化,可以不再過分擔心性能差異,另外,相比于 ReentrantLock,它可以減少內存消耗,這是個非常大的優勢。
與此同時,更多細節實現通過使用 Unsafe 進行了優化,例如 tabAt 就是直接利用 getObjectAcquire,避免間接調用的開銷。
~~~
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
~~~
再看看,現在是如何實現 size 操作的。[閱讀代碼](http://hg.openjdk.java.net/jdk/jdk/file/12fc7bf488ec/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java)你會發現,真正的邏輯是在 sumCount 方法中, 那么 sumCount 做了什么呢?
~~~
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
~~~
我們發現,雖然思路仍然和以前類似,都是分而治之的進行計數,然后求和處理,但實現卻基于一個奇怪的 CounterCell。 難道它的數值,就更加準確嗎?數據一致性是怎么保證的?
~~~
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
~~~
其實,對于 CounterCell 的操作,是基于 java.util.concurrent.atomic.LongAdder 進行的,是一種 JVM 利用空間換取更高效率的方法,利用了[Striped64](http://hg.openjdk.java.net/jdk/jdk/file/12fc7bf488ec/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java)內部的復雜邏輯。這個東西非常小眾,大多數情況下,建議還是使用 AtomicLong,足以滿足絕大部分應用的性能需求。
今天我從線程安全問題開始,概念性的總結了基本容器工具,分析了早期同步容器的問題,進而分析了 Java 7 和 Java 8 中 ConcurrentHashMap 是如何設計實現的,希望 ConcurrentHashMap 的并發技巧對你在日常開發可以有所幫助。
## 一課一練
關于今天我們討論的題目你做到心中有數了嗎?留一個道思考題給你,在產品代碼中,有沒有典型的場景需要使用類似 ConcurrentHashMap 這樣的并發容器呢?
- 前言
- 開篇詞
- 開篇詞 -以面試題為切入點,有效提升你的Java內功
- 模塊一 Java基礎
- 第1講 談談你對Java平臺的理解?
- 第2講 Exception和Error有什么區別?
- 第3講 談談final、finally、 finalize有什么不同?
- 第4講 強引用、軟引用、弱引用、幻象引用有什么區別?
- 第5講 String、StringBuffer、StringBuilder有什么區別?
- 第6講 動態代理是基于什么原理?
- 第7講 int和Integer有什么區別?
- 第8講 對比Vector、ArrayList、LinkedList有何區別?
- 第9講 對比Hashtable、HashMap、TreeMap有什么不同?
- 第10講 如何保證集合是線程安全的? ConcurrentHashMap如何實現高效地線程安全?
- 第11講 Java提供了哪些IO方式? NIO如何實現多路復用?
- 第12講 Java有幾種文件拷貝方式?哪一種最高效?
- 第13講 談談接口和抽象類有什么區別?
- 第14講 談談你知道的設計模式?
- 模塊二 Java進階
- 第15講 synchronized和ReentrantLock有什么區別呢?
- 第16講 synchronized底層如何實現?什么是鎖的升級、降級?
- 第17講 一個線程兩次調用start()方法會出現什么情況?
- 第18講 什么情況下Java程序會產生死鎖?如何定位、修復?
- 第19講 Java并發包提供了哪些并發工具類?
- 第20講 并發包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么區別?
- 第21講 Java并發類庫提供的線程池有哪幾種? 分別有什么特點?
- 第22講 AtomicInteger底層實現原理是什么?如何在自己的產品代碼中應用CAS操作?
- 第23講 請介紹類加載過程,什么是雙親委派模型?
- 第24講 有哪些方法可以在運行時動態生成一個Java類?
- 第25講 談談JVM內存區域的劃分,哪些區域可能發生OutOfMemoryError?
- 第26講 如何監控和診斷JVM堆內和堆外內存使用?
- 第27講 Java常見的垃圾收集器有哪些?
- 第28講 談談你的GC調優思路?
- 第29講 Java內存模型中的happen-before是什么?
- 第30講 Java程序運行在Docker等容器環境有哪些新問題?
- 模塊三 Java安全基礎
- 第31講 你了解Java應用開發中的注入攻擊嗎?
- 第32講 如何寫出安全的Java代碼?
- 模塊四 Java性能基礎
- 第33講 后臺服務出現明顯“變慢”,談談你的診斷思路?
- 第34講 有人說“Lambda能讓Java程序慢30倍”,你怎么看?
- 第35講 JVM優化Java代碼時都做了什么?
- 模塊五 Java應用開發擴展
- 第36講 談談MySQL支持的事務隔離級別,以及悲觀鎖和樂觀鎖的原理和應用場景?
- 第37講 談談Spring Bean的生命周期和作用域?
- 第38講 對比Java標準NIO類庫,你知道Netty是如何實現更高性能的嗎?
- 第39講 談談常用的分布式ID的設計方案?Snowflake是否受冬令時切換影響?
- 周末福利
- 周末福利 談談我對Java學習和面試的看法
- 周末福利 一份Java工程師必讀書單
- 結束語
- 結束語 技術沒有終點
- 結課測試 Java核心技術的這些知識,你真的掌握了嗎?