> 本文用于系統總結 Java 并發包相關概念,只揀重點闡述,但實現細節不可避免地要展示許多代碼
> openjdk-8u 源碼:`hg clone http://hg.openjdk.java.net/jdk8/jdk8 openjdk8`
[TOC]
## 1 Java 并發包總覽
整個 `java.util.concurrent` 包,按照功能模塊可以劃分為:
- 原子類(Atomic)
- 鎖(Lock)與條件(Condition)
- 同步工具類
- 并發容器
- 線程池、Future、ForkJoinPool
- CompletableFuture (Java 8 出現,android 不關注)
### 1.1 原子類(Atomic)
Atomic類位于 `java.util.concurrent.atomic` 包:
- AtomicInteger:保證一個`int` 類型加減操作的原子性,可以代替 `synchronized` 的加鎖
- AtomicLong:同 AtomicInteger,類型為 `long`
- AtomicBoolean:相比 volatile 修飾的 boolean 類型,可以保證例如 `if (flag == false) {flag = true;}` 的原子性
- AtomicReference:類似 AtomicBoolean,用來保證讀寫引用的原子性
- AtomicStampedReference:類似 AtomicReference,但是其通過版本號,可以避免 **ABA 問題**
- AtomicMarkableReference:類似 AtomicStampedReference,區別在于版本號為 boolean 類型,不能完全避免ABA問題,只是降低了發生的概率
- AtomicIntegerFieldUpdater:對已存在的類,實現其成員變量(int)的原子操作
- AtomicLongFieldUpdater:對已存在的類,實現其成員變量(long)的原子操作
- AtomicReferenceFieldUpdater:對已存在的類,實現其成員變量(Object)的原子操作
- AtomicIntegerArray:支持對 int 數組中的元素執行原子操作
- AtomicLongArray:支持對 long 數組中的元素執行原子操作
- AtomicReferenceArray:支持對 Object 數組中的元素執行原子操作
- LongAddr:作用同 AtomicLong,通過分片技術(Striped64)提供了高并發場景下的性能,保證最終一致性而非強一致性
- DoubleAdder:類似 LongAddr,支持 double 類型
- LongAccumulator:類似 LongAddr,LongAddr只支持的累加操作,其支持自定義二元操作
- DoubleAccumulator:同 LongAccumulator,支持 double 類型
**涉及概念**: `樂觀鎖` `Compare And Set` `自旋` `原子操作` `最終一致性` `分片`
### 1.2 鎖(Lock)與條件(Condition)
鎖與條件位于 `java.util.concurrent.locks` 包:
- LockSupport:一個用于支持線程喚醒和掛起操作的工具類
- AbstractQueuedSynchronizer:基于 Unsafe + 無鎖隊列實現的隊列同步器(AQS),具備阻塞與喚醒線程和維護這些等待線程的無鎖隊列
- Condition:該接口定義了條件的基本操作
- Lock:該接口定義了鎖的基本操作
- ReadWriteLock:該接口定義了讀鎖和寫鎖的獲取
- ReentrantLock:基于 AQS 實現的一個可重入的獨占鎖(讀讀互斥,讀寫互斥,寫寫互斥)
- ReentrantReadWriteLock:基于 AQS 實現的一個可重入的讀寫鎖(讀讀不互斥、讀寫互斥、寫寫互斥)
- StampedLock:基于 Unsafe + LockSupport 實現的一個可重入的讀寫鎖(讀讀不互斥、讀寫不互斥、寫寫互斥)
**涉及概念**: `Compare And Set` `CLH 隊列` `重入鎖` `公平鎖與非公平鎖` `線程中斷` `互斥鎖` `讀寫鎖` `共享鎖與獨占鎖` `MVCC機制` `悲觀鎖與樂觀鎖` `重排序` `內存屏障`
### 1.3 同步工具類
同步工具類位于 `java.util.concurrent` 包:
- Semaphore:基于 AQS 實現的信號量,提供資源數量的并發訪問控制
- CountDownLatch:基于 AQS 實現的計數器,可以使當前線程等待其他線程全部執行完畢后再執行
- CyclicBarrier:基于 ReentrantLock + Condition 實現的同步屏障,區別于 CountDownLatch,計數器可以重置,適用于更復雜的場景
- Exchanger:基于 Unsafe + LockSupport實現,用于線程間交換數據
- Phaser:基于 Unsafe + 無鎖棧實現的,可用于代替 CountDownLatch 和 CyclicBarrier 的同步工具(用樹結構維護)
**涉及概念**: `Compare And Set` `無鎖棧` `公平與非公平`
### 1.4 并發容器
并發容器位于 `java.util.concurrent` 包:
- BlockingQueue:該接口定義了阻塞隊列的基本操作
- ArrayBlockingQueue:基于數組 + ReentrantLock + Condition 實現的環形阻塞隊列
- LinkedBlockingQueue:基于單鏈表 + ReentrantLock + Condition 實現的阻塞隊列
- PriorityBlockingQueue:基于數組 + ReentrantLock + Condition 實現的阻塞隊列(按元素優先級從小到大出隊)
- DelayQueue:基于 PriorityQueue + ReentrantLock + Condition 實現的阻塞隊列(按延遲時間從小到大出隊)
- SynchronousQueue:基于 Unsafe 實現的特殊的阻塞隊列(本身沒有容量,直接通知等待線程,高效)
- BlockingDeque:該接口定義了雙端阻塞隊列的基本操作
- LinkedBlockingDeque:基于雙向鏈表 + ReentrantLock + Condition 實現的雙端阻塞隊列
- CopyOnWriteArrayList:基于數組 + ReentrantLock 實現的 List
- CopyOnWriteArraySet:基于 CopyOnWriteArrayList 實現的 Set
- ConcurrentLinkedQueue:基于單鏈表 + Unsafe 實現的 Queue
- ConcurrentLinkedDueue:基于雙向鏈表 + Unsafe 實現的 Deque
- ConcurrentHashMap:基于數組 + 鏈表/紅黑樹 + Unsafe 實現的 HashMap
- ConcurrentSkipListMap:基于跳查表 + Unsafe 實現的 TreeMap
- ConcurrentSkipListSet:基于 ConcurrentSkipListMap 實現的 TreeSet
### 1.5 線程池與Future
線程池與Future位于 `java.util.concurrent` 包
## 2 Java 并發包底層實現依賴
Java 并發包底層實現依賴于Java 內存模式、volatile 變量、CAS 算法等理論以及 Unsafe 類提供的一系列 native 方法
### 2.1 Java 內存模型、volatile 變量與 CAS 算法
Java 內存模型與 volatitle 變量的語義,以及與CAS 的關系,參考 [\[Java內存模型\]](https://ku.baidu-int.com/knowledge/HFVrC7hq1Q/pKzJfZczuc/8BkidD8KVd/21H5m7F3V7tsFb) 一文
### 2.2 Unsafe 類的應用
通過**反射**拿到 `sun.misc.Unsafe` 實例后,可以進行
- 讀寫類字段(int、long、short、boolean、byte、char、float、double、Object)
- 定義普通類、匿名類、創建類實例
- 讀寫變量(普通讀寫、volatile讀寫、有序讀寫)
- 比較交換(CAS)操作
- 操作堆外內存
- 操作監視器
- 設置內存屏障
- 喚醒與掛起線程
> Unsafe 實現的源碼位置:hotspot/src/share/vm/prims/unsafe.cpp
## 3 AQS 框架的實現原理與應用
AQS 是 `AbstractQueuedSynchronizer` 類的簡稱,定義了一套多線程訪問共享資源的**同步機制**
**核心思想:**如果被請求的共享資源空閑,則將請求的線程設置為有效的工作線程,將共享資源鎖定;如果共享資源被占用,則需要一定的掛起與喚醒機制(CLH隊列變體隊列)確保資源的分配
### 3.1 AQS 實現原理
#### 3.1.1 AQS 數據結構
**AQS 維護了一個由無鎖雙向鏈表實現的阻塞隊列,隊列中的各個元素(**`**AQS.Node**`**)通過volatile變量(** `**AQS#state**`**)來進行狀態維護**
- `AQS#state` AQS 內部不操作 state 變量,由實現層定義 state 的含義與控制 state 的值
- `AQS#head` 指向雙向鏈表頭部,表示等待隊列的頭部,延遲初始化
- 除初始化外,它僅通過 `setHead` 進行修改(如果 `head` 存在,則會確保 `waitStatus` 不會變為 `CANCELLED`)
- `AQS#tail` 指向雙向鏈表尾部,表示等待隊列的尾部,延遲初始化
- 僅通過 `enq` 方法修改以添加新的等待節點
- 隊列操作:
- **入隊:將新的** `Node` 加到 `tail` 后面,然后對 `tail` 進行CAS操作
- **出隊:**對 `head` 進行CAS操作,把 `head` 向后移一個位置
> AQS 入隊的方法
```java
private Node enq(final Node node) { // AQS.enq
for (;;) {
Node t = tail;
if (t == null) { // 必須初始化
if (compareAndSetHead(new Node()))
tail = head;
} else { // 將雙向鏈表結點插入到 tail 后
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
----
`AbstractQueuedSynchronizer.Node` 表示 **AQS 隊列元素**,各字段與方法含義如下:
- 構造方法
- `Node()` 提供給自定義初始化邏輯或 `SHARED` 狀態使用
- `Node(Thread thread, Node mode)` 提供給 `addWaiter` 方法使用
- `Node(Thread thread, int waitStatus)` 提供給 `Condition` 使用
- 成員字段
- `volatile Node prev;` 前繼結點
- `volatile Node next;` 后繼結點
- `volatile Thread thread;` 結點中的線程
- `volatile int waitStatus;`
- 數字 `0` 表示 Node 被初始化時的默認值
- `static final int CANCELLED = 1;` 表示線程獲取鎖的請求已經被取消了
- `static final int SIGNAL = -1;` 表示線程正在等待釋放資源
- `static final int CONDITION = -2;` 表示結點在等待隊列中,線程等待喚醒
- `static final int PROPAGATE = -3;` 當前線程處于SHARED情況,才會使用
- `Node nextWaiter;` AQS 中的條件隊列是是通過 nextWaiter,以單向鏈表的形式保存的, `SHARED` 模式不存在 Condition, `EXCLUSIVE` 模式才存在 `Condition`
- `static final Node SHARED = new Node();` 共享模式,多個線程可同時執行;例如 `ReadWriteReentrantLock.readLock` `Semaphore`(state != 1時) `CountDownLatch`
- `static final Node EXCLUSIVE = null;` 獨占模式,只有一個線程能執行;例如 `ReadWriteReentrantLock.writeLock` `ReentrantLock` `Semaphore`(state = 1 時)
- 成員方法
- `final boolean isShared()` 是否為共享結點
- `final Node predecessor()`
#### 3.1.2 AQS 方法架構
- **(1) API 層(*****Main exported methods*****):只需要重寫API 層方法,即可使用AQS框架,定制自定義同步器**
- 自定義同步器**可重寫的方法**
- `protected boolean tryAcquire(int arg)` 嘗試通過獨占方式,獲取資源;返回 true 表示成功,false 表示失敗
- `protected boolean tryRelease(int arg)` 嘗試通過獨占方式,釋放資源;返回 true 表示成功,false 表示失敗
- `protected int tryAcquireShared(int arg)` 嘗試通過共享方式,獲取資源;返回負數表示失敗,0表示成功,但沒有剩余可用資源,正數表示成功,還有剩余資源
- `protected boolean tryReleaseShared(int arg)` 嘗試通過共享方式,釋放資源;返回 true 表示釋放后允許喚醒后繼結點,false 表示不允許
- `protected boolean isHeldExclusively()` 返回當前線程是否獨占資源,使用 Condition 時才需實現
- 自定義同步器不可重寫的方法
- `public final void acquire(int arg)` 通過獨占方式,獲取資源(忽略中斷)
- `public final void acquireInterruptibly(int arg)` 通過獨占方式,獲取資源(響應中斷)
- `public final boolean tryAcquireNanos(int arg, long nanosTimeout)` 通過獨占方式,獲取資源(超時中止)
- `public final void acquireShared(int arg)` 通過共享方式,獲取資源(忽略中斷)
- `public final void acquireSharedInterruptibly(int arg)` 通過共享方式,獲取資源(響應中斷)
- `public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)` 通過共享方式,獲取資源(超時中止)
- `public final boolean release(int arg)` 通過獨占方式,釋放資源
- `public final boolean releaseShared(int arg)` 通過共享方式,釋放資源
- **(2) 資源獲取層(*****Utilities for various versions of acquire*****):通過自定義同步器獲取與釋放資源時,會進入到鎖獲取層**
- `private void cancelAcquire(Node node)`
- `private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)`
- `static void selfInterrupt()`
- `private final boolean parkAndCheckInterrupt()`
- `inal boolean acquireQueued(final Node node, int arg)`
- `private void doAcquireInterruptibly(int arg)`
- `private boolean doAcquireNanos(int arg, long nanosTimeout)`
- `private void doAcquireShared(int arg)`
- `private void doAcquireSharedInterruptibly(int arg)`
- `private boolean doAcquireSharedNanos(int arg, long nanosTimeout)`
- **(3) 隊列檢查層(*****Queue inspection methods*****):獲取資源失敗時,會進入到隊列檢查層,排隊等待**
- `public final boolean hasQueuedThreads()` 判斷是否有線程正在等待獲取,例如 ReentrantLock 用該方法來實現公平與非公平獲取鎖
- `public final boolean hasContended()` 詢問是否有線程曾爭奪過該同步器;也就是說,acquire方法是否曾經被阻止過
- `public final Thread getFirstQueuedThread()` 獲取隊列中的第一個線程
- `public final boolean isQueued(Thread thread)` 判斷指定線程是否在隊列中排隊
- `final boolean apparentlyFirstQueuedIsExclusive()` 如果第一個排隊線程(如果存在)以獨占模式等待,則返回 true
- `public final boolean hasQueuedPredecessors()` 查詢是否有線程等待獲取的時間長于當前線程
- **(4) 入隊出隊層:提供雙向鏈表首尾結點的CAS操作**
- `private Node addWaiter(Node mode)`
- `private Node enq(final Node node)`
- `final boolean transferForSignal(Node node)`
- `final boolean transferAfterCancelledWait(Node node)`
- `private void unparkSuccessor(Node node)`
- `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)`
- **(5) 數據提供層:**
- `state` 變量的讀寫 ( `private volatile int state`),**可重寫**
- `protected final int getState()` 獲取 state 變量的值
- `protected final void setState(int newState)` 設置 state 變量的值
- `protected final boolean compareAndSetState(int expect, int update)` CAS 設置 state 變量的值
- 測量和監控隊列(*Instrumentation and monitoring methods*)
- `public final int getQueueLength()`
- `public final Collection<Thread> getQueuedThreads()`
- `public final Collection<Thread> getExclusiveQueuedThreads()`
- `public final Collection<Thread> getSharedQueuedThreads()`
- *條件相關的內部方法(Internal support methods for Conditions)*
- `final boolean isOnSyncQueue(Node node)`
- `private boolean findNodeFromTail(Node node)`
- `final boolean transferForSignal(Node node)`
- `final boolean transferAfterCancelledWait(Node node)`
- `final int fullyRelease(Node node)`
- *條件相關的測量方法(Instrumentation methods for conditions)*
- `public final boolean owns(ConditionObject condition)`
- `public final boolean hasWaiters(ConditionObject condition)`
- `public final int getWaitQueueLength(ConditionObject condition)`
- `public final Collection<Thread> getWaitingThreads(ConditionObject condition)`
- CAS 操作包裝
- `private final boolean compareAndSetHead(Node update)`
- `private final boolean compareAndSetTail(Node expect, Node update)`
- `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)`
- `private static final boolean compareAndSetNext(Node node, Node expect, Node update)`
### 3.2 ReentrantLock (獨占鎖)實現原理
ReentrantLock 的常用方法 :
- lock 方法
- unlock 方法
- tryLock 方法
----
ReentrantLock 具有以下特性:
- 獨占鎖
- 可重入
- 支持公平與非公平(非公平可提高吞吐量)
- 具備線程掛起與喚醒功能
----
**鎖實現的基本原理**與AQS的關系:
- 可標記鎖的狀態( `AbstractQueuedSynchronizer#state`)
- 可記錄當前持有鎖的線程( `AbstractOwnableSynchronizer#exclusiveOwnerThread`)
- 支持對線程進行掛起和喚醒 ( `LockSupport#park` 和 `LockSupport#unpark` )
- 有一個維護所有阻塞線程的無鎖隊列( `AbstractQueuedSynchronizer.Node`)
#### 3.2.1 ReentrantLock#state 變量的含義
`ReentrantLock#state` 變量的含義:
- state=0,表示還沒有線程獲取鎖
- state=1,表示有線程獨占了鎖
- state>1,表示鎖被重入的次數
#### 3.2.2 ReentrantLock#lock 方法分析
> (1) `ReentrantLock#lock` 的公平與非公平實現
```java
static final class FairSync extends Sync { // 公平實現
final void lock() {
acquire(1); // 在方法內部排隊
}
}
static final class NonfairSync extends Sync { // 非公平實現
final void lock() {
if (compareAndSetState(0, 1)) { // 直接搶鎖
setExclusiveOwnerThread(Thread.currentThread()); // 搶鎖成功,則獨占
} else {
acquire(1); // 搶鎖失敗,再在方法內部排隊
}
}
}
```
> (2) `AQS#acquire` 方法分析:包括 `tryAcquire` 的公平與非公平實現、 `addWaiter`和 `acquireQueued`的實現過程
```java
public final void acquire(int arg) {
if (!tryAcquire(arg) // 再次嘗試拿鎖,由子類實現
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把線程放入阻塞隊列,阻塞該線程 selfInterrupt(); // 返回true表示被中斷過,通知進行中斷
}
}
```
> (3) `tryAcquire`的公平與非公平實現
```java
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) { // tryAcquire的公平實現
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 沒有線程持有鎖,開始搶鎖
if (!hasQueuedPredecessors() // 如果排在隊列第一個
&& compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
final boolean nonfairTryAcquire(int acquires) { // Sync#nonfairTryAcquire,tryAcquire的非公平實現
// ...
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 與公平實現的唯一區別就是,此處沒有 !hasQueuedPredecessors()
setExclusiveOwnerThread(current);
return true;
}
}
// ...
}
```
> (4) `addWaiter` 方法分析
```java
private Node addWaiter(Node mode) { // 為當前線程生成一個Node,然后把Node放入雙向鏈表的尾部,線程還未阻塞
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 先嘗試快速插入到隊列尾部,成功則直接返回
pred.next = node;
return node;
}
}
enq(node); // 進行隊列的初始化,新建一個空的Node,不斷嘗試自旋,直至成功把該Node加入隊列尾部
return node;
}
```
> (5) `acquireQueued` 方法分析
```java
final boolean acquireQueued(final Node node, int arg) { // AQS#acquireQueued
boolean failed = true;
try {
boolean interrupted = false; // 會記錄阻塞過程中有沒有其他線程向自己發送中斷信號
for (;;) {
final Node p = node.predecessor(); // 前一個結點
if (p == head && tryAcquire(arg)) { // 如果自己的前一個結點是head指向的空結點,即隊列頭部,則嘗試拿鎖
setHead(node); // 拿鎖成功,出隊列(head前移一個結點)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)
&& parkAndCheckInterrupt()) { // 調用park掛起自己
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
```
#### 3.2.3 ReentrantLock#unlock 方法分析
> `unlock` 不區分公平或非公平
```java
public void unlock() { // java.util.concurrent.locks.ReentrantLock#unlock
sync.release(1);
}
public final boolean release(int arg) { // AQS#release
if (tryRelease(arg)) { // 1. 釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 2. 喚醒隊列后繼者
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { // ReentrantLock.Sync#tryRelease
int c = getState() - releases; // 減少重入次數
if (Thread.currentThread() != getExclusiveOwnerThread()) // 只有鎖的擁有者才能unlock
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 重入次數減到0時釋放鎖
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) { // AQS#unparkSuccessor
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
```
#### 3.2.4 ReentrantLock#lockInterruptibly 與 tryLock 的分析 (略)
`ReentrantLock#lockInterruptibly` 與 `ReentrantLock#tryLock` 方法的實現只是多了層封裝,不再贅述
### 3.3 ReentrantReadWriteLock (讀寫鎖)實現原理
ReentrantReadWriteLock 的常用方法:
- readLock().lock
- readLock().unlock
- writeLock().lock
- writeLock().unlock
----
ReentrantReadWriteLock 有以下特性
- **讀寫互斥、寫寫互斥、讀讀不互斥**(利用該特性,可以在**讀多寫少**的場景,替換獨占鎖,優化性能)
- 可重入
- 具備公平與非公平實現
- 具備線程掛起與喚醒功能
----
分析過程包括:
- state 變量的含義
- readLock 和 writeLock 的實現
- 公平與非公平實現
- `AQS#acquireShared` 和 `AQS#releaseShared` 的實現
#### 3.3.1 ReentrantReadWriteLock#state 變量的含義
- 用 `state` 低16位用記錄寫鎖的重入次數,高16位記錄讀鎖的重入次數
- `state=0` 表示沒有線程持有讀鎖或寫鎖
- `state!=0` 時,要么有線程持有讀鎖,要么持有寫鎖;可以進一步通過 `sharedCount(state)` 判斷是否持有讀鎖, `execlusiveCount(state)` 判斷是否持有寫鎖
> `java.util.concurrent.locks.ReentrantReadWriteLock.Sync` 中對 state 變量含義的定義
```java
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
```
#### 3.3.2 readLock() 和 writeLock() 的實現分析
- `readLock()` 和 `writeLock()` 分別是讀鎖和寫鎖的視圖,返回是 `Lock` 接口的實現
- writeLock()基于 `AQS#acquire` 和 `AQS#release` 方法實現;readLock()基于 `AQS#acquireShared` 和 `AQS#acquireRelease` 實現
- 通過內部抽象類Sync實現上述AQS的模板方法,又抽象出 `readerShouldBlock()` 和 `writerShouldBlock()` 方法來擴展公平和非公平實現
----
`AQS#acquire` 和 `AQS#release` 方法已在 `ReentrantLock` 中分析,不再贅述;下一節只分析 `AQS#acquireShared` 和 `AQS#releaseShared` 方法;
----
先簡要給出讀寫鎖公平與非公平實現的過程:
> `ReentrantReadWriteLock.NonfairSync` 和 `ReentrantReadWriteLock.FairSync` 實現分析
```java
static final class NonfairSync extends Sync { // 非公平實現,ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
return false; // 寫線程在搶鎖前永遠不被阻塞,非公平
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive(); // 讀線程搶鎖時,如果隊首元素是寫線程,才阻塞
}
}
static final class FairSync extends Sync { // 公平實現,ReentrantReadWriteLock.FairSync
final boolean writerShouldBlock() {
return hasQueuedPredecessors(); // 寫線程搶鎖前,排隊
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors(); // 讀線程搶鎖前,排隊
}
}
```
#### 3.3.3 ReadLock#lock 方法分析
```java
public void lock() { // ReentrantReadWriteLock.ReadLock#lock
sync.acquireShared(1); // AQS#acquireShared
}
public final void acquireShared(int arg) { // AQS#acquireShared
if (tryAcquireShared(arg) < 0) // 子類實現,實際在 ReentrantReadWriteLock.Sync#tryAcquireShared 中實現
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) { // ReentrantReadWriteLock.Sync.tryAcquireShared
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 // 被獨占
&& getExclusiveOwnerThread() != current) {// 且不是當前線程獨占
return -1; // 拿不到讀鎖,返回值小于 0 表示失敗
} // 能走到這里,就是沒有被獨占
int r = sharedCount(c); // 獲取讀線程數量
if (!readerShouldBlock() // 公平鎖:排隊;非公平鎖:隊首元素非寫線程
&& r < MAX_COUNT
&& compareAndSetState(c, c + SHARED_UNIT)) { // CAS 更新讀線程數,高16位+1(1左移16位+1)
if (r == 0) { // 表示當前線程是第1個拿到讀鎖的線程
firstReader = current; // 只用于統計,不影響流程
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 讀鎖重入
firstReaderHoldCount++; // 第一個讀線程的鎖重入次數+1
} else { // 其他進來的讀線程
HoldCounter rh = cachedHoldCounter; // 只用于統計,不影響流程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
```
#### 3.3.4 ReadLock#unlock 方法分析
```java
public void unlock() { // ReentrantReadWriteLock.ReadLock.unlock
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) { // AQS.releaseShared
if (tryReleaseShared(arg)) { // 子類實現
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) { // ReentrantReadWriteLock.Sync.tryReleaseShared
Thread current = Thread.currentThread();
if (firstReader == current) { // 當前線程在隊首
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 讀線程重入次數減到0
firstReader = null; // 則該線程不再持有該鎖
else
firstReaderHoldCount--; // 重入次數減1
} else { // 其他讀線程,在 readHolds 中維護
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // CAS 自旋更新 state 變量
int c = getState();
int nextc = c - SHARED_UNIT; // 高16位減1
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 等于0表示被釋放
}
}
```
#### 3.3.5 WriteLock#lock 方法分析
```java
public void lock() { // ReentrantReadWriteLock.WriteLock.lock
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) // 再次嘗試拿鎖,由子類實現
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把線程放入阻塞隊列,阻塞該線程
selfInterrupt(); //
}
}
protected final boolean tryAcquire(int acquires) { // ReentrantReadWriteLock.Sync.tryAcquire
Thread current = Thread.currentThread();
int c = getState(); // 用于判斷是否有讀寫線程
int w = exclusiveCount(c); // 寫線程的重入數(寫線程只能有一個)
if (c != 0) { // 被讀線程或被寫線程占用,此時必然互斥
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 鎖被讀線程持有或者不是被當前線程獨占,則返回
return false; // 獲取寫鎖失敗
if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入數,低16位用滿,拋錯誤
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); // 更新寫鎖重入數
return true;
}
// 下面進入搶鎖環節
if (writerShouldBlock() // 公平實現:隊列里有其他線程,則排隊;非公平實現:不被阻塞
|| !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 獨占寫鎖
return true;
}
```
#### 3.3.6 WriteLock#unlock 方法分析
```java
public void unlock() { // ReentrantReadWriteLock.WriteLock.unlock
sync.release(1);
}
public final boolean release(int arg) { // AQS#release
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { // ReentrantReadWriteLock.Sync.tryRelease
if (!isHeldExclusively()) // 確保unlock方法是被持有鎖的線程調用
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 寫鎖重入數減1
boolean free = exclusiveCount(nextc) == 0; // 是否被釋放
if (free)
setExclusiveOwnerThread(null); // 釋放獨占線程
setState(nextc); // 更新狀態
return free;
}
```
### 3.4 Condition (條件)實現原理
Condition 的常用方法:
- await
- signal
----
- 如同 `Object#wait()` 和 `Object#notify()` 方法必須和 `synchronized` 一起使用, `Condition#awiat()` 和 `Condition#signal()` 必須和 `Lock` 一起使用
- Condition 相比 `wait/notify`,避免了生產者通知生產者,消費者通知消費者的問題
- 互斥鎖 `ReentrantLock.Sync#newCondition` 和 讀寫鎖的寫鎖`ReentrantReadWriteLock.WriteLock#newCondition` 都使用了 `AQS#newCondition` ,其中讀鎖不支持 `newCondition`
> Condition 的創建
```java
public Condition newCondition() { // ReentrantLock.newCondition
return sync.newCondition();
}
final ConditionObject newCondition() { // ReentrantLock.Sync.newCondition 或 ReentrantReadWriteLock.Sync#newCondition
return new ConditionObject();
}
```
#### 3.4.1 ConditionObject.await 方法分析
```java
public final void await() throws InterruptedException { // AQS.ConditionObject.await()
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1. 將當前線程加入等待隊列(由于已拿到鎖,因此方法內部線程安全)
int savedState = fullyRelease(node); // 2. 掛起前必須先釋放鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // Node 是否在 AQS 隊列中
LockSupport.park(this); // 掛起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 3. 重新拿鎖
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 4. 如果被中斷喚醒,向外拋出中斷異常
reportInterruptAfterWait(interruptMode);
}
```
#### 3.4.2 ConditionObject.signal 方法分析
```java
public final void signal() { // AQS.ConditionObject.signal
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 隊首
if (first != null)
doSignal(first); // 真正執行喚醒
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 先放到同步隊列,再unpark喚醒
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 喚醒
return true;
}
```
### 3.5 CountDownLaunch (計數屏障)實現原理
CountDownLaunch 的常用方法:
- await:調用 await 的線程,將等待 count 被減到0
- countDown:每次調用 countDown,都會將 count 值減1
#### 3.5.1 CountDownLaunch#state 變量的含義
用 `CountDownLaunch#state` 變量表示**未 countDown 的數量**,當 `state` 為0時,調用 awiat 的線程會從掛起中喚醒
#### 3.5.2 CountDownLatch#await 方法分析
```java
public void await() throws InterruptedException { // java.util.concurrent.CountDownLatch.await()
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // AQS.acquireSharedInterruptibly
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 在 CountDownLatch.Sync#tryAcquireShared 重寫
doAcquireSharedInterruptibly(arg);
}
private static final class Sync extends AbstractQueuedSynchronizer { // CountDownLatch.Sync
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) { // CountDownLatch.Sync#tryAcquireShared
return (getState() == 0) ? 1 : -1;
}
// tryReleaseShared
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // CountDownLatch#await() 實現原理
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
```
#### 3.5.3 CountDownLatch#countDown 方法分析
```java
public void countDown() { // CountDownLatch.countDown
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) { // AQS.releaseShared
if (tryReleaseShared(arg)) { // 子類實現
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) { // CountDownLatch.Sync#tryReleaseShared
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
```
### 3.6 Semaphore (信號量)實現原理
Semaphore 的常用方法:
- acquire:令資源數減 n
- release:令資源數加 n
#### 3.6.1 Semaphore#state 變量的含義
`Semaphore#state` 變量表示**資源總數**,調用 acquire 方法對 state 進行 CAS 減操作,**減到0后,線程阻塞**;調用 release 方法對 state 進行 CAS 加操作
#### 3.6.2 Semaphore#acquire 與 release 方法分析
> Semaphore 各方法源碼,與鎖的實現類似,不再贅述
```java
public void acquire() throws InterruptedException { // Semaphore.acquire()
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException { // Semaphore.acquire(int)
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
```
## 4 無鎖編程模式
### 4.1 內存屏障(一寫一讀)
> linux 內核的 kfifo 隊列:[root/kernel/kfifo.c](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/kernel/kfifo.c?h=linux-2.6.38.y) ,通過 `smp_wmb()` 插入 Store 屏障,確保更新指針的操作不會重排序到修改數據之前,以及更新指針的時候,Store Cahce 被刷新,其他 CPU 可見
```c
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 1) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
* make sure that the data in the fifo is up to date before
* incrementing the fifo->in index counter
*/
smp_wmb();
}
```
> `java.util.concurrent.locks.StampedLock#validate` 方法中,通過 `sun.misc.Unsafe#loadFence` 插入內存屏障,對非 volatile 的局部變量 `stamp` ,避免調用方進行樂觀讀時,代碼被重排序
```c
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y; // validate(stamp) 中插入了內存屏障,這行讀取x值和y值的代碼不會被重排序到上一行前
if (!sl.validate(stamp)) { // 插入內存屏障,避免前面的代碼被重排序
}
```
### 4.2 volatile(一寫多讀)
`volatile` 修飾的變量,在被修改后會將值刷新到主內存中,確保各個工作內存中讀到的值是一致的
### 4.3 無鎖隊列(多寫多讀,可在雙端增刪元素)
參考 AQS 使用的無鎖隊列
### 4.4 無鎖棧(多寫多讀,可在棧頂增刪元素)
參考 `java.util.concurrent.Phaser` 和 `ForkJoinPool` 使用的無鎖棧
### 4.5 無鎖鏈表(多寫多讀,可在中間增刪元素)
參考 `java.util.concurrent.ConcurrentSkipListMap` 使用的無鎖跳查表
## 5 并發編程模式\*
### 5.1 信號量、Latch 與同步屏障
- synchronized + wait() + notify() 實現信號量
- synchronized + wait() + notify() 實現 Launch 模式
- 線程池 + 原子類實現 Launch 模式
- Semaphore、CountDownLatch、CyclicBarrier 與 Phaser 的適用場景
### 5.2 發布訂閱模式
**相關概念:**
- 觀察者模式(Observer)是為了實現**松耦合**,和發布訂閱模式(Publish–Subscribe)通過注冊中心,實現了**完全解耦**
- 擔保-掛起(Guarded Suspension)模式,是很多設計模式(例如生產者-消費者模式)的基礎
- Balking 模式與擔保-掛起模式類似,但選擇的是**放棄**而不是掛起
**相關示例:**
- synchronized + wait() + notify() 實現阻塞隊列
- Lock + Condition 實現阻塞隊列
### 5.3 線程池模式
**線程池(** `java.util.concurrent.ThreadPoolExecutor`**)原理:**
- 線程池入參的含義
- 在不斷往線程池(ThreadPoolExecutor)中提交任務(Runnable)時,先由核心線程處理,核心線程數(corePoolSize)不夠時,存儲到任務隊列(BlockingQueue),任務隊列滿后,將創建線程至最大線程數(maximumPoolSize)來處理任務,如果還是不夠處理任務,則使用拒絕策略(RejectedExecutionHandler)
- 當任務處理完后,除了核心線程,其他線程會在經過保持時間(keepAliveTime)后被銷毀
- 線程池中的 `ctl` 字段含義:高3位表示線程池的運行狀態(runState),低29位表示工作線程數量(workerCount)
- 線程池狀態(runState)遷移(有向無環圖):
- RUNNING(-1) 經過 shutdown() 變為 SHUTDOWN(0)
- RUNNING(-1) 經過 shutdownNow() 變為 STOP(1)
- SHUTDOWN(0) 經過 shutdownNow() 變為 STOP(1)
- SHUTDOWN(0) 在隊列和線程池為空時,變為TIDYING(2)
- STOP(1) 在隊列和線程池為空時,變為TIDYING(2)
- TIDYING(2) 經過 terminated() 變為 TERMINATED(3)
- shutdown() 與 shutdownNow() 的區別
- shutdown() 不會清空任務隊列,會等所有任務執行完成;shutdownNow() 會清空任務隊列
- shutdown() 只中斷空閑線程,shutdownNow() 會中斷所有線程
> 線程池提交任務的 `execute` 方法分析
```java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1. 當前線程數小于核心線程數,創建核心線程,并start任務
if (addWorker(command, true)) // start 任務成功,則返回
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2. 當前線程數大于等于核心線程數,則放入隊列
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)) // 3. 放入隊列失敗,則新建空閑線程 start 任務
reject(command); // 4. 使用空閑線程 start 任務失敗,使用拒絕策略
}
private boolean addWorker(Runnable firstTask, boolean core) { // 新開線程
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 判斷線程池是否處于 shutdown 后續狀態(如果正好是shutdown,還有任務則不退出)
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 線程數超過上界
return false;
if (compareAndIncrementWorkerCount(c)) // workCouut 加1成功則跳出循環
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // runState 發生了變化,重新開始for循環
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// workCount 成加1,開始添加線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 創建一個工作線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 添加線程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 添加成功,則啟動線程
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```
> 工作線程的執行過程分析
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ThreadPoolExecutor.Worker,繼承自AQS
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
final void runWorker(Worker w) { // ThreadPoolExecutor#runWorker
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 不斷從隊列中取任務執行
w.lock(); // 執行任務前先加鎖,shutdown時會tryLock()
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 檢查運行狀態是否已停止
wt.interrupt();
try {
beforeExecute(wt, task); // 鉤子方法,默認空實現
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 鉤子方法,默認空實現
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally { // 確保 Worker 退出時,能執行下面的退出處理邏輯
processWorkerExit(w, completedAbruptly);
}
}
```
### 5.4 Future 模式
**背景:**
- 同步的API調用比較耗時的時候,可以先選擇獲取一個立即返回的憑據(Future),調用者所在線程不用陷入未知時長的阻塞中,在未來的某個時間再根據Future獲取結果
- 還可以增加回調(Callback)的機制,無需通過阻塞方法(get)獲取結果,而是注入一個回調方法,以此提高系統的響應時間,充分利用CPU資源
實現模式可以參考JDK源碼或 [pattern/future](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/personal-code/z8g/tree/master/pattern/src/main/java/com/baidu/z8g/pattern/concurrent/future)
### 5.5 CopyOnWrite 模式
參考 `CopyOnWriteArrayList`、 `StampedLock` 等工具的實現
### 5.6 ForkJoin 模式
Forkjoin 是 JDK 7 中提供分治算法的多線程并行計算框架:
- 分治算法的實現
- 工作竊取的實現
- 并行計算的實現
### 5.7 Thread-Per-Message 模式\*
### 5.8 Worker-Thread 模式\*
### 5.9 Active Objects 模式\*
### 5.10 消息總線(Event Bus)模式\*
- 空白目錄
- 精簡版Spring的實現
- 0 前言
- 1 注冊和獲取bean
- 2 抽象工廠實例化bean
- 3 注入bean屬性
- 4 通過XML配置beanFactory
- 5 將bean注入到bean
- 6 加入應用程序上下文
- 7 JDK動態代理實現的方法攔截器
- 8 加入切入點和aspectj
- 9 自動創建AOP代理
- Redis原理
- 1 Redis簡介與構建
- 1.1 什么是Redis
- 1.2 構建Redis
- 1.3 源碼結構
- 2 Redis數據結構與對象
- 2.1 簡單動態字符串
- 2.1.1 sds的結構
- 2.1.2 sds與C字符串的區別
- 2.1.3 sds主要操作的API
- 2.2 雙向鏈表
- 2.2.1 adlist的結構
- 2.2.2 adlist和listNode的API
- 2.3 字典
- 2.3.1 字典的結構
- 2.3.2 哈希算法
- 2.3.3 解決鍵沖突
- 2.3.4 rehash
- 2.3.5 字典的API
- 2.4 跳躍表
- 2.4.1 跳躍表的結構
- 2.4.2 跳躍表的API
- 2.5 整數集合
- 2.5.1 整數集合的結構
- 2.5.2 整數集合的API
- 2.6 壓縮列表
- 2.6.1 壓縮列表的結構
- 2.6.2 壓縮列表結點的結構
- 2.6.3 連鎖更新
- 2.6.4 壓縮列表API
- 2.7 對象
- 2.7.1 類型
- 2.7.2 編碼和底層實現
- 2.7.3 字符串對象
- 2.7.4 列表對象
- 2.7.5 哈希對象
- 2.7.6 集合對象
- 2.7.7 有序集合對象
- 2.7.8 類型檢查與命令多態
- 2.7.9 內存回收
- 2.7.10 對象共享
- 2.7.11 對象空轉時長
- 3 單機數據庫的實現
- 3.1 數據庫
- 3.1.1 服務端中的數據庫
- 3.1.2 切換數據庫
- 3.1.3 數據庫鍵空間
- 3.1.4 過期鍵的處理
- 3.1.5 數據庫通知
- 3.2 RDB持久化
- 操作系統
- 2021-01-08 Linux I/O 操作
- 2021-03-01 Linux 進程控制
- 2021-03-01 Linux 進程通信
- 2021-06-11 Linux 性能優化
- 2021-06-18 性能指標
- 2022-05-05 Android 系統源碼閱讀筆記
- Java基礎
- 2020-07-18 Java 前端編譯與優化
- 2020-07-28 Java 虛擬機類加載機制
- 2020-09-11 Java 語法規則
- 2020-09-28 Java 虛擬機字節碼執行引擎
- 2020-11-09 class 文件結構
- 2020-12-08 Java 內存模型
- 2021-09-06 Java 并發包
- 代碼性能
- 2020-12-03 Java 字符串代碼性能
- 2021-01-02 ASM 運行時增強技術
- 理解Unsafe
- Java 8
- 1 行為參數化
- 1.1 行為參數化的實現原理
- 1.2 Java 8中的行為參數化
- 1.3 行為參數化 - 排序
- 1.4 行為參數化 - 線程
- 1.5 泛型實現的行為參數化
- 1.6 小結
- 2 Lambda表達式
- 2.1 Lambda表達式的組成
- 2.2 函數式接口
- 2.2.1 Predicate
- 2.2.2 Consumer
- 2.2.3 Function
- 2.2.4 函數式接口列表
- 2.3 方法引用
- 2.3.1 方法引用的類別
- 2.3.2 構造函數引用
- 2.4 復合方法
- 2.4.1 Comparator復合
- 2.4.2 Predicate復合
- 2.4.3 Function復合
- 3 流處理
- 3.1 流簡介
- 3.1.1 流的定義
- 3.1.2 流的特點
- 3.2 流操作
- 3.2.1 中間操作
- 3.2.2 終端操作
- 3.3.3 構建流
- 3.3 流API
- 3.3.1 flatMap的用法
- 3.3.2 reduce的用法
- 3.4 collect操作
- 3.4.1 collect示例
- 3.4.2 Collector接口