## 20 其實不用造輪子—Executor框架詳解
> 人的一生可能燃燒也可能腐朽,我不能腐朽,我愿意燃燒起來!
> ——奧斯特洛夫斯基
上一節我們動手實現了一個非常簡單的線程池。其實 JDK 已經為我們準備了功能豐富的線程池工具。本章我們就來學習一下 JDK 中的線程池—Executor 框架。
## 1、Executor 框架的使用
我們首先來看看 Executor 框架是如何實用的。看如下代碼:
~~~java
public class Client {
public static Executor executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Stream.iterate(1, item -> item + 1).limit(20).forEach(item -> {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " hello!");
});
}
);
}
}
~~~
可以看到在使用上和我們自己實現的線程池幾乎一模一樣。只不過在聲明 Executor 的時候,沒有直接 new 對象。而是通過Executors的靜態方法 newFixedThreadPool 來創建 Executor。
而執行任務的方式則是和我們自己實現的一模一樣。都是調用 executor 方法,傳入 Runnable 接口的實現,也就是運行的邏輯。那么它的內部實現是否也和我們實現的一樣呢?先別急,我們一點點展開來分析。
## 2、Executor 框架設計簡介
我們先來看下Executor框架的繼承關系:

**1、Executor**
可以看到最頂層是 Executor 的接口。這個接口很簡單,只有一個 execute 方法。此接口的目的是為了把任務提交和任務執行解耦。
**2、ExecutorService**
這還是一個接口,繼承自 Executor,它擴展了 Executor 接口,定義了更多線程池相關的操作。
**3、AbstractExecutorService**
提供了 ExecutorService 的部分默認實現。
**4、ThreadPoolExecutor**
實際上我們使用的線程池的實現是 ThreadPoolExecutor。它實現了線程池工作的完整機制。也是我們接下來分析的重點對象。
**5、ForkJoinPool**
實現 Fork/Join 模式的線程池,后面會有小節專門講解。本節不做深入分析。
**6、ScheduledExecutorService**
這個接口擴展了ExecutorService,定義個延遲執行和周期性執行任務的方法。
**7、ScheduledThreadPoolExecutor**
此接口則是在繼承 ThreadPoolExecutor 的基礎上實現 ScheduledExecutorService 接口,提供定時和周期執行任務的特性。
**Executors**
Executor 框架還提供 Executors 對象。注意看這個對象比 Executor 接口后面對了個 s,要區分開,不要搞混。Executors 是一個工廠及工具類。提供了例如 newFixedThreadPool(10) 的方法,來創建各種不同的 Executor。
## 3、Executor 框架源碼分析
Executor 設計的類和實現比較多。本節對 Executor 框架的源碼分析以 ThreadPoolExecutor 作為主線,其它的內容也會有所提及,不過請同學們抓住重點,別偏離了主線。
### 3.1 Executor
代碼如下:
~~~java
public interface Executor {
void execute(Runnable command);
}
~~~
很簡單,只是為了把提交任務解耦出來。
### 3.2 ExecutorService
ExecutorService 定義了線程池管理和更多執行任務的方法,如下:

