## 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
> 青年是學習智慧的時期,中年是付諸實踐的時期。
> —— 盧梭
通過上一節的學習,我們了解了 ConcurrentHashMap 的核心 hash 算法實現。本節我們將繼續學習 put 相關的幾個方法以及 get 方法。
上節提到對哈希表的加載是在第一次 put 操作時進行的,put 方法中相關的代碼如下:
~~~java
if (tab == null || (n = tab.length) == 0)
tab = initTable();
~~~
那么接下來我們就來看看 initTable 方法,如何創建哈希表。
## 1、initTable 源碼分析
initTable 是初始化 table 的方法。內部考慮了多線程的并發安全。我們直接看 initTable 的代碼:
~~~java
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl<0,那么有其他線程正在創建table,所以本線程讓出CPU的執行權。直到table創建完成,while循環跳出。if中同時還把sizeCtl的值賦值給了sc。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//以CAS方式修改sizeCtl為-1,表示本線程已經開始創建table的工作。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次確認是否table還是空的
if ((tab = table) == null || tab.length == 0) {
//如果sc有值,那么使用sc的值作為table的size,否則使用默認值16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sc被設置為table大小的3/4
sc = n - (n >>> 2);
}
} finally {
//sizeCtl被設置為table大小的3/4
sizeCtl = sc;
}
break;
}
}
return tab;
}
~~~
里面有個關鍵的值 sizeCtl,這個值有多個含義。
1、-1 代表有線程正在創建 table;
2、-N 代表有 N-1 個線程正在復制 table;
3、在 table 被初始化前,代表根據構造函數傳入的值計算出的應被初始化的大小;
4、在 table 被初始化后,則被設置為 table 大小 的 75%,代表 table 的容量(數組容量)。
initTable 中使用到 1 和 4,2 和 3 在其它方法中會有使用。下面我們可以先看下 ConcurrentHashMap 的構造方法,里面會使用上面的 3 。
## 2、ConcurrentHashMap 構造函數源碼分析
ConcurrentHashMap 帶容量參數的構造函數源碼如下:
~~~java
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//如果傳入的初始化容量值超過最大容量的一半,那么sizeCtl會被設置為最大容量。
//否則通過tableSizeFor方法就算出一個2的n次方數值作為size
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
~~~
這是一個有參數的構造方法。如果你對未來存儲的數據量有預估,我們可以指定哈希表的大小,避免頻繁的擴容操作。tableSizeFor 這個方法確保了哈希表的大小永遠都是 2 的 n 次方。這里我們回想一下上一節的內容,如果 size 不是 2 的 n 次方,那么 hash 算法計算的下標發生的碰撞概率會大大增加。因此通過 tableSizeFor 方法確保了返回大于傳入參數的最小 2 的 n 次方。注意這里傳入的參數不是 initialCapacity,而是 initialCapacity 的 1.5 倍 + 1。這樣做是為了保證在默認 75% 的負載因子下,能夠足夠容納 initialCapacity 數量的元素。講到這里你一定好奇 tableSizeFor 是如何實現向上取得最接近入參 2 的 n 次方的。下面我們來看 tableSizeFor 源代碼:
~~~java
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
~~~
依舊是二進制按位操作,這樣一頓操作后,得到的數值就是大于 c 的最小 2 的 n 次。我們推演下過程,假設 c 是 9:
**1、int n = 9 - 1**
n=8
**2、n |= n >>> 1**
n=1000
n >>> 1=0100
兩個值按位或后
n=1100
**3、n |= n >>> 2**
n=1100
n >>> 2=0011
n=1111
到這里可以看出規律來了。如果 c 足夠大,使得 n 很大,那么運算到 n |= n >>> 16 時,n 的 32 位都為 1。
總結一下這一段邏輯,其實就是**把 n 有數值的 bit 位全部置為 1**。這樣就得到了一個肯定大于等于 n 的值。我們再看最后一行代碼,最終返回的是 n+1,那么一個所有位都是 1 的二進制數字,+1 后得到的就是一個 2 的 n 次方數值。
關于 ConcurrentHashMap (int initialCapacity) 構造函數的分析我們總結下:
1、構造函數中并不會初始化哈希表;
2、構造函數中僅設置哈希表大小的變量 sizeCtl;
3、initialCapacity 并不是哈希表大小;
4、哈希表大小為 initialCapacity\*1.5+1 后,向上取最小的 2 的 n 次方。如果超過最大容量一半,那么就是最大容量。
## 3、Put 方法中,保存 key/value 源碼分析
前面我們還一直圍繞在哈希表的創建在做講解。接下來我們分析真正往哈希表存儲數據的邏輯,我們先進行下回顧:
~~~java
else {
V oldVal = null;
synchronized (f) {
//再次確認該位置的值是否已經發生了變化
if (tabAt(tab, i) == f) {
//fh大于0,表示該位置存儲的還是鏈表
if (fh >= 0) {
binCount = 1;
//遍歷鏈表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果存在一樣hash值的node,那么根據onlyIfAbsent的值選擇覆蓋value或者不覆蓋
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果找到最后一個元素,也沒有找到相同hash的node,那么生成新的node存儲key/value,作為尾節點放入鏈表。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//下面的邏輯處理鏈表已經轉為紅黑樹時的key/value保存
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
~~~
這段代碼主邏輯如下:
**第一種情況:hash 值映射哈希表對應位置存儲的是鏈表:**
1、遍歷 hash 值映射位置的鏈表;
2、如果存在同樣 hash 值的 node,那么根據要求選擇覆蓋或者不覆蓋;
3、如果不存在同樣 hash 值的 node,那么創建新的 node 用來保存 key/value,并且放在鏈表尾部。
**第二種情況:hash 值映射哈希表對應位置存儲的是紅黑樹:**
通過 TreeBin 對象的 putTreeVal 方法保存 key/value
以上邏輯還是比較清晰和簡單。我們繼續往下看,保存完 key/value 后,其實并沒有結束 put 操作,而是進行了擴容的操作,代碼如下:
~~~java
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
~~~
binCount 是用來記錄鏈表保存 node 的數量的,可以看到當其大于 TREEIFY\_THRESHOLD,也就是 8 的時候進行擴容。
## 4、擴容源碼分析
首先我們要理解為什么 Map 需要擴容,這是因為我們采用哈希表存儲數據,當固定大小的哈希表存儲數據越來越多時,鏈表長度會越來越長,這會造成 put 和 get 的性能下降。此時我們希望哈希表中多一些桶位,預防鏈表繼續堆積的更長。接下來我們分析 treeifyBin 方法代碼,這個代碼中會選擇是把此時保存數據所在的鏈表轉為紅黑樹,還是對整個哈希表擴容。
~~~java
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//如果哈希表長度小于64,那么選擇擴大哈希表的大小,而不是把鏈表轉為紅黑樹
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
//將哈希表中index位置的鏈表轉為紅黑樹
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
//下面邏輯將node鏈表轉化為TreeNode鏈表
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//TreeBin代表紅黑樹,將TreeBin保存在哈希表的index位置
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
~~~
我們再重點看一下 tryPresize,此方法中實現了對數組的擴容,傳入的參數 size 是原來哈希表大小的一倍。我們假定原來哈希表大小為 16,那么傳入的 size 值為 32,以此數值作為例子來分析源代碼。注意 while 中第一個 if 此時不會進入,但為了講解代碼我也在注釋中一并講解了,大家看的時候在這個分支中不要以 size=16 作為前提來分析。
~~~java
//size為32
//sizeCtl為原大小16的3/4,也就是12
private final void tryPresize(int size) {
//根據tableSizeFor計算出滿足要求的哈希表大小,對齊為2的n次方。c被賦值為64,這是擴容的上限,擴容一般都是擴容為原來的2倍,這里c值為了處理一些特殊的情況,確保擴容能夠正常退出。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//此時sc和sizeCtl均為12,進入while循環
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//這里處理的table還未初始化的邏輯,這是由于putAll操作不調用initTable,而是直接調用tryPresize
if (tab == null || (n = tab.length) == 0) {
//putAll第一次調用時,假設putAll進來的map只有一個元素,那么size傳入1,計算出c為2.而sc和sizeCtl都為0,因此n=2
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//經過計算sc=2
sc = n - (n >>> 2);
}
} finally {
//sizeCtl設置為2.第二次循環時,因為sc和c相等,都為2,進入下面的else if分支,結束while循環。
sizeCtl = sc;
}
}
}
//擴容已經達到C值,結束擴容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//table已經存在,那么就對已有table進行擴容
else if (tab == table) {
int rs = resizeStamp(n);
//sc小于0,說明別的線程正在擴容,本線程協助擴容
if (sc < 0) {
Node<K,V>[] nt;
//判斷是否擴容的線程達到上限,如果達到上限,退出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//未達上限,參與擴容,更新sizeCtl值。transfer方法負責把當前哈希表數據移入新的哈希表。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//本線程為第一個擴容線程,transfer第二個參數傳入null,代表需要新建擴容后的哈希表
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
~~~
擴容方法 transfer 中會創建新的哈希表,關鍵代碼如下:
~~~java
int n = tab.length, stride;
......
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
~~~
n<<1 得到的數值為 2n,也就是說每次都是擴容到原來 2 倍,這樣保證了哈希表的大小始終為 2 的 n 次方。
擴容的核心代碼到這里就分析完了,擴容相關代碼還有很多,不過主要的核心思想我們能理解就可以了。
講到這里我們再回一下 put 方法中最后有如下一行代碼:
~~~java
addCount(1L, binCount);
~~~
這行代碼其實是對哈希表保存的元素數量進行計數。同時根據當前保存狀況,判斷是否進行擴容。你可能會問,在添加元素的過程中不是已經執行了擴容的邏輯了嗎?沒錯,不過上面的擴容邏輯是鏈表過長引起的。而 addCount 方法中會判斷哈希表是否超過 75% 的位置已經被使用,從而觸發擴容。擴容的邏輯是基本一致的。
## 5、get 方法源碼分析
本節和前一節耗費了大量筆墨分析 put 的源代碼。put 的源代碼比較復雜,其實 put 方法的復雜是為了 get 服務,以提高 get 的效率。相比較 put 方法而言,get 方法就簡單多了。我們直接看源代碼:
~~~java
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//獲取key值的hash值
int h = spread(key.hashCode());
//這個if判斷中做了如下幾件事情:
//1、哈希表是否存在
//2、哈希表是否保存了數據,同時取得哈希表length
//3、哈希表中hash值映射位置保存的對象不為null,并取出給e,e為鏈表頭節點
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果e的hash值和傳入key的hash值相等
if ((eh = e.hash) == h) {
//如果e的key和傳入的key引用相同,或者key eaquals ek。那么返回e的value。
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果頭節點的hash<0,有兩種情況
//1、hash=-1,正在擴容,該節點為ForwardingNode,通過find方法在nextTable中查找
//2、hash=-2,該節點為TreeBin,鏈表已經轉為了紅黑樹。同樣通過TreeBin的find方法查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//以上兩種條件不滿足,說明hash映射位置保存的還是鏈表頭節點,但是和傳入key值不同。那么遍歷鏈表查找即可。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
~~~
## 6、總結
通過兩小節的學習,我們把 ConcurrentHashMap 中的主要源代碼學習完成了,由于篇幅有限,還有很多更細節的地方沒有講解。如果想繼續研究的話,建議把 Node、TreeNode 相關結構看一下。對算法感興趣的話,可以看一下紅黑樹轉化的過程。
ConcurrentHashMap 中,通過大量的 CAS 操作加上 Synchronized 來確保線程安全。對 ConcurrentHashMap 的學習我們把重點放在哈希算法和擴容上,面試的時候是考察的重點。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語