<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 線程池的使用 ThreadPoolExecutor的構造方法如下: ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` 其中: * corePoolSize為核心線程池大小 * maximumPoolSize為線程池允許的最大線程數 * keepAliveTime為線程池的工作線程空閑后,保持存活的時間 * unit為線程保持存活的時間單位 * workQueue為工作隊列,線程池中的工作線程都是從這個工作隊列源源不斷的取出任務進行執行 * threadFactory為創建新的線程時使用的工廠類 * handler為拒絕任務時的飽和策略 使用: ```java ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); executor.execute(new Runnable() { @Override public void run() { //do something } }); ``` # 源碼分析 ## 線程池執行機制 ![](https://img.kancloud.cn/13/c5/13c5c8e5646465e3abd210b9fc09ccfe_826x394.png) 1、工作線程數小于核心線程數時,直接新建核心線程執行任務; 2、大于核心線程數時,將任務添加進等待隊列; 3、隊列滿時,創建非核心線程執行任務; 4、工作線程數大于最大線程數時,拒絕任務。 ThreadPoolExecutor類的execute方法源碼如下: ```java // 屬性ctl是AtomicInteger類型,高3位存儲線程池狀態,低29位存儲當前線程數量 // wokerCountOf(ctl)返回當前線程數量,runStateOf(ctl)返回當前線程池狀態 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); public void execute(Runnable command) { int c = ctl.get(); // 如果工作線程數小于核心線程數 if (workerCountOf(c) < corePoolSize) { // 創建核心線程并執行任務 if (addWorker(command, true)) { return; } // 執行失敗時,重新獲取值 c = ctl.get(); } // 工作線程數大于等于核心線程數時 // 檢查運行狀態,然后將任務插入隊列 if (isRunning(c) && workQueue.offer(command)) { // 將任務插入隊列成功 int recheck = ctl.get(); // 再次檢查狀態,防止狀態有變化,如果有變化,將任務移出隊列并拒絕任務 if (! isRunning(recheck) && remove(command)) { // 拒絕任務 reject(command); } // 線程數為0 else if (workerCountOf(recheck) == 0) { // 創建非核心線程 addWorker(null, false); } } // 工作線程數大于等于核心線程數時 // 將任務插入隊列失敗,說明隊列已滿,創建非核心線程執行任務 else if (!addWorker(command, false)) { // 創建非核心線程失敗,說明工作線程數量大于最大線程數,拒絕任務 reject(command); } } ``` 上面提到的屬性ctl用來控制線程的狀態,并用來表示線程池線程數量,在線程池中有以下幾種狀態: 1、RUNNABLE:運行狀態,接受新任務,持續處理任務隊列里的任務 2、SHUTDOWN:不再接受新任務,但會處理任務隊列里的任務 3、STOP:不再接受新任務,不再處理任務隊列里的任務,中斷正在執行的任務 4、TIDYING:表示線程池正在停止運作,終止所有任務,消耗所有工作線程 5、TERMINATED:表示線程池已停止運作,所有工作線程已銷毀,所有任務已被清空或執行完畢 ## 啟動新線程 線程池中的工作線程以Worker作為提現,真正工作的線程為Worker的成員變量,Worker從工作隊列中取出任務來執行,并能通過Worker控制任務狀態。 Worker的源碼如下: ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } //... } ``` 接下來看看新建線程的代碼,也就是addWorker方法: ```java /** * @param firstTask 新線程首先會執行的任務,會在執行完這個任務后再從隊列中取任務執行 * @param core 是否是核心線程 * @return boolen 創建結果 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判斷幾種不創建線程的情況 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) { return false; } for (;;) { // 增加線程數,并跳出循環 if (compareAndIncrementWorkerCount(c)) break retry; //... } } // 開始創建并啟動線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //...這里實際有加鎖,暫時不關心 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 將新啟動的線程添加到線程池中 workers.add(w); workerAdded = true; } if (workerAdded) { // 啟動線程 t.start(); workerStarted = true; } } } return workerStarted; } ``` ## 新線程執行任務 創建線程后,會執行Worker的run方法,run方法會調用ThreadPoolExecutor的runWorker方法: ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //... try { // 循環調用getTask方法從隊列取出任務執行 while (task != null || (task = getTask()) != null) { w.lock(); //... try { beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } //... } finally { task = null; w.completedTasks++; w.unlock(); } } } finally { // 將Worker從workers中移出 processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { //... try { completedTaskCount += w.completedTasks; workers.remove(w); } } ``` 可以看到,線程執行任務的流程為: 1、如果firstTask不為空,先執行firstTask,執行后置空 2、firstTask為空后,循環調用getTask從隊列中取出Task并執行 3、一直到沒有Task,退出循環 4、調用processWorkerExit將Woker從workers中移出,線程執行完畢,不再被引用,會自動銷毀 下面來看看getTask方法。 getTask方法用于從阻塞隊列里拿出任務: ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 檢查線程池和阻塞隊列的狀態 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 獲取當前線程數 int wc = workerCountOf(c); // allowCoreThreadTimeOut代表是否允許核心線程退出 // wc > corePoolSize用于判斷是否存在非核心線程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 線程池線程已滿 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { // 超時 return null; } continue; } try { // timed為true,使用poll等待keepAliveTime長的時間來獲取任務 // timed為false,使用take獲取任務,阻塞線程,直到可以從阻塞隊列拿到任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 線程池里的線程從阻塞隊列里那任務 1、如果存在非核心線程,假設阻塞隊列里沒有任務,那么非核心線程需要等到keepAliveTime后才會釋放。 2、如果只有核心線程,且允許核心線程釋放,那么等到keepAliveTime后才會釋放。 3、只有核心線程,且不允許核心線程釋放,那么會通過take阻塞隊列,直到可以從隊列拿到任務。 參考文章: [徹底弄懂 Java 線程池原理](https://juejin.im/post/5c33400c6fb9a049fe35503b) [Java線程池(ThreadPoolExecutor)原理分析與使用](https://blog.csdn.net/fuyuwei2015/article/details/72758179) [線程池ThreadPoolExecutor實現原理](https://juejin.im/post/5aeec0106fb9a07ab379574f) [關于線程池的這 8 個問題你都能答上來嗎?](https://url.cn/5J45up8)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看