<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                > 本文用于系統總結 Java 并發包相關概念,只揀重點闡述,但實現細節不可避免地要展示許多代碼 > openjdk-8u 源碼:`hg clone http://hg.openjdk.java.net/jdk8/jdk8 openjdk8` [TOC] ## 1 Java 并發包總覽 整個 `java.util.concurrent` 包,按照功能模塊可以劃分為: - 原子類(Atomic) - 鎖(Lock)與條件(Condition) - 同步工具類 - 并發容器 - 線程池、Future、ForkJoinPool - CompletableFuture (Java 8 出現,android 不關注) ### 1.1 原子類(Atomic) Atomic類位于 `java.util.concurrent.atomic` 包: - AtomicInteger:保證一個`int` 類型加減操作的原子性,可以代替 `synchronized` 的加鎖 - AtomicLong:同 AtomicInteger,類型為 `long` - AtomicBoolean:相比 volatile 修飾的 boolean 類型,可以保證例如 `if (flag == false) {flag = true;}` 的原子性 - AtomicReference:類似 AtomicBoolean,用來保證讀寫引用的原子性 - AtomicStampedReference:類似 AtomicReference,但是其通過版本號,可以避免 **ABA 問題** - AtomicMarkableReference:類似 AtomicStampedReference,區別在于版本號為 boolean 類型,不能完全避免ABA問題,只是降低了發生的概率 - AtomicIntegerFieldUpdater:對已存在的類,實現其成員變量(int)的原子操作 - AtomicLongFieldUpdater:對已存在的類,實現其成員變量(long)的原子操作 - AtomicReferenceFieldUpdater:對已存在的類,實現其成員變量(Object)的原子操作 - AtomicIntegerArray:支持對 int 數組中的元素執行原子操作 - AtomicLongArray:支持對 long 數組中的元素執行原子操作 - AtomicReferenceArray:支持對 Object 數組中的元素執行原子操作 - LongAddr:作用同 AtomicLong,通過分片技術(Striped64)提供了高并發場景下的性能,保證最終一致性而非強一致性 - DoubleAdder:類似 LongAddr,支持 double 類型 - LongAccumulator:類似 LongAddr,LongAddr只支持的累加操作,其支持自定義二元操作 - DoubleAccumulator:同 LongAccumulator,支持 double 類型 **涉及概念**: `樂觀鎖` `Compare And Set` `自旋` `原子操作` `最終一致性` `分片` ### 1.2 鎖(Lock)與條件(Condition) 鎖與條件位于 `java.util.concurrent.locks` 包: - LockSupport:一個用于支持線程喚醒和掛起操作的工具類 - AbstractQueuedSynchronizer:基于 Unsafe + 無鎖隊列實現的隊列同步器(AQS),具備阻塞與喚醒線程和維護這些等待線程的無鎖隊列 - Condition:該接口定義了條件的基本操作 - Lock:該接口定義了鎖的基本操作 - ReadWriteLock:該接口定義了讀鎖和寫鎖的獲取 - ReentrantLock:基于 AQS 實現的一個可重入的獨占鎖(讀讀互斥,讀寫互斥,寫寫互斥) - ReentrantReadWriteLock:基于 AQS 實現的一個可重入的讀寫鎖(讀讀不互斥、讀寫互斥、寫寫互斥) - StampedLock:基于 Unsafe + LockSupport 實現的一個可重入的讀寫鎖(讀讀不互斥、讀寫不互斥、寫寫互斥) **涉及概念**: `Compare And Set` `CLH 隊列` `重入鎖` `公平鎖與非公平鎖` `線程中斷` `互斥鎖` `讀寫鎖` `共享鎖與獨占鎖` `MVCC機制` `悲觀鎖與樂觀鎖` `重排序` `內存屏障` ### 1.3 同步工具類 同步工具類位于 `java.util.concurrent` 包: - Semaphore:基于 AQS 實現的信號量,提供資源數量的并發訪問控制 - CountDownLatch:基于 AQS 實現的計數器,可以使當前線程等待其他線程全部執行完畢后再執行 - CyclicBarrier:基于 ReentrantLock + Condition 實現的同步屏障,區別于 CountDownLatch,計數器可以重置,適用于更復雜的場景 - Exchanger:基于 Unsafe + LockSupport實現,用于線程間交換數據 - Phaser:基于 Unsafe + 無鎖棧實現的,可用于代替 CountDownLatch 和 CyclicBarrier 的同步工具(用樹結構維護) **涉及概念**: `Compare And Set` `無鎖棧` `公平與非公平` ### 1.4 并發容器 并發容器位于 `java.util.concurrent` 包: - BlockingQueue:該接口定義了阻塞隊列的基本操作 - ArrayBlockingQueue:基于數組 + ReentrantLock + Condition 實現的環形阻塞隊列 - LinkedBlockingQueue:基于單鏈表 + ReentrantLock + Condition 實現的阻塞隊列 - PriorityBlockingQueue:基于數組 + ReentrantLock + Condition 實現的阻塞隊列(按元素優先級從小到大出隊) - DelayQueue:基于 PriorityQueue + ReentrantLock + Condition 實現的阻塞隊列(按延遲時間從小到大出隊) - SynchronousQueue:基于 Unsafe 實現的特殊的阻塞隊列(本身沒有容量,直接通知等待線程,高效) - BlockingDeque:該接口定義了雙端阻塞隊列的基本操作 - LinkedBlockingDeque:基于雙向鏈表 + ReentrantLock + Condition 實現的雙端阻塞隊列 - CopyOnWriteArrayList:基于數組 + ReentrantLock 實現的 List - CopyOnWriteArraySet:基于 CopyOnWriteArrayList 實現的 Set - ConcurrentLinkedQueue:基于單鏈表 + Unsafe 實現的 Queue - ConcurrentLinkedDueue:基于雙向鏈表 + Unsafe 實現的 Deque - ConcurrentHashMap:基于數組 + 鏈表/紅黑樹 + Unsafe 實現的 HashMap - ConcurrentSkipListMap:基于跳查表 + Unsafe 實現的 TreeMap - ConcurrentSkipListSet:基于 ConcurrentSkipListMap 實現的 TreeSet ### 1.5 線程池與Future 線程池與Future位于 `java.util.concurrent` 包 ## 2 Java 并發包底層實現依賴 Java 并發包底層實現依賴于Java 內存模式、volatile 變量、CAS 算法等理論以及 Unsafe 類提供的一系列 native 方法 ### 2.1 Java 內存模型、volatile 變量與 CAS 算法 Java 內存模型與 volatitle 變量的語義,以及與CAS 的關系,參考 [\[Java內存模型\]](https://ku.baidu-int.com/knowledge/HFVrC7hq1Q/pKzJfZczuc/8BkidD8KVd/21H5m7F3V7tsFb) 一文 ### 2.2 Unsafe 類的應用 通過**反射**拿到 `sun.misc.Unsafe` 實例后,可以進行 - 讀寫類字段(int、long、short、boolean、byte、char、float、double、Object) - 定義普通類、匿名類、創建類實例 - 讀寫變量(普通讀寫、volatile讀寫、有序讀寫) - 比較交換(CAS)操作 - 操作堆外內存 - 操作監視器 - 設置內存屏障 - 喚醒與掛起線程 > Unsafe 實現的源碼位置:hotspot/src/share/vm/prims/unsafe.cpp ## 3 AQS 框架的實現原理與應用 AQS 是 `AbstractQueuedSynchronizer` 類的簡稱,定義了一套多線程訪問共享資源的**同步機制** **核心思想:**如果被請求的共享資源空閑,則將請求的線程設置為有效的工作線程,將共享資源鎖定;如果共享資源被占用,則需要一定的掛起與喚醒機制(CLH隊列變體隊列)確保資源的分配 ### 3.1 AQS 實現原理 #### 3.1.1 AQS 數據結構 **AQS 維護了一個由無鎖雙向鏈表實現的阻塞隊列,隊列中的各個元素(**`**AQS.Node**`**)通過volatile變量(** `**AQS#state**`**)來進行狀態維護** - `AQS#state` AQS 內部不操作 state 變量,由實現層定義 state 的含義與控制 state 的值 - `AQS#head` 指向雙向鏈表頭部,表示等待隊列的頭部,延遲初始化 - 除初始化外,它僅通過 `setHead` 進行修改(如果 `head` 存在,則會確保 `waitStatus` 不會變為 `CANCELLED`) - `AQS#tail` 指向雙向鏈表尾部,表示等待隊列的尾部,延遲初始化 - 僅通過 `enq` 方法修改以添加新的等待節點 - 隊列操作: - **入隊:將新的** `Node` 加到 `tail` 后面,然后對 `tail` 進行CAS操作 - **出隊:**對 `head` 進行CAS操作,把 `head` 向后移一個位置 > AQS 入隊的方法 ```java private Node enq(final Node node) { // AQS.enq for (;;) { Node t = tail; if (t == null) { // 必須初始化 if (compareAndSetHead(new Node())) tail = head; } else { // 將雙向鏈表結點插入到 tail 后 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` ---- `AbstractQueuedSynchronizer.Node` 表示 **AQS 隊列元素**,各字段與方法含義如下: - 構造方法 - `Node()` 提供給自定義初始化邏輯或 `SHARED` 狀態使用 - `Node(Thread thread, Node mode)` 提供給 `addWaiter` 方法使用 - `Node(Thread thread, int waitStatus)` 提供給 `Condition` 使用 - 成員字段 - `volatile Node prev;` 前繼結點 - `volatile Node next;` 后繼結點 - `volatile Thread thread;` 結點中的線程 - `volatile int waitStatus;` - 數字 `0` 表示 Node 被初始化時的默認值 - `static final int CANCELLED = 1;` 表示線程獲取鎖的請求已經被取消了 - `static final int SIGNAL = -1;` 表示線程正在等待釋放資源 - `static final int CONDITION = -2;` 表示結點在等待隊列中,線程等待喚醒 - `static final int PROPAGATE = -3;` 當前線程處于SHARED情況,才會使用 - `Node nextWaiter;` AQS 中的條件隊列是是通過 nextWaiter,以單向鏈表的形式保存的, `SHARED` 模式不存在 Condition, `EXCLUSIVE` 模式才存在 `Condition` - `static final Node SHARED = new Node();` 共享模式,多個線程可同時執行;例如 `ReadWriteReentrantLock.readLock` `Semaphore`(state != 1時) `CountDownLatch` - `static final Node EXCLUSIVE = null;` 獨占模式,只有一個線程能執行;例如 `ReadWriteReentrantLock.writeLock` `ReentrantLock` `Semaphore`(state = 1 時) - 成員方法 - `final boolean isShared()` 是否為共享結點 - `final Node predecessor()` #### 3.1.2 AQS 方法架構 - **(1) API 層(*****Main exported methods*****):只需要重寫API 層方法,即可使用AQS框架,定制自定義同步器** - 自定義同步器**可重寫的方法** - `protected boolean tryAcquire(int arg)` 嘗試通過獨占方式,獲取資源;返回 true 表示成功,false 表示失敗 - `protected boolean tryRelease(int arg)` 嘗試通過獨占方式,釋放資源;返回 true 表示成功,false 表示失敗 - `protected int tryAcquireShared(int arg)` 嘗試通過共享方式,獲取資源;返回負數表示失敗,0表示成功,但沒有剩余可用資源,正數表示成功,還有剩余資源 - `protected boolean tryReleaseShared(int arg)` 嘗試通過共享方式,釋放資源;返回 true 表示釋放后允許喚醒后繼結點,false 表示不允許 - `protected boolean isHeldExclusively()` 返回當前線程是否獨占資源,使用 Condition 時才需實現 - 自定義同步器不可重寫的方法 - `public final void acquire(int arg)` 通過獨占方式,獲取資源(忽略中斷) - `public final void acquireInterruptibly(int arg)` 通過獨占方式,獲取資源(響應中斷) - `public final boolean tryAcquireNanos(int arg, long nanosTimeout)` 通過獨占方式,獲取資源(超時中止) - `public final void acquireShared(int arg)` 通過共享方式,獲取資源(忽略中斷) - `public final void acquireSharedInterruptibly(int arg)` 通過共享方式,獲取資源(響應中斷) - `public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)` 通過共享方式,獲取資源(超時中止) - `public final boolean release(int arg)` 通過獨占方式,釋放資源 - `public final boolean releaseShared(int arg)` 通過共享方式,釋放資源 - **(2) 資源獲取層(*****Utilities for various versions of acquire*****):通過自定義同步器獲取與釋放資源時,會進入到鎖獲取層** - `private void cancelAcquire(Node node)` - `private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)` - `static void selfInterrupt()` - `private final boolean parkAndCheckInterrupt()` - `inal boolean acquireQueued(final Node node, int arg)` - `private void doAcquireInterruptibly(int arg)` - `private boolean doAcquireNanos(int arg, long nanosTimeout)` - `private void doAcquireShared(int arg)` - `private void doAcquireSharedInterruptibly(int arg)` - `private boolean doAcquireSharedNanos(int arg, long nanosTimeout)` - **(3) 隊列檢查層(*****Queue inspection methods*****):獲取資源失敗時,會進入到隊列檢查層,排隊等待** - `public final boolean hasQueuedThreads()` 判斷是否有線程正在等待獲取,例如 ReentrantLock 用該方法來實現公平與非公平獲取鎖 - `public final boolean hasContended()` 詢問是否有線程曾爭奪過該同步器;也就是說,acquire方法是否曾經被阻止過 - `public final Thread getFirstQueuedThread()` 獲取隊列中的第一個線程 - `public final boolean isQueued(Thread thread)` 判斷指定線程是否在隊列中排隊 - `final boolean apparentlyFirstQueuedIsExclusive()` 如果第一個排隊線程(如果存在)以獨占模式等待,則返回 true - `public final boolean hasQueuedPredecessors()` 查詢是否有線程等待獲取的時間長于當前線程 - **(4) 入隊出隊層:提供雙向鏈表首尾結點的CAS操作** - `private Node addWaiter(Node mode)` - `private Node enq(final Node node)` - `final boolean transferForSignal(Node node)` - `final boolean transferAfterCancelledWait(Node node)` - `private void unparkSuccessor(Node node)` - `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)` - **(5) 數據提供層:** - `state` 變量的讀寫 ( `private volatile int state`),**可重寫** - `protected final int getState()` 獲取 state 變量的值 - `protected final void setState(int newState)` 設置 state 變量的值 - `protected final boolean compareAndSetState(int expect, int update)` CAS 設置 state 變量的值 - 測量和監控隊列(*Instrumentation and monitoring methods*) - `public final int getQueueLength()` - `public final Collection<Thread> getQueuedThreads()` - `public final Collection<Thread> getExclusiveQueuedThreads()` - `public final Collection<Thread> getSharedQueuedThreads()` - *條件相關的內部方法(Internal support methods for Conditions)* - `final boolean isOnSyncQueue(Node node)` - `private boolean findNodeFromTail(Node node)` - `final boolean transferForSignal(Node node)` - `final boolean transferAfterCancelledWait(Node node)` - `final int fullyRelease(Node node)` - *條件相關的測量方法(Instrumentation methods for conditions)* - `public final boolean owns(ConditionObject condition)` - `public final boolean hasWaiters(ConditionObject condition)` - `public final int getWaitQueueLength(ConditionObject condition)` - `public final Collection<Thread> getWaitingThreads(ConditionObject condition)` - CAS 操作包裝 - `private final boolean compareAndSetHead(Node update)` - `private final boolean compareAndSetTail(Node expect, Node update)` - `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)` - `private static final boolean compareAndSetNext(Node node, Node expect, Node update)` ### 3.2 ReentrantLock (獨占鎖)實現原理 ReentrantLock 的常用方法 : - lock 方法 - unlock 方法 - tryLock 方法 ---- ReentrantLock 具有以下特性: - 獨占鎖 - 可重入 - 支持公平與非公平(非公平可提高吞吐量) - 具備線程掛起與喚醒功能 ---- **鎖實現的基本原理**與AQS的關系: - 可標記鎖的狀態( `AbstractQueuedSynchronizer#state`) - 可記錄當前持有鎖的線程( `AbstractOwnableSynchronizer#exclusiveOwnerThread`) - 支持對線程進行掛起和喚醒 ( `LockSupport#park` 和 `LockSupport#unpark` ) - 有一個維護所有阻塞線程的無鎖隊列( `AbstractQueuedSynchronizer.Node`) #### 3.2.1 ReentrantLock#state 變量的含義 `ReentrantLock#state` 變量的含義: - state=0,表示還沒有線程獲取鎖 - state=1,表示有線程獨占了鎖 - state>1,表示鎖被重入的次數 #### 3.2.2 ReentrantLock#lock 方法分析 > (1) `ReentrantLock#lock` 的公平與非公平實現 ```java static final class FairSync extends Sync { // 公平實現 final void lock() { acquire(1); // 在方法內部排隊 } } static final class NonfairSync extends Sync { // 非公平實現 final void lock() { if (compareAndSetState(0, 1)) { // 直接搶鎖 setExclusiveOwnerThread(Thread.currentThread()); // 搶鎖成功,則獨占 } else { acquire(1); // 搶鎖失敗,再在方法內部排隊 } } } ``` > (2) `AQS#acquire` 方法分析:包括 `tryAcquire` 的公平與非公平實現、 `addWaiter`和 `acquireQueued`的實現過程 ```java public final void acquire(int arg) { if (!tryAcquire(arg) // 再次嘗試拿鎖,由子類實現 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把線程放入阻塞隊列,阻塞該線程 selfInterrupt(); // 返回true表示被中斷過,通知進行中斷 } } ``` > (3) `tryAcquire`的公平與非公平實現 ```java static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { // tryAcquire的公平實現 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 沒有線程持有鎖,開始搶鎖 if (!hasQueuedPredecessors() // 如果排在隊列第一個 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } final boolean nonfairTryAcquire(int acquires) { // Sync#nonfairTryAcquire,tryAcquire的非公平實現 // ... if (c == 0) { if (compareAndSetState(0, acquires)) { // 與公平實現的唯一區別就是,此處沒有 !hasQueuedPredecessors() setExclusiveOwnerThread(current); return true; } } // ... } ``` > (4) `addWaiter` 方法分析 ```java private Node addWaiter(Node mode) { // 為當前線程生成一個Node,然后把Node放入雙向鏈表的尾部,線程還未阻塞 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 先嘗試快速插入到隊列尾部,成功則直接返回 pred.next = node; return node; } } enq(node); // 進行隊列的初始化,新建一個空的Node,不斷嘗試自旋,直至成功把該Node加入隊列尾部 return node; } ``` > (5) `acquireQueued` 方法分析 ```java final boolean acquireQueued(final Node node, int arg) { // AQS#acquireQueued boolean failed = true; try { boolean interrupted = false; // 會記錄阻塞過程中有沒有其他線程向自己發送中斷信號 for (;;) { final Node p = node.predecessor(); // 前一個結點 if (p == head && tryAcquire(arg)) { // 如果自己的前一個結點是head指向的空結點,即隊列頭部,則嘗試拿鎖 setHead(node); // 拿鎖成功,出隊列(head前移一個結點) p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { // 調用park掛起自己 interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } ``` #### 3.2.3 ReentrantLock#unlock 方法分析 > `unlock` 不區分公平或非公平 ```java public void unlock() { // java.util.concurrent.locks.ReentrantLock#unlock sync.release(1); } public final boolean release(int arg) { // AQS#release if (tryRelease(arg)) { // 1. 釋放鎖 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 2. 喚醒隊列后繼者 return true; } return false; } protected final boolean tryRelease(int releases) { // ReentrantLock.Sync#tryRelease int c = getState() - releases; // 減少重入次數 if (Thread.currentThread() != getExclusiveOwnerThread()) // 只有鎖的擁有者才能unlock throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 重入次數減到0時釋放鎖 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { // AQS#unparkSuccessor int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } ``` #### 3.2.4 ReentrantLock#lockInterruptibly 與 tryLock 的分析 (略) `ReentrantLock#lockInterruptibly` 與 `ReentrantLock#tryLock` 方法的實現只是多了層封裝,不再贅述 ### 3.3 ReentrantReadWriteLock (讀寫鎖)實現原理 ReentrantReadWriteLock 的常用方法: - readLock().lock - readLock().unlock - writeLock().lock - writeLock().unlock ---- ReentrantReadWriteLock 有以下特性 - **讀寫互斥、寫寫互斥、讀讀不互斥**(利用該特性,可以在**讀多寫少**的場景,替換獨占鎖,優化性能) - 可重入 - 具備公平與非公平實現 - 具備線程掛起與喚醒功能 ---- 分析過程包括: - state 變量的含義 - readLock 和 writeLock 的實現 - 公平與非公平實現 - `AQS#acquireShared` 和 `AQS#releaseShared` 的實現 #### 3.3.1 ReentrantReadWriteLock#state 變量的含義 - 用 `state` 低16位用記錄寫鎖的重入次數,高16位記錄讀鎖的重入次數 - `state=0` 表示沒有線程持有讀鎖或寫鎖 - `state!=0` 時,要么有線程持有讀鎖,要么持有寫鎖;可以進一步通過 `sharedCount(state)` 判斷是否持有讀鎖, `execlusiveCount(state)` 判斷是否持有寫鎖 > `java.util.concurrent.locks.ReentrantReadWriteLock.Sync` 中對 state 變量含義的定義 ```java static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } ``` #### 3.3.2 readLock() 和 writeLock() 的實現分析 - `readLock()` 和 `writeLock()` 分別是讀鎖和寫鎖的視圖,返回是 `Lock` 接口的實現 - writeLock()基于 `AQS#acquire` 和 `AQS#release` 方法實現;readLock()基于 `AQS#acquireShared` 和 `AQS#acquireRelease` 實現 - 通過內部抽象類Sync實現上述AQS的模板方法,又抽象出 `readerShouldBlock()` 和 `writerShouldBlock()` 方法來擴展公平和非公平實現 ---- `AQS#acquire` 和 `AQS#release` 方法已在 `ReentrantLock` 中分析,不再贅述;下一節只分析 `AQS#acquireShared` 和 `AQS#releaseShared` 方法; ---- 先簡要給出讀寫鎖公平與非公平實現的過程: > `ReentrantReadWriteLock.NonfairSync` 和 `ReentrantReadWriteLock.FairSync` 實現分析 ```java static final class NonfairSync extends Sync { // 非公平實現,ReentrantReadWriteLock.NonfairSync final boolean writerShouldBlock() { return false; // 寫線程在搶鎖前永遠不被阻塞,非公平 } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); // 讀線程搶鎖時,如果隊首元素是寫線程,才阻塞 } } static final class FairSync extends Sync { // 公平實現,ReentrantReadWriteLock.FairSync final boolean writerShouldBlock() { return hasQueuedPredecessors(); // 寫線程搶鎖前,排隊 } final boolean readerShouldBlock() { return hasQueuedPredecessors(); // 讀線程搶鎖前,排隊 } } ``` #### 3.3.3 ReadLock#lock 方法分析 ```java public void lock() { // ReentrantReadWriteLock.ReadLock#lock sync.acquireShared(1); // AQS#acquireShared } public final void acquireShared(int arg) { // AQS#acquireShared if (tryAcquireShared(arg) < 0) // 子類實現,實際在 ReentrantReadWriteLock.Sync#tryAcquireShared 中實現 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { // ReentrantReadWriteLock.Sync.tryAcquireShared Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 // 被獨占 && getExclusiveOwnerThread() != current) {// 且不是當前線程獨占 return -1; // 拿不到讀鎖,返回值小于 0 表示失敗 } // 能走到這里,就是沒有被獨占 int r = sharedCount(c); // 獲取讀線程數量 if (!readerShouldBlock() // 公平鎖:排隊;非公平鎖:隊首元素非寫線程 && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // CAS 更新讀線程數,高16位+1(1左移16位+1) if (r == 0) { // 表示當前線程是第1個拿到讀鎖的線程 firstReader = current; // 只用于統計,不影響流程 firstReaderHoldCount = 1; } else if (firstReader == current) { // 讀鎖重入 firstReaderHoldCount++; // 第一個讀線程的鎖重入次數+1 } else { // 其他進來的讀線程 HoldCounter rh = cachedHoldCounter; // 只用于統計,不影響流程 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } ``` #### 3.3.4 ReadLock#unlock 方法分析 ```java public void unlock() { // ReentrantReadWriteLock.ReadLock.unlock sync.releaseShared(1); } public final boolean releaseShared(int arg) { // AQS.releaseShared if (tryReleaseShared(arg)) { // 子類實現 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { // ReentrantReadWriteLock.Sync.tryReleaseShared Thread current = Thread.currentThread(); if (firstReader == current) { // 當前線程在隊首 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) // 讀線程重入次數減到0 firstReader = null; // 則該線程不再持有該鎖 else firstReaderHoldCount--; // 重入次數減1 } else { // 其他讀線程,在 readHolds 中維護 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // CAS 自旋更新 state 變量 int c = getState(); int nextc = c - SHARED_UNIT; // 高16位減1 if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; // 等于0表示被釋放 } } ``` #### 3.3.5 WriteLock#lock 方法分析 ```java public void lock() { // ReentrantReadWriteLock.WriteLock.lock sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) // 再次嘗試拿鎖,由子類實現 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把線程放入阻塞隊列,阻塞該線程 selfInterrupt(); // } } protected final boolean tryAcquire(int acquires) { // ReentrantReadWriteLock.Sync.tryAcquire Thread current = Thread.currentThread(); int c = getState(); // 用于判斷是否有讀寫線程 int w = exclusiveCount(c); // 寫線程的重入數(寫線程只能有一個) if (c != 0) { // 被讀線程或被寫線程占用,此時必然互斥 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) // 鎖被讀線程持有或者不是被當前線程獨占,則返回 return false; // 獲取寫鎖失敗 if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入數,低16位用滿,拋錯誤 throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); // 更新寫鎖重入數 return true; } // 下面進入搶鎖環節 if (writerShouldBlock() // 公平實現:隊列里有其他線程,則排隊;非公平實現:不被阻塞 || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); // 獨占寫鎖 return true; } ``` #### 3.3.6 WriteLock#unlock 方法分析 ```java public void unlock() { // ReentrantReadWriteLock.WriteLock.unlock sync.release(1); } public final boolean release(int arg) { // AQS#release if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // ReentrantReadWriteLock.Sync.tryRelease if (!isHeldExclusively()) // 確保unlock方法是被持有鎖的線程調用 throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 寫鎖重入數減1 boolean free = exclusiveCount(nextc) == 0; // 是否被釋放 if (free) setExclusiveOwnerThread(null); // 釋放獨占線程 setState(nextc); // 更新狀態 return free; } ``` ### 3.4 Condition (條件)實現原理 Condition 的常用方法: - await - signal ---- - 如同 `Object#wait()` 和 `Object#notify()` 方法必須和 `synchronized` 一起使用, `Condition#awiat()` 和 `Condition#signal()` 必須和 `Lock` 一起使用 - Condition 相比 `wait/notify`,避免了生產者通知生產者,消費者通知消費者的問題 - 互斥鎖 `ReentrantLock.Sync#newCondition` 和 讀寫鎖的寫鎖`ReentrantReadWriteLock.WriteLock#newCondition` 都使用了 `AQS#newCondition` ,其中讀鎖不支持 `newCondition` > Condition 的創建 ```java public Condition newCondition() { // ReentrantLock.newCondition return sync.newCondition(); } final ConditionObject newCondition() { // ReentrantLock.Sync.newCondition 或 ReentrantReadWriteLock.Sync#newCondition return new ConditionObject(); } ``` #### 3.4.1 ConditionObject.await 方法分析 ```java public final void await() throws InterruptedException { // AQS.ConditionObject.await() if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 1. 將當前線程加入等待隊列(由于已拿到鎖,因此方法內部線程安全) int savedState = fullyRelease(node); // 2. 掛起前必須先釋放鎖 int interruptMode = 0; while (!isOnSyncQueue(node)) { // Node 是否在 AQS 隊列中 LockSupport.park(this); // 掛起 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 3. 重新拿鎖 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) // 4. 如果被中斷喚醒,向外拋出中斷異常 reportInterruptAfterWait(interruptMode); } ``` #### 3.4.2 ConditionObject.signal 方法分析 ```java public final void signal() { // AQS.ConditionObject.signal if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 隊首 if (first != null) doSignal(first); // 真正執行喚醒 } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); // 先放到同步隊列,再unpark喚醒 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 喚醒 return true; } ``` ### 3.5 CountDownLaunch (計數屏障)實現原理 CountDownLaunch 的常用方法: - await:調用 await 的線程,將等待 count 被減到0 - countDown:每次調用 countDown,都會將 count 值減1 #### 3.5.1 CountDownLaunch#state 變量的含義 用 `CountDownLaunch#state` 變量表示**未 countDown 的數量**,當 `state` 為0時,調用 awiat 的線程會從掛起中喚醒 #### 3.5.2 CountDownLatch#await 方法分析 ```java public void await() throws InterruptedException { // java.util.concurrent.CountDownLatch.await() sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // AQS.acquireSharedInterruptibly if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 在 CountDownLatch.Sync#tryAcquireShared 重寫 doAcquireSharedInterruptibly(arg); } private static final class Sync extends AbstractQueuedSynchronizer { // CountDownLatch.Sync Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // CountDownLatch.Sync#tryAcquireShared return (getState() == 0) ? 1 : -1; } // tryReleaseShared } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // CountDownLatch#await() 實現原理 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` #### 3.5.3 CountDownLatch#countDown 方法分析 ```java public void countDown() { // CountDownLatch.countDown sync.releaseShared(1); } public final boolean releaseShared(int arg) { // AQS.releaseShared if (tryReleaseShared(arg)) { // 子類實現 doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // CountDownLatch.Sync#tryReleaseShared for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } ``` ### 3.6 Semaphore (信號量)實現原理 Semaphore 的常用方法: - acquire:令資源數減 n - release:令資源數加 n #### 3.6.1 Semaphore#state 變量的含義 `Semaphore#state` 變量表示**資源總數**,調用 acquire 方法對 state 進行 CAS 減操作,**減到0后,線程阻塞**;調用 release 方法對 state 進行 CAS 加操作 #### 3.6.2 Semaphore#acquire 與 release 方法分析 > Semaphore 各方法源碼,與鎖的實現類似,不再贅述 ```java public void acquire() throws InterruptedException { // Semaphore.acquire() sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { // Semaphore.acquire(int) if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } ``` ## 4 無鎖編程模式 ### 4.1 內存屏障(一寫一讀) > linux 內核的 kfifo 隊列:[root/kernel/kfifo.c](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/kernel/kfifo.c?h=linux-2.6.38.y) ,通過 `smp_wmb()` 插入 Store 屏障,確保更新指針的操作不會重排序到修改數據之前,以及更新指針的時候,Store Cahce 被刷新,其他 CPU 可見 ```c static void kfifo_copy_in(struct __kfifo *fifo, const void *src, unsigned int len, unsigned int off) { unsigned int size = fifo->mask + 1; unsigned int esize = fifo->esize; unsigned int l; off &= fifo->mask; if (esize != 1) { off *= esize; size *= esize; len *= esize; } l = min(len, size - off); memcpy(fifo->data + off, src, l); memcpy(fifo->data, src + l, len - l); /* * make sure that the data in the fifo is up to date before * incrementing the fifo->in index counter */ smp_wmb(); } ``` > `java.util.concurrent.locks.StampedLock#validate` 方法中,通過 `sun.misc.Unsafe#loadFence` 插入內存屏障,對非 volatile 的局部變量 `stamp` ,避免調用方進行樂觀讀時,代碼被重排序 ```c long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; // validate(stamp) 中插入了內存屏障,這行讀取x值和y值的代碼不會被重排序到上一行前 if (!sl.validate(stamp)) { // 插入內存屏障,避免前面的代碼被重排序 } ``` ### 4.2 volatile(一寫多讀) `volatile` 修飾的變量,在被修改后會將值刷新到主內存中,確保各個工作內存中讀到的值是一致的 ### 4.3 無鎖隊列(多寫多讀,可在雙端增刪元素) 參考 AQS 使用的無鎖隊列 ### 4.4 無鎖棧(多寫多讀,可在棧頂增刪元素) 參考 `java.util.concurrent.Phaser` 和 `ForkJoinPool` 使用的無鎖棧 ### 4.5 無鎖鏈表(多寫多讀,可在中間增刪元素) 參考 `java.util.concurrent.ConcurrentSkipListMap` 使用的無鎖跳查表 ## 5 并發編程模式\* ### 5.1 信號量、Latch 與同步屏障 - synchronized + wait() + notify() 實現信號量 - synchronized + wait() + notify() 實現 Launch 模式 - 線程池 + 原子類實現 Launch 模式 - Semaphore、CountDownLatch、CyclicBarrier 與 Phaser 的適用場景 ### 5.2 發布訂閱模式 **相關概念:** - 觀察者模式(Observer)是為了實現**松耦合**,和發布訂閱模式(Publish–Subscribe)通過注冊中心,實現了**完全解耦** - 擔保-掛起(Guarded Suspension)模式,是很多設計模式(例如生產者-消費者模式)的基礎 - Balking 模式與擔保-掛起模式類似,但選擇的是**放棄**而不是掛起 **相關示例:** - synchronized + wait() + notify() 實現阻塞隊列 - Lock + Condition 實現阻塞隊列 ### 5.3 線程池模式 **線程池(** `java.util.concurrent.ThreadPoolExecutor`**)原理:** - 線程池入參的含義 - 在不斷往線程池(ThreadPoolExecutor)中提交任務(Runnable)時,先由核心線程處理,核心線程數(corePoolSize)不夠時,存儲到任務隊列(BlockingQueue),任務隊列滿后,將創建線程至最大線程數(maximumPoolSize)來處理任務,如果還是不夠處理任務,則使用拒絕策略(RejectedExecutionHandler) - 當任務處理完后,除了核心線程,其他線程會在經過保持時間(keepAliveTime)后被銷毀 - 線程池中的 `ctl` 字段含義:高3位表示線程池的運行狀態(runState),低29位表示工作線程數量(workerCount) - 線程池狀態(runState)遷移(有向無環圖): - RUNNING(-1) 經過 shutdown() 變為 SHUTDOWN(0) - RUNNING(-1) 經過 shutdownNow() 變為 STOP(1) - SHUTDOWN(0) 經過 shutdownNow() 變為 STOP(1) - SHUTDOWN(0) 在隊列和線程池為空時,變為TIDYING(2) - STOP(1) 在隊列和線程池為空時,變為TIDYING(2) - TIDYING(2) 經過 terminated() 變為 TERMINATED(3) - shutdown() 與 shutdownNow() 的區別 - shutdown() 不會清空任務隊列,會等所有任務執行完成;shutdownNow() 會清空任務隊列 - shutdown() 只中斷空閑線程,shutdownNow() 會中斷所有線程 > 線程池提交任務的 `execute` 方法分析 ```java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1. 當前線程數小于核心線程數,創建核心線程,并start任務 if (addWorker(command, true)) // start 任務成功,則返回 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 2. 當前線程數大于等于核心線程數,則放入隊列 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 3. 放入隊列失敗,則新建空閑線程 start 任務 reject(command); // 4. 使用空閑線程 start 任務失敗,使用拒絕策略 } private boolean addWorker(Runnable firstTask, boolean core) { // 新開線程 retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 判斷線程池是否處于 shutdown 后續狀態(如果正好是shutdown,還有任務則不退出) if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 線程數超過上界 return false; if (compareAndIncrementWorkerCount(c)) // workCouut 加1成功則跳出循環 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // runState 發生了變化,重新開始for循環 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // workCount 成加1,開始添加線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 創建一個工作線程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 添加線程 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 添加成功,則啟動線程 workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } ``` > 工作線程的執行過程分析 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ThreadPoolExecutor.Worker,繼承自AQS /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // ... } final void runWorker(Worker w) { // ThreadPoolExecutor#runWorker Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 不斷從隊列中取任務執行 w.lock(); // 執行任務前先加鎖,shutdown時會tryLock() if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 檢查運行狀態是否已停止 wt.interrupt(); try { beforeExecute(wt, task); // 鉤子方法,默認空實現 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 鉤子方法,默認空實現 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 確保 Worker 退出時,能執行下面的退出處理邏輯 processWorkerExit(w, completedAbruptly); } } ``` ### 5.4 Future 模式 **背景:** - 同步的API調用比較耗時的時候,可以先選擇獲取一個立即返回的憑據(Future),調用者所在線程不用陷入未知時長的阻塞中,在未來的某個時間再根據Future獲取結果 - 還可以增加回調(Callback)的機制,無需通過阻塞方法(get)獲取結果,而是注入一個回調方法,以此提高系統的響應時間,充分利用CPU資源 實現模式可以參考JDK源碼或 [pattern/future](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/personal-code/z8g/tree/master/pattern/src/main/java/com/baidu/z8g/pattern/concurrent/future) ### 5.5 CopyOnWrite 模式 參考 `CopyOnWriteArrayList`、 `StampedLock` 等工具的實現 ### 5.6 ForkJoin 模式 Forkjoin 是 JDK 7 中提供分治算法的多線程并行計算框架: - 分治算法的實現 - 工作竊取的實現 - 并行計算的實現 ### 5.7 Thread-Per-Message 模式\* ### 5.8 Worker-Thread 模式\* ### 5.9 Active Objects 模式\* ### 5.10 消息總線(Event Bus)模式\*
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看