[TOC]
> ### `ConcurrentHashMap`
* 1.8版本的`ConcurrentHashMap`相比之前的版本主要做了兩處改進:
* 使用`CAS`代替分段鎖。
* 紅黑樹,這一點和`HashMap`是一致的。
* `ConcurrentHashMap`不可以插入`value`為`null`的鍵值,`HashMap`可以。
<br/>
> ### `put`
* table便是其數據的存放載體: `transient volatile Node<K,V>[] table;`
```
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//volatile讀
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // bin為空,tabAt是一次volatile讀,casTabAt為CAS操作設置節點
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//節點添加
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
```
<br/>
> ### `initTable()`初始化
* `sizeCtl`,`ConcurrentHashMap`初始化的擴容操作中一個至關重要的控制變量
* `private transient volatile int sizeCtl;`
* 0: 初始值。
* -1: 正在進行初始化。
* 負值(小于-1): 表示正在進行擴容,因為ConcurrentHashMap支持多線程并行擴容。
* 正數: 表示下一次觸發擴容的臨界值大小,即當前值 \* 0.75(負載因子)。
* `ConcurrentHashMap`只允許一個線程進行初始化操作,當其它線程競爭失敗(`sizeCtl < 0`)時便會進行自旋,直到競爭成功(初始化)線程完成初始化,那么此時`table`便不再為`null`,也就退出了while循環。
* 對`SIZECTL`字段`CAS`更新的成功便標志者線程贏得了競爭,可以進行初始化工作了,剩下的就是一個數組的構造過程。
```
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//volatile讀
while ((tab = table) == null || tab.length == 0) {
//volatile讀
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sizeCtl設為當前大小的3 / 4
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
```
<br/>
> ### `tryPresize(int size) `擴容
* `ConcurrentHashMap`支持多線程并行擴容,具體來說,是支持多線程將節點從老的數組拷貝到新的數組,而新數組創建仍是一個線程完成。
* 擴容競爭成功的線程為`transfer`方法的`nextTab`參數傳入null,這將導致新數組的創建。競爭失敗的線程將會判斷當前節點轉移工作是否已經完成,如果已經完成,那么意味著擴容的完成,退出即可,如果沒有完成,那么此線程將會進行輔助轉移。
```
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//volatile讀,沒有正在進行初始化或擴容的操作
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//這里實際上進行了初始化工作
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//已達到最大值,無法再進行擴容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
//競爭失敗
Node<K,V>[] nt;
//判斷是否已經完成
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//競爭成功
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
```
<br/>
> ### ` transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)` 轉移
* 每個線程針對一個分片來進行轉移操作,所謂的一個分片其實就是bin數組的一段。默認的最小分片大小為16,如果所在機器 只有一個CPU核心,那么就取16,否則取(數組大小 / 8 / CPU核心數)與16的較大者。
```
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//1. 分片
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab初始化,CAS保證了只會有一個線程執行這里的代碼
if (nextTab == null) {
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//分片的最大下標i實際上就是在這里完成減一的,因為從下面可以看出,每處理完一個桶位便將advance設為true //從而便又進入了內層循環,但是注意,當最后一次(即bound)處理完成時,i會被再次減一,從而導致進入下面的 //分支再次讀取transferIndex,這就說明了轉移線程會在轉移完一個分片后繼續嘗試剩余的分片(桶位)
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//所有bin均轉移完畢
i = -1;
advance = false;
}
//申請分片
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
//bound表示此分片的截止(最小)下標
bound = nextBound;
//i表示此分片的最大下標
i = nextIndex - 1;
//advance意為前進,跳出內層循環
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
//進入到這里就意味著所有的桶位都已被處理完畢或是被包含在某個轉移線程的申請分片中(即待轉移)
int sc;
if (finishing) {
//進行收尾工作,此工作一定是由最后一個分片申請線程進行的,這里用volatile寫將nextTable置為null
//,table指向新數組
nextTable = null;
table = nextTab;
//sizeCtl設為新數組大小的3 / 4
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//轉移線程開始轉移之前會將sizeCtl自增,轉移完成之后自減,所以判斷轉移是否已經完成的方式便是sizeCtl是 //否等于初始值
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//還有其它線程尚未轉移完成,直接退出,將收尾工作交給最后完成的那個線程
return;
//進行到這里就說明當前線程為最后一個完成的線程,有意思的是這里又將advance置為true且i置為n(原)
//數組的大小,作用就是最后再全部掃描一遍所有的桶位,看是否還有漏網之魚
finishing = advance = true;
i = n;
}
}
else if ((f = tabAt(tab, i)) == null)
//2.
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
//3. 轉移算法
//雙重檢查
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//runBit代表了當前桶位是否需要移動
int runBit = fh & n;
Node<K,V> lastRun = f;
//這里是找出最后一個和頭結點的移動屬性相同的
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//構造無需移動和需要移動的鏈表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//設置到新數組
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//將原數組的當前桶位設為MOVED,即已處理完(轉移)
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
```
<br/>
> ### `get(Object key)`
```
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//命中頭結點
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//遍歷當前桶位的節點鏈表
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
```
<br/>
<br/>
***
參考:
[ConcurrentHashMap](https://github.com/seaswalker/JDK/blob/master/note/ConcurrentHashMap/concurrenthashmap.md)
- asD
- Java
- Java基礎
- Java編譯器
- 反射
- collection
- IO
- JDK
- HashMap
- ConcurrentHashMap
- LinkedHashMap
- TreeMap
- 阻塞隊列
- java語法
- String.format()
- JVM
- JVM內存、對象、類
- JVM GC
- JVM監控
- 多線程
- 基礎概念
- volatile
- synchronized
- wait_notify
- join
- lock
- ThreadLocal
- AQS
- 線程池
- Spring
- IOC
- 特性介紹
- getBean()
- creatBean()
- createBeanInstance()
- populateBean()
- AOP
- 基本概念
- Spring處理請求的過程
- 注解
- 微服務
- 服務注冊與發現
- etcd
- zk
- 大數據
- Java_spark
- 基礎知識
- Thrift
- hdfs
- 計算機網絡
- OSI七層模型
- HTTP
- SSL
- 數據庫
- Redis
- mysql
- mybatis
- sql
- 容器
- docker
- k8s
- nginx
- tomcat
- 數據結構/算法
- 排序算法
- 快排
- 插入排序
- 歸并排序
- 堆排序
- 計算時間復雜度
- leetcode
- LRU緩存
- B/B+ 樹
- 跳躍表
- 設計模式
- 單例模式
- 裝飾者模式
- 工廠模式
- 運維
- git
- 前端
- thymeleaf
- 其他
- 代碼規范
- work_project
- Interview