<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 20 其實不用造輪子—Executor框架詳解 > 人的一生可能燃燒也可能腐朽,我不能腐朽,我愿意燃燒起來! > ——奧斯特洛夫斯基 上一節我們動手實現了一個非常簡單的線程池。其實 JDK 已經為我們準備了功能豐富的線程池工具。本章我們就來學習一下 JDK 中的線程池—Executor 框架。 ## 1、Executor 框架的使用 我們首先來看看 Executor 框架是如何實用的。看如下代碼: ~~~java public class Client { public static Executor executor = Executors.newFixedThreadPool(10); public static void main(String[] args) { Stream.iterate(1, item -> item + 1).limit(20).forEach(item -> { executor.execute(() -> { System.out.println(Thread.currentThread().getName() + " hello!"); }); } ); } } ~~~ 可以看到在使用上和我們自己實現的線程池幾乎一模一樣。只不過在聲明 Executor 的時候,沒有直接 new 對象。而是通過Executors的靜態方法 newFixedThreadPool 來創建 Executor。 而執行任務的方式則是和我們自己實現的一模一樣。都是調用 executor 方法,傳入 Runnable 接口的實現,也就是運行的邏輯。那么它的內部實現是否也和我們實現的一樣呢?先別急,我們一點點展開來分析。 ## 2、Executor 框架設計簡介 我們先來看下Executor框架的繼承關系: ![圖片描述](https://img.mukewang.com/5dba76b6000189b606480516.jpg) **1、Executor** 可以看到最頂層是 Executor 的接口。這個接口很簡單,只有一個 execute 方法。此接口的目的是為了把任務提交和任務執行解耦。 **2、ExecutorService** 這還是一個接口,繼承自 Executor,它擴展了 Executor 接口,定義了更多線程池相關的操作。 **3、AbstractExecutorService** 提供了 ExecutorService 的部分默認實現。 **4、ThreadPoolExecutor** 實際上我們使用的線程池的實現是 ThreadPoolExecutor。它實現了線程池工作的完整機制。也是我們接下來分析的重點對象。 **5、ForkJoinPool** 實現 Fork/Join 模式的線程池,后面會有小節專門講解。本節不做深入分析。 **6、ScheduledExecutorService** 這個接口擴展了ExecutorService,定義個延遲執行和周期性執行任務的方法。 **7、ScheduledThreadPoolExecutor** 此接口則是在繼承 ThreadPoolExecutor 的基礎上實現 ScheduledExecutorService 接口,提供定時和周期執行任務的特性。 **Executors** Executor 框架還提供 Executors 對象。注意看這個對象比 Executor 接口后面對了個 s,要區分開,不要搞混。Executors 是一個工廠及工具類。提供了例如 newFixedThreadPool(10) 的方法,來創建各種不同的 Executor。 ## 3、Executor 框架源碼分析 Executor 設計的類和實現比較多。本節對 Executor 框架的源碼分析以 ThreadPoolExecutor 作為主線,其它的內容也會有所提及,不過請同學們抓住重點,別偏離了主線。 ### 3.1 Executor 代碼如下: ~~~java public interface Executor { void execute(Runnable command); } ~~~ 很簡單,只是為了把提交任務解耦出來。 ### 3.2 ExecutorService ExecutorService 定義了線程池管理和更多執行任務的方法,如下: ![圖片描述](https://img.mukewang.com/5dba76d70001cff905560264.jpg) 挑選幾個重點的說一下: **shutdown 方法** 終止 executorService,不再執行任務新的任務,已經執行的任務會被執行完。 **shutdownNow 方法** 不等待正在執行的任務完成,強行關閉。不過此方法并不保證正在執行的任務能被強行終止。返回從來沒有被執行的任務列表。 **submit 方法** 對 execute 方法的擴展,會返回一個 Future 對象,持有任務執行結果。 **invokeAll 方法** 執行一組任務,所有任務都返回或者 timeout 的時候,invokeAll 方法返回執行結果列表。該方法一旦返回結果,沒有完成的任務則被取消。 **invokeAny 方法** 執行一組任務,任意一個任務有返回時,invokeAny 返回該任務的執行結果。其余沒有完成的任務則被取消。 ### 3.3 AbstractExecutorService 提供了 newTaskFor 方法對 Runnable 進行包裝: ~~~java protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } ~~~ 它對 submit 的實現,就是過 newTaskFor 方法,代碼如下: ~~~java public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } ~~~ 這里用到的 RunnableFuture,就是為了這個功能而生,它實現了 Runbale 接口及 Future 接口。所以它可以被傳入execute方法,從而添加進任務列表。此外它還保存了執行的結果,并被返回。 ### 3.4 構造ThreadPoolExecutor 下面才是本節的重頭戲,對 ThreadPoolExecutor 的源代碼分析。我們從 ThreadPoolExecutor 的創建開始。 ~~~java Executors.newFixedThreadPool(10) ~~~ 可以看到是通過 Executors 的工廠方法來創建的,Executor 提供了多種工廠方法創建 ThreadPool。其實根本是調用 ThreadPoolExecutor 構造方法時傳入參數不同。我們以 newFixedThreadPool 方法為例,看一下代碼: ~~~java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ~~~ ThreadPoolExecutor 構造方法如下: ~~~java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ~~~ 現在我們可以翻一下 newFixedThreadPool 定義了一個什么樣的線程池: 核心線程數量為 n,最大線程數量也為 n 的線程池。線程池中線程永遠存活。線程池創建線程使用 defaultTHreadFactory。當無法創建線程時,使用 defaultHandler。 **corePoolSize**即線程池的核心線程數量,其實也是最小線程數量。不設置 allowCoreThreadTimeOut 的情況下,核心線程數量范圍內的線程一直存活。 **maximumPoolSize**即線程池的最大線程數量。受限于線程池的 CAPACITY。線程池的 CAPACITY 為 2 的 29 次方 -1。這是由于線程池把線程數量和狀態保存在一個整形原子變量中。狀態保存在高位,占據了兩位,所以線程池中線程數量最多到 2 的 29 次方 -1。 **workQueue**是一個阻塞的 queue,用來保存線程池要執行的所有任務。 **Executors.defaultThreadFactory()**,我們看下源代碼,發現其最終返回了一個 DefaultThreadFactory。代碼如下: ~~~java static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } ~~~ 其實就是規范了生成的 Thread。避免調用 new Thread 創建,導致創建出來的Thread可能存在差異。在Executor中,對線程的創建都是通過 ThreadFactory,禁止使用 new Thread 來創建。 ThreadPoolExecutor 中還有個重要的屬性: ~~~java /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); ~~~ 通過注釋可以看出,這個 HashSet 中存的是 Thread。而 Worker 其實就是對 Thread 的進一步封裝。 我們再回過頭來,看一下 ThreadPoolExecutor 的構造函數中做了什么事情: ~~~java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ~~~ 可以看到只是對屬性的賦值,并沒有啟動任何線程。這樣做是很好的設計,因為沒有任何任務添加時就啟動線程,是對系統資源的浪費。 通過以上分析,我們對 ThreadPoolExecutor 的結構應該比較清晰了,其實核心和我們自己實現的線程池是一樣的。ThreadPoolExecutor 也有一個任務的列表 workQueue,還有一個線程的列表 worker 那么按照我們自己實現的邏輯,線程池應該是通過啟動線程輪詢從 workQueue 中獲取任務執行來實現線程池的運轉。結下來我們看看猜想是否正確。 ### 3.5 啟動 ThreadPoolExecutor 既然在創建 ThreadPoolExecutor 時并沒有啟動線程池,那么線程池是何時被啟動的呢?我猜應該是添加第一個任務的時候,也就是調用 execute 方法時。我們來看看 execute 方法的代碼: ~~~java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ~~~ 源代碼中有一段關鍵的注釋我沒有貼進來,下面我先把這段關鍵的注釋翻譯講解下: 分三步做處理: * 1、如果運行的線程數量小于 corePoolSize,那么嘗試創建新的線程,并把傳入的 command 作為它的第一個 task 來執行。調用 addWorker 會自動檢查 runState 和 workCount,以此來防止在不應該添加線程時添加線程的錯誤警告; * 2、即使 task 可以被成功加入隊列,我們仍舊需要再次確認我們是否應該添加 thread(因為最后一次檢查之后可能有線程已經死掉了)還是線程池在進入此方法后已經停掉了。所以我們會再次檢查狀態,如果有必要的話,可以回滾隊列。或者當沒有線程時,開啟新的 thread; * 3、如果無法將 task 加入 queue,那么可以嘗試添加新的 thread。如果添加失敗,這是因為線程池被關閉或者已經飽和了,所以拒絕這個 task。 以上是原文的翻譯。結合代碼,其實就是如下三步: * 1、線程數量不足 corePoolSize時,添加新線程作為 core thread 執行 command; * 2、將 command 加入 workQueue,然后再次檢查線程池狀態。如果不是 isRunning,則移除 command 并且reject command。如果線程數量已經為 0,那么則再次 addWorker; * 3、如果無法將 task 加入 workQueue,則嘗試 addWorker。但不作為 core thread。如果添加失敗,則 reject command(由于沒有加入 workQueue,所以不需要從 queue 中移除 command)。 可以看到 execute 流程的核心方法為**addWorker**。我們繼續分析 addWorker方法。 **addWork**中主要執行如下邏輯: 1、更新 worker 的數量,代碼如下: ~~~java retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } ~~~ retry 是一個標記,和循環配合使用,continue retry 的時候,會跳到 retry 的地方再次執行。如果 break retry,則跳出整個循環體。前文提到過,ThreadPoolExecutor 把狀態和線程池數量兩個屬性存在了一個 Atomic 變量中,就是這里用到的 ctl。源碼中先檢查了狀態,然后格局創建線程類型的不同,進行數量的校驗。在通過 CAS方 式更新狀 ctl,成功的話則跳出循環。否則再次取得線程池狀態,如果和最初已經不一致,那么從頭開始執行。如果狀態并未改變則繼續更新 worker 的數量。流程參考下圖: ![圖片描述](https://img.mukewang.com/5dba77580001323211010534.jpg) 2、添加 worker 到 workers 的 set 中。并且啟動 worker 中持有的線程。代碼如下: ~~~java boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; ~~~ 可以看到添加 work 時需要先獲得鎖,這樣確保多線程并發安全。如果添加 worker 成功,那么調用 worker 中線程的 start 方法啟動線程。如果啟動失敗則調用 addWorkerFailed 方法進行回滾。過程比較簡單,這里就不再提流程圖了。 分析到這里,我們先進行下總結。 * 1、ThreadPoolExecutor 在初始化后并沒有啟動和創建任何線程; * 2、在調用 execute 方法時才會調用 addWorker 創建線程,并且把 command 加入到 workQueue(如果已經擁有超過 core 數量的線程,則不會再調用 addWorker 創建線程); * 3、addWorker 方法中會創建新的 worker,并啟動其持有的線程來執行任務。 第二步中,如果線程數量已經達到 corePoolSize,則只會把 command 加入到 workQueue 中,那么加入到 workQueue 中的 command 是如何被執行的呢?我們下面來分析 Worker 的源代碼。 ### 3.6 Worker Worker 封裝了線程,是 executor 中的工作單元。worker 繼承自 AbstractQueuedSynchronizer,并實現 Runnable。 worker 中的屬性如下: ~~~java /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; ~~~ 如果存在 firstTask,那么 worker 中線程啟動時,會先執行 firstTask。 構造方法如下: ~~~java Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ~~~ 可以看到通過 ThreadFactory 創建線程,并沒有直接 new。原因上文已經將結果。此處還需要特別注意的是,創建 thread 時把 worker 自己作為 Runnable 的實現傳入了 thread 中。那么 addWork 時調用的 t.start(),實際上運行的是 t 所屬 worker的run 方法。worker 的 run 方法如下: ~~~java public void run() { runWorker(this); } ~~~ 實際運行的是 ThreadPoolExecutor 的 runWorker 方法,代碼如下: ~~~java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ~~~ 主流程如下: * 1、先取出 worker 中的 firstTask,并清空; * 2、如果沒有 firstTask,則調用 getTask 方法,從 workQueue 中獲取task; * 3、獲取鎖; * 4、執行 beforeExecute。這里是空方法,如有需要在子類實現; * 5、執行 task.run; * 6、執行 afterExecute。這里是空方法,如有需要在子類實現; * 7、清空 task,completedTasks++,釋放鎖; * 8、當有異常或者沒有 task 可執行時,會進入外層 finnaly 代碼塊。調用 processWorkerExit 退出當前 worker。從 works 中移除本 worker 后,如果 worker 數量小于 corePoolSize,則創建新的 worker,以維持 corePoolSize 大小的線程數。 這行代碼 while (task != null || (task = getTask()) != null) ,確保了 worker 不停地從 workQueue 中取得 task 執行。getTask 方法會從 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出來。 到這里關于 executor 如何創建并啟動線程執行 task 的過程已經分析清楚了。其實和我們自己實現的線程池的核心思想一致,都是通過維護一定數量的線程,并且不斷從任務隊列取得任務執行來實現線程池的運轉。但是 Executor 框架考慮得更為全面,健壯性也要好很多。我們在實際開發中不要自己再去設計線程池,請直接使用 executor。 ## 4、總結 本節的內容相對比較多,源代碼閱讀也比較枯燥。我們在閱讀源代碼時一定抓住核心流程·,從高層級邏輯開始自頂向下分析和閱讀。不要過多糾纏于細節,等到大體能夠讀懂時,再去看感興趣的細節實現。否則很容易在層層嵌套的源代碼中迷失了方向,陷入某個細節不能自拔。其實關于 ThreadPoolExecutor 還有些方法,本節沒有給出分析,比如 shutdown 和 shutdownNow,大家可以嘗試自己分析下。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看