<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 16 ConcurrentHashMap 源碼解析和設計思路 ## 引導語 當我們碰到線程不安全場景下,需要使用 Map 的時候,我們第一個想到的 API 估計就是 ConcurrentHashMap,ConcurrentHashMap 內部封裝了鎖和各種數據結構來保證訪問 Map 是線程安全的,接下來我們一一來看下,和 HashMap 相比,多了哪些數據結構,又是如何保證線程安全的。 ### 1 類注釋 我們從類注釋上大概可以得到如下信息: 1. 所有的操作都是線程安全的,我們在使用時,無需再加鎖; 2. 多個線程同時進行 put、remove 等操作時并不會阻塞,可以同時進行,和 HashTable 不同,HashTable 在操作時,會鎖住整個 Map; 3. 迭代過程中,即使 Map 結構被修改,也不會拋 ConcurrentModificationException 異常; 4. 除了數組 + 鏈表 + 紅黑樹的基本結構外,新增了轉移節點,是為了保證擴容時的線程安全的節點; 5. 提供了很多 Stream 流式方法,比如說:forEach、search、reduce 等等。 從類注釋中,我們可以看出 ConcurrentHashMap 和 HashMap 相比,新增了轉移節點的數據結構,至于底層如何實現線程安全,轉移節點的具體細節,暫且看不出來,接下來我們細看源碼。 ### 2 結構 雖然 ConcurrentHashMap 的底層數據結構,和方法的實現細節和 HashMap 大體一致,但兩者在類結構上卻沒有任何關聯,我們看下 ConcurrentHashMap 的類圖: ![](https://img.kancloud.cn/21/96/2196a864ded7967067870a07c811511d_467x199.jpg) 看 ConcurrentHashMap 源碼,我們會發現很多方法和代碼和 HashMap 很相似,有的同學可能會問,為什么不繼承 HashMap 呢?繼承的確是個好辦法,但尷尬的是,ConcurrentHashMap 都是在方法中間進行一些加鎖操作,也就是說加鎖把方法切割了,繼承就很難解決這個問題。 ConcurrentHashMap 和 HashMap 兩者的相同之處: 1. 數組、鏈表結構幾乎相同,所以底層對數據結構的操作思路是相同的(只是思路相同,底層實現不同); 2. 都實現了 Map 接口,繼承了 AbstractMap 抽象類,所以大多數的方法也都是相同的,HashMap 有的方法,ConcurrentHashMap 幾乎都有,所以當我們需要從 HashMap 切換到 ConcurrentHashMap 時,無需關心兩者之間的兼容問題。 不同之處: 1. 紅黑樹結構略有不同,HashMap 的紅黑樹中的節點叫做 TreeNode,TreeNode 不僅僅有屬性,還維護著紅黑樹的結構,比如說查找,新增等等;ConcurrentHashMap 中紅黑樹被拆 分成兩塊, TreeNode 僅僅維護的屬性和查找功能,新增了 TreeBin,來維護紅黑樹結構,并負責根節點的加鎖和解鎖; 2. 新增 ForwardingNode (轉移)節點,擴容的時候會使用到,通過使用該節點,來保證擴容時的線程安全。 ### 3 put ConcurrentHashMap 在 put 方法上的整體思路和 HashMap 相同,但在線程安全方面寫了很多保障的代碼,我們先來看下大體思路: 1. 如果數組為空,初始化,初始化完成之后,走 2; 2. 計算當前槽點有沒有值,沒有值的話,cas 創建,失敗繼續自旋(for 死循環),直到成功,槽點有值的話,走 3; 3. 如果槽點是轉移節點(正在擴容),就會一直自旋等待擴容完成之后再新增,不是轉移節點走 4; 4. 槽點有值的,先鎖定當前槽點,保證其余線程不能操作,如果是鏈表,新增值到鏈表的尾部,如果是紅黑樹,使用紅黑樹新增的方法新增; 5. 新增完成之后 check 需不需要擴容,需要的話去擴容。 具體源碼如下: ``` final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //計算hash int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //table是空的,進行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //如果當前索引位置沒有值,直接創建 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //cas 在 i 位置創建新的元素,當 i 位置是空時,即能創建成功,結束for自循,否則繼續自旋 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //如果當前槽點是轉移節點,表示該槽點正在擴容,就會一直等待擴容完成 //轉移節點的 hash 值是固定的,都是 MOVED else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //槽點上有值的 else { V oldVal = null; //鎖定當前槽點,其余線程不能操作,保證了安全 synchronized (f) { //這里再次判斷 i 索引位置的數據沒有被修改 //binCount 被賦值的話,說明走到了修改表的過程里面 if (tabAt(tab, i) == f) { //鏈表 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //值有的話,直接返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //把新增的元素賦值到鏈表的最后,退出自旋 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //紅黑樹,這里沒有使用 TreeNode,使用的是 TreeBin,TreeNode 只是紅黑樹的一個節點 //TreeBin 持有紅黑樹的引用,并且會對其加鎖,保證其操作的線程安全 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; //滿足if的話,把老的值給oldVal //在putTreeVal方法里面,在給紅黑樹重新著色旋轉的時候 //會鎖住紅黑樹的根節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //binCount不為空,并且 oldVal 有值的情況,說明已經新增成功了 if (binCount != 0) { // 鏈表是否需要轉化成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; //這一步幾乎走不到。槽點已經上鎖,只有在紅黑樹或者鏈表新增失敗的時候 //才會走到這里,這兩者新增都是自旋的,幾乎不會失敗 break; } } } //check 容器是否需要擴容,如果需要去擴容,調用 transfer 方法去擴容 //如果已經在擴容中了,check有無完成 addCount(1L, binCount); return null; } ``` 源碼中都有非常詳細的注釋,就不解釋了,我們重點說一下,ConcurrentHashMap 在 put 過程中,采用了哪些手段來保證線程安全。 #### 3.1 數組初始化時的線程安全 數組初始化時,首先通過自旋來保證一定可以初始化成功,然后通過 CAS 設置 SIZECTL 變量的值,來保證同一時刻只能有一個線程對數組進行初始化,CAS 成功之后,還會再次判斷當前數組是否已經初始化完成,如果已經初始化完成,就不會再次初始化,通過自旋 + CAS + 雙重 check 等手段保證了數組初始化時的線程安全,源碼如下: ``` //初始化 table,通過對 sizeCtl 的變量賦值來保證數組只能被初始化一次 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //通過自旋保證初始化成功 while ((tab = table) == null || tab.length == 0) { // 小于 0 代表有線程正在初始化,釋放當前 CPU 的調度權,重新發起鎖的競爭 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS 賦值保證當前只有一個線程在初始化,-1 代表當前只有一個線程能初始化 // 保證了數組的初始化的安全性 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 很有可能執行到這里的時候,table 已經不為空了,這里是雙重 check if ((tab = table) == null || tab.length == 0) { // 進行初始化 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } ``` 3.2 新增槽點值時的線程安全 此時為了保證線程安全,做了四處優化: 1. 通過自旋死循環保證一定可以新增成功。 在新增之前,通過 for (Node[] tab = table;;) 這樣的死循環來保證新增一定可以成功,一旦新增成功,就可以退出當前死循環,新增失敗的話,會重復新增的步驟,直到新增成功為止。 2. 當前槽點為空時,通過 CAS 新增。 Java 這里的寫法非常嚴謹,沒有在判斷槽點為空的情況下直接賦值,因為在判斷槽點為空和賦值的瞬間,很有可能槽點已經被其他線程賦值了,所以我們采用 CAS 算法,能夠保證槽點為空的情況下賦值成功,如果恰好槽點已經被其他線程賦值,當前 CAS 操作失敗,會再次執行 for 自旋,再走槽點有值的 put 流程,這里就是自旋 + CAS 的結合。 3. 當前槽點有值,鎖住當前槽點。 put 時,如果當前槽點有值,就是 key 的 hash 沖突的情況,此時槽點上可能是鏈表或紅黑樹,我們通過鎖住槽點,來保證同一時刻只會有一個線程能對槽點進行修改,截圖如下: ![](https://img.kancloud.cn/c9/43/c943e333ca41013e9f1a1a4a5a802752_413x67.jpg) 4. 紅黑樹旋轉時,鎖住紅黑樹的根節點,保證同一時刻,當前紅黑樹只能被一個線程旋轉,代碼截圖如下: ![](https://img.kancloud.cn/64/64/6464d713fdb5414c2c0d88673b6f74d8_584x440.jpg) 通過以上 4 點,保證了在各種情況下的新增(不考慮擴容的情況下),都是線程安全的,通過自旋 + CAS + 鎖三大姿勢,實現的很巧妙,值得我們借鑒。 #### 3.3 擴容時的線程安全 ConcurrentHashMap 的擴容時機和 HashMap 相同,都是在 put 方法的最后一步檢查是否需要擴容,如果需要則進行擴容,但兩者擴容的過程完全不同,ConcurrentHashMap 擴容的方法叫做 transfer,從 put 方法的 addCount 方法進去,就能找到 transfer 方法,transfer 方法的主要思路是: 1. 首先需要把老數組的值全部拷貝到擴容之后的新數組上,先從數組的隊尾開始拷貝; 2. 拷貝數組的槽點時,先把原數組槽點鎖住,保證原數組槽點不能操作,成功拷貝到新數組時,把原數組槽點賦值為轉移節點; 3. 這時如果有新數據正好需要 put 到此槽點時,發現槽點為轉移節點,就會一直等待,所以在擴容完成之前,該槽點對應的數據是不會發生變化的; 4. 從數組的尾部拷貝到頭部,每拷貝成功一次,就把原數組中的節點設置成轉移節點; 5. 直到所有數組數據都拷貝到新數組時,直接把新數組整個賦值給數組容器,拷貝完成。 關鍵源碼如下: ``` // 擴容主要分 2 步,第一新建新的空數組,第二移動拷貝每個元素到新數組中去 // tab:原數組,nextTab:新數組 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // 老數組的長度 int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果新數組為空,初始化,大小為原數組的兩倍,n << 1 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } // 新數組的長度 int nextn = nextTab.length; // 代表轉移節點,如果原數組上是轉移節點,說明該節點正在被擴容 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // 無限自旋,i 的值會從原數組的最大值開始,慢慢遞減到 0 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; // 結束循環的標志 if (--i >= bound || finishing) advance = false; // 已經拷貝完成 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 每次減少 i 的值 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // if 任意條件滿足說明拷貝結束了 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 拷貝結束,直接賦值,因為每次拷貝完一個節點,都在原數組上放轉移節點,所以拷貝完成的節點的數據一定不會再發生變化。 // 原數組發現是轉移節點,是不會操作的,會一直等待轉移節點消失之后在進行操作。 // 也就是說數組節點一旦被標記為轉移節點,是不會再發生任何變動的,所以不會有任何線程安全的問題 // 所以此處直接賦值,沒有任何問題。 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { // 進行節點的拷貝 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 如果節點只有單個數據,直接拷貝,如果是鏈表,循環多次組成鏈表拷貝 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 在新數組位置上放置拷貝的值 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 在老數組位置上放上 ForwardingNode 節點 // put 時,發現是 ForwardingNode 節點,就不會再動這個節點的數據了 setTabAt(tab, i, fwd); advance = true; } // 紅黑樹的拷貝 else if (f instanceof TreeBin) { // 紅黑樹的拷貝工作,同 HashMap 的內容,代碼忽略 ………… // 在老數組位置上放上 ForwardingNode 節點 setTabAt(tab, i, fwd); advance = true; } } } } } } ``` 擴容中的關鍵點,就是如何保證是線程安全的,小結有如下幾點: 1. 拷貝槽點時,會把原數組的槽點鎖住; 2. 拷貝成功之后,會把原數組的槽點設置成轉移節點,這樣如果有數據需要 put 到該節點時,發現該槽點是轉移節點,會一直等待,直到擴容成功之后,才能繼續 put,可以參考 put 方法中的 helpTransfer 方法; 3. 從尾到頭進行拷貝,拷貝成功就把原數組的槽點設置成轉移節點。 4. 等擴容拷貝都完成之后,直接把新數組的值賦值給數組容器,之前等待 put 的數據才能繼續 put。 擴容方法還是很有意思的,通過在原數組上設置轉移節點,put 時碰到轉移節點時會等待擴容成功之后才能 put 的策略,來保證了整個擴容過程中肯定是線程安全的,因為數組的槽點一旦被設置成轉移節點,在沒有擴容完成之前,是無法進行操作的。 ### 4 get ConcurrentHashMap 讀的話,就比較簡單,先獲取數組的下標,然后通過判斷數組下標的 key 是否和我們的 key 相等,相等的話直接返回,如果下標的槽點是鏈表或紅黑樹的話,分別調用相應的查找數據的方法,整體思路和 HashMap 很像,源碼如下: ``` public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //計算hashcode int h = spread(key.hashCode()); //不是空的數組 && 并且當前索引的槽點數據不是空的 //否則該key對應的值不存在,返回null if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //槽點第一個值和key相等,直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果是紅黑樹或者轉移節點,使用對應的find方法 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //如果是鏈表,遍歷查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } ``` ### 5 總結 本文摘取 ConcurrentHashMap 兩個核心的方法講解了一下,特別是 put 方法,采取了很多手段來保證了線程安全,是平時面試時的重中之重,大家可以嘗試 debug 來調試一下源碼,其他方法感興趣的話,可以嘗試去 GitHub 上去查看源碼。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看