<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ## 37 ThreadPoolExecutor 源碼解析 ## 引導語 線程池我們在工作中經常會用到。在請求量大時,使用線程池,可以充分利用機器資源,增加請求的處理速度,本章節我們就和大家一起來學習線程池。 本章的基礎是第四章隊列和第五章線程,沒有看過這兩章的同學可以先看一看。 本章的順序,先說源碼,弄懂原理,接著看一看面試題,最后看看實際工作中是如何運用線程池的。 ### 1 整體架構圖 我們畫了線程池的整體圖,如下: ![](https://img.kancloud.cn/95/2b/952b197b6cd379611ec2fc76452a0a47_2013x1040.jpg) 本小節主要就按照這個圖來進行 ThreadPoolExecutor 源碼的講解,大家在看各個方法時,可以結合這個圖一起看。 #### 1.1 類結構 首先我們來看一下 ThreadPoolExecutor 的類結構,如下圖: ![](https://img.kancloud.cn/ff/84/ff843f244d91e34f8db41a287a0a9474_602x618.jpg) 從上圖中,我們從命名上來看,都有 Executor 的共同命名,Executor 的中文意思為執行的意思,表示對提供的任務進行執行,我們在第五章線程中學習到了幾種任務:Runnable、Callable、FutureTask,之前我們都是使用 Thread 來執行這些任務的,除了 Thread,這些 Executor 命名的類和接口也是可以執行這幾種任務的,接下來我們大概的看下這幾個類的大概含義: 1. Executor:定義 execute 方法來執行任務,入參是 Runnable,無出參: ![](https://img.kancloud.cn/db/37/db37402b1d23899c003a452931cbaa4c_1523x680.jpg) 2. ExecutorService:Executor 的功能太弱,ExecutorService 豐富了對任務的執行和管理的功能,主要代碼如下: ``` // 關閉,不會接受新的任務,也不會等待未完成的任務 // 如果需要等待未完成的任務,可以使用 awaitTermination 方法 void shutdown(); // executor 是否已經關閉了,返回值 true 表示已關閉 boolean isShutdown(); // 所有的任務是否都已經終止,是的話,返回 true boolean isTerminated(); // 在超時時間內,等待剩余的任務終止 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交有返回值的任務,使用 get 方法可以阻塞等待任務的執行結果返回 <T> Future<T> submit(Callable<T> task); // 提交沒有返回值的任務,如果使用 get 方法的話,任務執行完之后得到的是 null 值 Future<?> submit(Runnable task); // 給定任務集合,返回已經執行完成的 Future 集合,每個返回的 Future 都是 isDone = true 的狀態 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 給定任務中有一個執行成功就返回,如果拋異常,其余未完成的任務將被取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; ``` 1. AbstractExecutorService 是一個抽象類,封裝了 Executor 的很多通用功能,比如: ``` // 把 Runnable 轉化成 RunnableFuture // RunnableFuture 是一個接口,實現了 Runnable 和 Future // FutureTask 是 RunnableFuture 的實現類,主要是對任務進行各種管理 // Runnable + Future => RunnableFuture => FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交無返回值的任務 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // ftask 其實是 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交有返回值的任務 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // ftask 其實是 FutureTask RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ``` 有幾個點需要注意下: 1. FutureTask 我們在第五章有說,其本身就是一個任務,而且具備對任務管理的功能,比如可以通過 get 方法拿到任務的執行結果; 2. submit 方法是我們平時使用線程池時提交任務的方法,支持 Runable 和 Callable 兩種任務的提交,方法中 execute 方法是其子類 ThreadPoolExecutor 實現的,不管是那種任務入參,execute 方法最終執行的任務都是 FutureTask; 3. ThreadPoolExecutor 繼承了 AbstractExecutorService 抽象類,具備以上三個類的所有功能。 #### 1.2 類注釋 ThreadPoolExecutor 的類注釋有很多,我們選取關鍵的注釋如下: 1. ExecutorService 使用線程池中的線程執行提交的任務,線程池我們可以使用 Executors 進行配置; 2. 線程池解決兩個問題:1:通過減少任務間的調度開銷 (主要是通過線程池中的線程被重復使用的方式),來提高大量任務時的執行性能;2:提供了一種方式來管理線程和消費,維護基本數據統計等工作,比如統計已完成的任務數; 3. Executors 為常用的場景設定了可直接初始化線程池的方法,比如 Executors#newCachedThreadPool 無界的線程池,并且可以自動回收;Executors#newFixedThreadPool 固定大小線程池;Executors#newSingleThreadExecutor 單個線程的線程池; 4. 為了在各種上下文中使用線程池,線程池提供可供擴展的參數設置:1:coreSize:當新任務提交時,發現運行的線程數小于 coreSize,一個新的線程將被創建,即使這時候其它工作線程是空閑的,可以通過 getCorePoolSize 方法獲得 coreSize;2:maxSize: 當任務提交時,coreSize < 運行線程數 <= maxSize,但隊列沒有滿時,任務提交到隊列中,如果隊列滿了,在 maxSize 允許的范圍內新建線程; 5. 一般來說,coreSize 和 maxSize 在線程池初始化時就已經設定了,但我們也可以通過 setCorePoolSize、setMaximumPoolSize 方法動態的修改這兩個值; 6. 默認的,core threads 需要到任務提交后才創建的,但我們可以分別使用 prestartCoreThread、prestartAllCoreThreads 兩個方法來提前創建一個、所有的 core threads; 7. 新的線程被默認 ThreadFactory 創建時,優先級會被限制成 NORM_PRIORITY,默認會被設置成非守護線程,這個和新建線程的繼承是不同的; 8. Keep-alive times 參數的作用:1:如果當前線程池中有超過 coreSize 的線程;2:并且線程空閑的時間超過 keepAliveTime,當前線程就會被回收,這樣可以避免線程沒有被使用時的資源浪費; 9. 通過 setKeepAliveTime 方法可以動態的設置 keepAliveTime 的值; 10. 如果設置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時間超過 keepAliveTime 的話,也會被回收; 11. 線程池新建時,有多種隊列可供選擇,比如:1:SynchronousQueue,為了避免任務被拒絕,要求線程池的 maxSize 無界,缺點是當任務提交的速度超過消費的速度時,可能出現無限制的線程增長;2:LinkedBlockingQueue,無界隊列,未消費的任務可以在隊列中等待;3:ArrayBlockingQueue,有界隊列,可以防止資源被耗盡; 12. 隊列的維護:提供了 getQueue () 方法方便我們進行監控和調試,嚴禁用于其他目的,remove 和 purge 兩個方法可以對隊列中的元素進行操作; 13. 在 Executor 已經關閉或對最大線程和最大隊列都使用飽和時,可以使用 RejectedExecutionHandler 類進行異常捕捉,有如下四種處理策略:ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.DiscardPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy; 14. 線程池提供了很多可供擴展的鉤子函數,比如有:1:提供在每個任務執行之前 beforeExecute 和執行之后 afterExecute 的鉤子方法,主要用于操作執行環境,比如初始化 ThreadLocals、收集統計數據、添加日志條目等;2: 如果在執行器執行完成之后想干一些事情,可以實現 terminated 方法,如果鉤子方法執行時發生異常,工作線程可能會失敗并立即終止。 可以看到 ThreadPoolExecutor 的注釋是非常多的,也是非常重要的,我們很多面試的題目,在注釋上都能找到答案。 #### 1.3 ThreadPoolExecutor 重要屬性 接下來我們來看一看 ThreadPoolExecutor 都有哪些重要屬性,如下: ``` //ctl 線程池狀態控制字段,由兩部分組成: //1:workerCount wc 工作線程數,我們限制 workerCount 最大到(2^29)-1,大概 5 億個線程 //2:runState rs 線程池的狀態,提供了生命周期的控制,源碼中有很多關于狀態的校驗,狀態枚舉如下: //RUNNING(-536870912):接受新任務或者處理隊列里的任務。 //SHUTDOWN(0):不接受新任務,但仍在處理已經在隊列里面的任務。 //STOP(-536870912):不接受新任務,也不處理隊列中的任務,對正在執行的任務進行中斷。 //TIDYING(1073741824): 所以任務都被中斷,workerCount 是 0,整理狀態 //TERMINATED(1610612736): terminated() 已經完成的時候 //runState 之間的轉變過程: //RUNNING -> SHUTDOWN:調用 shudown(),finalize() //(RUNNING or SHUTDOWN) -> STOP:調用shutdownNow() //SHUTDOWN -> TIDYING -> workerCount ==0 //STOP -> TIDYING -> workerCount ==0 //TIDYING -> TERMINATED -> terminated() 執行完成之后 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;// 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;// =(2^29)-1=536870911 // Packing and unpacking ctl private static int ctlOf(int rs, int wc) { return rs | wc; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int runStateOf(int c) { return c & ~CAPACITY; } // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS;//-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS;//0 private static final int STOP = 1 << COUNT_BITS;//-536870912 private static final int TIDYING = 2 << COUNT_BITS;//1073741824 private static final int TERMINATED = 3 << COUNT_BITS;//1610612736 // 已完成任務的計數 volatile long completedTasks; // 線程池最大容量 private int largestPoolSize; // 已經完成的任務數 private long completedTaskCount; // 用戶可控制的參數都是 volatile 修飾的 // 可以使用 threadFactory 創建 thread // 創建失敗一般不拋出異常,只有在 OutOfMemoryError 時候才會 private volatile ThreadFactory threadFactory; // 飽和或者運行中拒絕任務的 handler 處理類 private volatile RejectedExecutionHandler handler; // 線程存活時間設置 private volatile long keepAliveTime; // 設置 true 的話,核心線程空閑 keepAliveTime 時間后,也會被回收 private volatile boolean allowCoreThreadTimeOut; // coreSize private volatile int corePoolSize; // maxSize 最大限制 (2^29)-1 private volatile int maximumPoolSize; // 默認的拒絕策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 隊列會 hold 住任務,并且利用隊列的阻塞的特性,來保持線程的存活周期 private final BlockingQueue<Runnable> workQueue; // 大多數情況下是控制對 workers 的訪問權限 private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); // 包含線程池中所有的工作線程 private final HashSet<Worker> workers = new HashSet<Worker>(); ``` 屬性也是非常多,為了方便理解線程池的狀態扭轉,畫了一個圖: ![](https://img.kancloud.cn/68/a0/68a08062d39b93efb24fe944729b9c69_1675x424.jpg) Worker 我們可以理解成線程池中任務運行的最小單元,Worker 的大致結構如下: ``` // 線程池中任務執行的最小單元 // Worker 繼承 AQS,具有鎖功能 // Worker 實現 Runnable,本身是一個可執行的任務 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 任務運行的線程 final Thread thread; // 需要執行的任務 Runnable firstTask; // 非常巧妙的設計,Worker本身是個 Runnable,把自己作為任務傳遞給 thread // 內部有個屬性又設置了 Runnable Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 把 Worker 自己作為 thread 運行的任務 this.thread = getThreadFactory().newThread(this); } /** Worker 本身是 Runnable,run 方法是 Worker 執行的入口, runWorker 是外部的方法 */ public void run() { runWorker(this); } private static final long serialVersionUID = 6138294804551838833L; // Lock methods // 0 代表沒有鎖住,1 代表鎖住 protected boolean isHeldExclusively() { return getState() != 0; } // 嘗試加鎖,CAS 賦值為 1,表示鎖住 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 嘗試釋放鎖,釋放鎖沒有 CAS 校驗,可以任意的釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } ``` 理解 Worker 非常關鍵,主要有以下幾點: 1. Worker 很像是任務的代理,在線程池中,最小的執行單位就是 Worker,所以 Worker 實現了 Runnable 接口,實現了 run 方法; 2. 在 Worker 初始化時 this.thread = getThreadFactory ().newThread (this) 這行代碼比較關鍵,它把當前 Worker 作為線程的構造器入參,我們在后續的實現中會發現這樣的代碼:Thread t = w.thread;t.start (),此時的 w 是 Worker 的引用申明,此處 t.start 實際上執行的就是 Worker 的 run 方法; 3. Worker 本身也實現了 AQS,所以其本身也是一個鎖,其在執行任務的時候,會鎖住自己,任務執行完成之后,會釋放自己。 ### 2 線程池的任務提交 線程池的任務提交從 submit 方法說起,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情: 1. 把 Runnable 和 Callable 都轉化成 FutureTask,這個我們之前看過源碼了; 2. 使用 execute 方法執行 FutureTask。 execute 方法是 ThreadPoolExecutor 中的方法,源碼如下: ``` public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 工作的線程小于核心線程數,創建新的線程,成功返回,失敗不拋異常 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 線程池狀態可能發生變化 c = ctl.get(); } // 工作的線程大于等于核心線程數,或者新建線程失敗 // 線程池狀態正常,并且可以入隊的話,嘗試入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程池狀態異常 嘗試從隊列中移除任務,可以移除的話就拒絕掉任務 if (!isRunning(recheck) && remove(command)) reject(command); // 發現可運行的線程數是 0,就初始化一個線程,這里是個極限情況,入隊的時候,突然發現 // 可用線程都被回收了 else if (workerCountOf(recheck) == 0) // Runnable是空的,不會影響新增線程,但是線程在 start 的時候不會運行 // Thread.run() 里面有判斷 addWorker(null, false); } // 隊列滿了,開啟線程到 maxSize,如果失敗直接拒絕, else if (!addWorker(command, false)) reject(command); } ``` execute 方法執行的就是整體架構圖的左半邊的邏輯,其中多次調用 addWorker 方法,addWorker 方法的作用是新建一個 Worker,我們一起來看下源碼: ``` // 結合線程池的情況看是否可以添加新的 worker // firstTask 不為空可以直接執行,為空執行不了,Thread.run()方法有判斷,Runnable為空不執行 // core 為 true 表示線程最大新增個數是 coresize,false 表示最大新增個數是 maxsize // 返回 true 代表成功,false 失敗 // break retry 跳到retry處,且不再進入循環 // continue retry 跳到retry處,且再次進入循環 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 先是各種狀態的校驗 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // rs >= SHUTDOWN 說明線程池狀態不正常 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 工作中的線程數大于等于容量,或者大于等于 coreSize or maxSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // break 結束 retry 的 for 循環 break retry; c = ctl.get(); // Re-read ctl // 線程池狀態被更改 if (runStateOf(c) != rs) // 跳轉到retry位置 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 巧妙的設計,Worker 本身是個 Runnable. // 在初始化的過程中,會把 worker 丟給 thread 去初始化 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啟動線程,實際上去執行 Worker.run 方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } ``` addWorker 方法首先是執行了一堆校驗,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 執行 Worker,上文我們說了 Worker 在初始化時的關鍵代碼:this.thread = getThreadFactory ().newThread (this),Worker(this) 是作為新建線程的構造器入參的,所以 t.start () 會執行到 Worker 的 run 方法上,源碼如下: ``` public void run() { runWorker(this); } ``` runWorker 方法是非常重要的方法,我們一起看下源碼實現: ``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //幫助gc回收 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task 為空的情況: // 1:任務入隊列了,極限情況下,發現沒有運行的線程,于是新增一個線程; // 2:線程執行完任務執行,再次回到 while 循環。 // 如果 task 為空,會使用 getTask 方法阻塞從隊列中拿數據,如果拿不到數據,會阻塞住 while (task != null || (task = getTask()) != null) { //鎖住 worker w.lock(); // 線程池 stop 中,但是線程沒有到達中斷狀態,幫助線程中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //執行 before 鉤子函數 beforeExecute(wt, task); Throwable thrown = null; try { //同步執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執行 after 鉤子函數,如果這里拋出異常,會覆蓋 catch 的異常 //所以這里異常最好不要拋出來 afterExecute(task, thrown); } } finally { //任務執行完成,計算解鎖 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //做一些拋出異常的善后工作 processWorkerExit(w, completedAbruptly); } } ``` 這個方法執行的邏輯是架構圖中的標紅部分: ![](https://img.kancloud.cn/d3/e8/d3e893ffe6989b1816f7ba41f418c72d_934x436.jpg) 我們聚焦一下這行代碼:task.run () 此時的 task 是什么呢?此時的 task 是 FutureTask 類,所以我們繼續追索到 FutureTask 類的 run 方法的源碼,如下: ``` /** * run 方法可以直接被調用 * 也可以由線程池進行調用 */ public void run() { // 狀態不是任務創建,或者當前任務已經有線程在執行了 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // Callable 不為空,并且已經初始化完成 if (c != null && state == NEW) { V result; boolean ran; try { // 調用執行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 給 outcome 賦值 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } ``` run 方法中有兩行關鍵代碼: 1. result = c.call () 這行代碼是真正執行業務代碼的地方; 2. set (result) 這里是給 outCome 賦值,這樣 Future.get 方法執行時,就可以從 outCome 中拿值,這個我們在《Future、ExecutorService 源碼解析》章節中都有說到。 至此,submit 方法就執行完成了,整體流程比較復雜,我們畫一個圖釋義一下任務提交執行的主流程: ![](https://img.kancloud.cn/bc/10/bc1037bad8be6a87403fe3a717b72e8f_1604x222.jpg) ### 3 線程執行完任務之后都在干啥 線程執行完任務之后,是消亡還是干什么呢?這是一個值得思考的問題,我們可以從源碼中找到答案,從 ThreadPoolExecutor 的 runWorker 方法中,不知道有沒有同學注意到一個 while 循環,我們截圖釋義一下: ![](https://img.kancloud.cn/73/b1/73b196ccd02b41b5ca207c0b154110c9_1966x1304.jpg) 這個 while 循環有個 getTask 方法,getTask 的主要作用是阻塞從隊列中拿任務出來,如果隊列中有任務,那么就可以拿出來執行,如果隊列中沒有任務,這個線程會一直阻塞到有任務為止(或者超時阻塞),下面我們一起來看下 getTask 方法,源碼如下: ``` // 從阻塞隊列中拿任務 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //線程池關閉 && 隊列為空,不需要在運行了,直接放回 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // true 運行的線程數大于 coreSize || 核心線程也可以被滅亡 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 隊列以 LinkedBlockingQueue 為例,timedOut 為 true 的話說明下面 poll 方法執行返回的是 null // 說明在等待 keepAliveTime 時間后,隊列中仍然沒有數據 // 說明此線程已經空閑了 keepAliveTime 了 // 再加上 wc > 1 || workQueue.isEmpty() 的判斷 // 所以使用 compareAndDecrementWorkerCount 方法使線程池數量減少 1 // 并且直接 return,return 之后,此空閑的線程會自動被回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從隊列中阻塞拿 worker Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 設置已超時,說明此時隊列沒有數據 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 代碼有兩處關鍵: 1. 使用隊列的 poll 或 take 方法從隊列中拿數據,根據隊列的特性,隊列中有任務可以返回,隊列中無任務會阻塞; 2. 方法中的第二個 if 判斷,說的是在滿足一定條件下(條件看注釋),會減少空閑的線程,減少的手段是使可用線程數減一,并且直接 return,直接 return 后,該線程就執行結束了,JVM 會自動回收該線程。 ### 4 總結 本章節主要以 submit 方法為主線闡述了 ThreadPoolExecutor 的整體架構和底層源碼,只要有隊列和線程的基礎知識的話,理解 ThreadPoolExecutor 并不復雜。ThreadPoolExecutor 還有一些其他的源碼,比如說拒絕請求的策略、得到各種屬性、設置各種屬性等等方法,這些方法都比較簡單,感興趣的同學可以自己去看一看。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看