挑選幾個重點的說一下:
**shutdown 方法**
終止 executorService,不再執行任務新的任務,已經執行的任務會被執行完。
**shutdownNow 方法**
不等待正在執行的任務完成,強行關閉。不過此方法并不保證正在執行的任務能被強行終止。返回從來沒有被執行的任務列表。
**submit 方法**
對 execute 方法的擴展,會返回一個 Future 對象,持有任務執行結果。
**invokeAll 方法**
執行一組任務,所有任務都返回或者 timeout 的時候,invokeAll 方法返回執行結果列表。該方法一旦返回結果,沒有完成的任務則被取消。
**invokeAny 方法**
執行一組任務,任意一個任務有返回時,invokeAny 返回該任務的執行結果。其余沒有完成的任務則被取消。
### 3.3 AbstractExecutorService
提供了 newTaskFor 方法對 Runnable 進行包裝:
~~~java
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
~~~
它對 submit 的實現,就是過 newTaskFor 方法,代碼如下:
~~~java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
~~~
這里用到的 RunnableFuture,就是為了這個功能而生,它實現了 Runbale 接口及 Future 接口。所以它可以被傳入execute方法,從而添加進任務列表。此外它還保存了執行的結果,并被返回。
### 3.4 構造ThreadPoolExecutor
下面才是本節的重頭戲,對 ThreadPoolExecutor 的源代碼分析。我們從 ThreadPoolExecutor 的創建開始。
~~~java
Executors.newFixedThreadPool(10)
~~~
可以看到是通過 Executors 的工廠方法來創建的,Executor 提供了多種工廠方法創建 ThreadPool。其實根本是調用 ThreadPoolExecutor 構造方法時傳入參數不同。我們以 newFixedThreadPool 方法為例,看一下代碼:
~~~java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
~~~
ThreadPoolExecutor 構造方法如下:
~~~java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
~~~
現在我們可以翻一下 newFixedThreadPool 定義了一個什么樣的線程池:
核心線程數量為 n,最大線程數量也為 n 的線程池。線程池中線程永遠存活。線程池創建線程使用 defaultTHreadFactory。當無法創建線程時,使用 defaultHandler。
**corePoolSize**即線程池的核心線程數量,其實也是最小線程數量。不設置 allowCoreThreadTimeOut 的情況下,核心線程數量范圍內的線程一直存活。
**maximumPoolSize**即線程池的最大線程數量。受限于線程池的 CAPACITY。線程池的 CAPACITY 為 2 的 29 次方 -1。這是由于線程池把線程數量和狀態保存在一個整形原子變量中。狀態保存在高位,占據了兩位,所以線程池中線程數量最多到 2 的 29 次方 -1。
**workQueue**是一個阻塞的 queue,用來保存線程池要執行的所有任務。
**Executors.defaultThreadFactory()**,我們看下源代碼,發現其最終返回了一個 DefaultThreadFactory。代碼如下:
~~~java
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
~~~
其實就是規范了生成的 Thread。避免調用 new Thread 創建,導致創建出來的Thread可能存在差異。在Executor中,對線程的創建都是通過 ThreadFactory,禁止使用 new Thread 來創建。
ThreadPoolExecutor 中還有個重要的屬性:
~~~java
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
~~~
通過注釋可以看出,這個 HashSet 中存的是 Thread。而 Worker 其實就是對 Thread 的進一步封裝。
我們再回過頭來,看一下 ThreadPoolExecutor 的構造函數中做了什么事情:
~~~java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
~~~
可以看到只是對屬性的賦值,并沒有啟動任何線程。這樣做是很好的設計,因為沒有任何任務添加時就啟動線程,是對系統資源的浪費。
通過以上分析,我們對 ThreadPoolExecutor 的結構應該比較清晰了,其實核心和我們自己實現的線程池是一樣的。ThreadPoolExecutor 也有一個任務的列表 workQueue,還有一個線程的列表 worker
那么按照我們自己實現的邏輯,線程池應該是通過啟動線程輪詢從 workQueue 中獲取任務執行來實現線程池的運轉。結下來我們看看猜想是否正確。
### 3.5 啟動 ThreadPoolExecutor
既然在創建 ThreadPoolExecutor 時并沒有啟動線程池,那么線程池是何時被啟動的呢?我猜應該是添加第一個任務的時候,也就是調用 execute 方法時。我們來看看 execute 方法的代碼:
~~~java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
~~~
源代碼中有一段關鍵的注釋我沒有貼進來,下面我先把這段關鍵的注釋翻譯講解下:
分三步做處理:
* 1、如果運行的線程數量小于 corePoolSize,那么嘗試創建新的線程,并把傳入的 command 作為它的第一個 task 來執行。調用 addWorker 會自動檢查 runState 和 workCount,以此來防止在不應該添加線程時添加線程的錯誤警告;
* 2、即使 task 可以被成功加入隊列,我們仍舊需要再次確認我們是否應該添加 thread(因為最后一次檢查之后可能有線程已經死掉了)還是線程池在進入此方法后已經停掉了。所以我們會再次檢查狀態,如果有必要的話,可以回滾隊列。或者當沒有線程時,開啟新的 thread;
* 3、如果無法將 task 加入 queue,那么可以嘗試添加新的 thread。如果添加失敗,這是因為線程池被關閉或者已經飽和了,所以拒絕這個 task。
以上是原文的翻譯。結合代碼,其實就是如下三步:
* 1、線程數量不足 corePoolSize時,添加新線程作為 core thread 執行 command;
* 2、將 command 加入 workQueue,然后再次檢查線程池狀態。如果不是 isRunning,則移除 command 并且reject command。如果線程數量已經為 0,那么則再次 addWorker;
* 3、如果無法將 task 加入 workQueue,則嘗試 addWorker。但不作為 core thread。如果添加失敗,則 reject command(由于沒有加入 workQueue,所以不需要從 queue 中移除 command)。
可以看到 execute 流程的核心方法為**addWorker**。我們繼續分析 addWorker方法。
**addWork**中主要執行如下邏輯:
1、更新 worker 的數量,代碼如下:
~~~java
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
~~~
retry 是一個標記,和循環配合使用,continue retry 的時候,會跳到 retry 的地方再次執行。如果 break retry,則跳出整個循環體。前文提到過,ThreadPoolExecutor 把狀態和線程池數量兩個屬性存在了一個 Atomic 變量中,就是這里用到的 ctl。源碼中先檢查了狀態,然后格局創建線程類型的不同,進行數量的校驗。在通過 CAS方 式更新狀 ctl,成功的話則跳出循環。否則再次取得線程池狀態,如果和最初已經不一致,那么從頭開始執行。如果狀態并未改變則繼續更新 worker 的數量。流程參考下圖:

2、添加 worker 到 workers 的 set 中。并且啟動 worker 中持有的線程。代碼如下:
~~~java
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
~~~
可以看到添加 work 時需要先獲得鎖,這樣確保多線程并發安全。如果添加 worker 成功,那么調用 worker 中線程的 start 方法啟動線程。如果啟動失敗則調用 addWorkerFailed 方法進行回滾。過程比較簡單,這里就不再提流程圖了。
分析到這里,我們先進行下總結。
* 1、ThreadPoolExecutor 在初始化后并沒有啟動和創建任何線程;
* 2、在調用 execute 方法時才會調用 addWorker 創建線程,并且把 command 加入到 workQueue(如果已經擁有超過 core 數量的線程,則不會再調用 addWorker 創建線程);
* 3、addWorker 方法中會創建新的 worker,并啟動其持有的線程來執行任務。
第二步中,如果線程數量已經達到 corePoolSize,則只會把 command 加入到 workQueue 中,那么加入到 workQueue 中的 command 是如何被執行的呢?我們下面來分析 Worker 的源代碼。
### 3.6 Worker
Worker 封裝了線程,是 executor 中的工作單元。worker 繼承自 AbstractQueuedSynchronizer,并實現 Runnable。
worker 中的屬性如下:
~~~java
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
~~~
如果存在 firstTask,那么 worker 中線程啟動時,會先執行 firstTask。
構造方法如下:
~~~java
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
~~~
可以看到通過 ThreadFactory 創建線程,并沒有直接 new。原因上文已經將結果。此處還需要特別注意的是,創建 thread 時把 worker 自己作為 Runnable 的實現傳入了 thread 中。那么 addWork 時調用的 t.start(),實際上運行的是 t 所屬 worker的run 方法。worker 的 run 方法如下:
~~~java
public void run() {
runWorker(this);
}
~~~
實際運行的是 ThreadPoolExecutor 的 runWorker 方法,代碼如下:
~~~java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
~~~
主流程如下:
* 1、先取出 worker 中的 firstTask,并清空;
* 2、如果沒有 firstTask,則調用 getTask 方法,從 workQueue 中獲取task;
* 3、獲取鎖;
* 4、執行 beforeExecute。這里是空方法,如有需要在子類實現;
* 5、執行 task.run;
* 6、執行 afterExecute。這里是空方法,如有需要在子類實現;
* 7、清空 task,completedTasks++,釋放鎖;
* 8、當有異常或者沒有 task 可執行時,會進入外層 finnaly 代碼塊。調用 processWorkerExit 退出當前 worker。從 works 中移除本 worker 后,如果 worker 數量小于 corePoolSize,則創建新的 worker,以維持 corePoolSize 大小的線程數。
這行代碼 while (task != null || (task = getTask()) != null) ,確保了 worker 不停地從 workQueue 中取得 task 執行。getTask 方法會從 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出來。
到這里關于 executor 如何創建并啟動線程執行 task 的過程已經分析清楚了。其實和我們自己實現的線程池的核心思想一致,都是通過維護一定數量的線程,并且不斷從任務隊列取得任務執行來實現線程池的運轉。但是 Executor 框架考慮得更為全面,健壯性也要好很多。我們在實際開發中不要自己再去設計線程池,請直接使用 executor。
## 4、總結
本節的內容相對比較多,源代碼閱讀也比較枯燥。我們在閱讀源代碼時一定抓住核心流程·,從高層級邏輯開始自頂向下分析和閱讀。不要過多糾纏于細節,等到大體能夠讀懂時,再去看感興趣的細節實現。否則很容易在層層嵌套的源代碼中迷失了方向,陷入某個細節不能自拔。其實關于 ThreadPoolExecutor 還有些方法,本節沒有給出分析,比如 shutdown 和 shutdownNow,大家可以嘗試自己分析下。
- 前言
- 第1章 Java并發簡介
- 01 開篇詞:多線程為什么是你必需要掌握的知識
- 02 絕對不僅僅是為了面試—我們為什么需要學習多線程
- 03 多線程開發如此簡單—Java中如何編寫多線程程序
- 04 人多力量未必大—并發可能會遇到的問題
- 第2章 Java中如何編寫多線程
- 05 看若兄弟,實如父子—Thread和Runnable詳解
- 06 線程什么時候開始真正執行?—線程的狀態詳解
- 07 深入Thread類—線程API精講
- 08 集體協作,什么最重要?溝通!—線程的等待和通知
- 09 使用多線程實現分工、解耦、緩沖—生產者、消費者實戰
- 第3章 并發的問題和原因詳解
- 10 有福同享,有難同當—原子性
- 11 眼見不實—可見性
- 12 什么?還有這種操作!—有序性
- 13 問題的根源—Java內存模型簡介
- 14 僵持不下—死鎖詳解
- 第4章 如何解決并發問題
- 15 原子性輕量級實現—深入理解Atomic與CAS
- 16 讓你眼見為實—volatile詳解
- 17 資源有限,請排隊等候—Synchronized使用、原理及缺陷
- 18 線程作用域內共享變量—深入解析ThreadLocal
- 第5章 線程池
- 19 自己動手豐衣足食—簡單線程池實現
- 20 其實不用造輪子—Executor框架詳解
- 第6章 主要并發工具類
- 21 更高級的鎖—深入解析Lock
- 22 到底哪把鎖更適合你?—synchronized與ReentrantLock對比
- 23 按需上鎖—ReadWriteLock詳解
- 24 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap上
- 25 經典并發容器,多線程面試必備—深入解析ConcurrentHashMap下
- 26不讓我進門,我就在門口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒數計時開始,三、二、一—CountDownLatch詳解
- 28 人齊了,一起行動—CyclicBarrier詳解
- 29 一手交錢,一手交貨—Exchanger詳解
- 30 限量供應,不好意思您來晚了—Semaphore詳解
- 第7章 高級并發工具類及并發設計模式
- 31 憑票取餐—Future模式詳解
- 32 請按到場順序發言—Completion Service詳解
- 33 分階段執行你的任務-學習使用Phaser運行多階段任務
- 34 誰都不能偷懶-通過 CompletableFuture 組裝你的異步計算單元
- 35 拆分你的任務—學習使用Fork/Join框架
- 36 為多線程們安排一位經理—Master/Slave模式詳解
- 第8章 總結
- 37 結束語