<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                我在[專欄第 17 講](http://time.geekbang.org/column/article/9103)中介紹過線程是不能夠重復啟動的,創建或銷毀線程存在一定的開銷,所以利用線程池技術來提高系統資源利用效率,并簡化線程管理,已經是非常成熟的選擇。 今天我要問你的問題是,Java 并發類庫提供的線程池有哪幾種? 分別有什么特點? ## 典型回答 通常開發者都是利用 Executors 提供的通用線程池創建方法,去創建不同配置的線程池,主要區別在于不同的 ExecutorService 類型或者不同的初始參數。 Executors 目前提供了 5 種不同的線程池創建配置: * newCachedThreadPool(),它是一種用來處理大量短時間工作任務的線程池,具有幾個鮮明特點:它會試圖緩存線程并重用,當無緩存線程可用時,就會創建新的工作線程;如果線程閑置的時間超過 60 秒,則被終止并移出緩存;長時間閑置時,這種線程池,不會消耗什么資源。其內部使用 SynchronousQueue 作為工作隊列。 * newFixedThreadPool(int nThreads),重用指定數目(nThreads)的線程,其背后使用的是無界的工作隊列,任何時候最多有 nThreads 個工作線程是活動的。這意味著,如果任務數量超過了活動隊列數目,將在工作隊列中等待空閑線程出現;如果有工作線程退出,將會有新的工作線程被創建,以補足指定的數目 nThreads。 * newSingleThreadExecutor(),它的特點在于工作線程數目被限制為 1,操作一個無界的工作隊列,所以它保證了所有任務的都是被順序執行,最多會有一個任務處于活動狀態,并且不允許使用者改動線程池實例,因此可以避免其改變線程數目。 * newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),創建的是個 ScheduledExecutorService,可以進行定時或周期性的工作調度,區別在于單一工作線程還是多個工作線程。 * newWorkStealingPool(int parallelism),這是一個經常被人忽略的線程池,Java 8 才加入這個創建方法,其內部會構建[ForkJoinPool](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html),利用[Work-Stealing](https://en.wikipedia.org/wiki/Work_stealing)算法,并行地處理任務,不保證處理順序。 ## 考點分析 Java 并發包中的 Executor 框架無疑是并發編程中的重點,今天的題目考察的是對幾種標準線程池的了解,我提供的是一個針對最常見的應用方式的回答。 在大多數應用場景下,使用 Executors 提供的 5 個靜態工廠方法就足夠了,但是仍然可能需要直接利用 ThreadPoolExecutor 等構造函數創建,這就要求你對線程構造方式有進一步的了解,你需要明白線程池的設計和結構。 另外,線程池這個定義就是個容易讓人誤解的術語,因為 ExecutorService 除了通常意義上“池”的功能,還提供了更全面的線程管理、任務提交等方法。 Executor 框架可不僅僅是線程池,我覺得至少下面幾點值得深入學習: * 掌握 Executor 框架的主要內容,至少要了解組成與職責,掌握基本開發用例中的使用。 * 對線程池和相關并發工具類型的理解,甚至是源碼層面的掌握。 * 實踐中有哪些常見問題,基本的診斷思路是怎樣的。 * 如何根據自身應用特點合理使用線程池。 ## 知識擴展 首先,我們來看看 Executor 框架的基本組成,請參考下面的類圖。 ![](https://img.kancloud.cn/fc/70/fc70c37867c7fbfb672fa3e37fe14b5b_757x456.png) 我們從整體上把握一下各個類型的主要設計目的: * Executor 是一個基礎的接口,其初衷是將任務提交和任務執行細節解耦,這一點可以體會其定義的唯一方法。 ~~~ void execute(Runnable command); ~~~ Executor 的設計是源于 Java 早期線程 API 使用的教訓,開發者在實現應用邏輯時,被太多線程創建、調度等不相關細節所打擾。就像我們進行 HTTP 通信,如果還需要自己操作 TCP 握手,開發效率低下,質量也難以保證。 * ExecutorService 則更加完善,不僅提供 service 的管理功能,比如 shutdown 等方法,也提供了更加全面的提交任務機制,如返回[Future](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Future.html)而不是 void 的 submit 方法。 ~~~ <T> Future<T> submit(Callable<T> task); ~~~ 注意,這個例子輸入的可是[Callable](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Callable.html),它解決了 Runnable 無法返回結果的困擾。 * Java 標準類庫提供了幾種基礎實現,比如[ThreadPoolExecutor](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ThreadPoolExecutor.html)、[ScheduledThreadPoolExecutor](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html)、[ForkJoinPool](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html)。這些線程池的設計特點在于其高度的可調節性和靈活性,以盡量滿足復雜多變的實際應用場景,我會進一步分析其構建部分的源碼,剖析這種靈活性的源頭。 * Executors 則從簡化使用的角度,為我們提供了各種方便的靜態工廠方法。 下面我就從源碼角度,分析線程池的設計與實現,我將主要圍繞最基礎的 ThreadPoolExecutor 源碼。ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的擴展,主要是增加了調度邏輯,如想深入了解,你可以參考相關[教程](https://www.journaldev.com/2340/java-scheduler-scheduledexecutorservice-scheduledthreadpoolexecutor-example)。而 ForkJoinPool 則是為 ForkJoinTask 定制的線程池,與通常意義的線程池有所不同。 這部分內容比較晦澀,羅列概念也不利于你去理解,所以我會配合一些示意圖來說明。在現實應用中,理解應用與線程池的交互和線程池的內部工作過程,你可以參考下圖。 ![](https://img.kancloud.cn/18/b6/18b64aee22c67f488171a73133e4d465_1072x482.png) 簡單理解一下: * 工作隊列負責存儲用戶提交的各個任務,這個工作隊列,可以是容量為 0 的 SynchronousQueue(使用 newCachedThreadPool),也可以是像固定大小線程池(newFixedThreadPool)那樣使用 LinkedBlockingQueue。 ~~~ private final BlockingQueue<Runnable> workQueue; ~~~ * 內部的“線程池”,這是指保持工作線程的集合,線程池需要在運行過程中管理線程創建、銷毀。例如,對于帶緩存的線程池,當任務壓力較大時,線程池會創建新的工作線程;當業務壓力退去,線程池會在閑置一段時間(默認 60 秒)后結束線程。 ~~~ private final HashSet<Worker> workers = new HashSet<>(); ~~~ 線程池的工作線程被抽象為靜態內部類 Worker,基于[AQS](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/locks/AbstractQueuedSynchronizer.html)實現。 * ThreadFactory 提供上面所需要的創建線程邏輯。 * 如果任務提交時被拒絕,比如線程池已經處于 SHUTDOWN 狀態,需要為其提供處理邏輯,Java 標準庫提供了類似[ThreadPoolExecutor.AbortPolicy](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ThreadPoolExecutor.AbortPolicy.html)等默認實現,也可以按照實際需求自定義。 從上面的分析,就可以看出線程池的幾個基本組成部分,一起都體現在線程池的構造函數中,從字面我們就可以大概猜測到其用意: * corePoolSize,所謂的核心線程數,可以大致理解為長期駐留的線程數目(除非設置了 allowCoreThreadTimeOut)。對于不同的線程池,這個值可能會有很大區別,比如 newFixedThreadPool 會將其設置為 nThreads,而對于 newCachedThreadPool 則是為 0。 * maximumPoolSize,顧名思義,就是線程不夠時能夠創建的最大線程數。同樣進行對比,對于 newFixedThreadPool,當然就是 nThreads,因為其要求是固定大小,而 newCachedThreadPool 則是 Integer.MAX\_VALUE。 * keepAliveTime 和 TimeUnit,這兩個參數指定了額外的線程能夠閑置多久,顯然有些線程池不需要它。 * workQueue,工作隊列,必須是 BlockingQueue。 通過配置不同的參數,我們就可以創建出行為大相徑庭的線程池,這就是線程池高度靈活性的基礎。 ~~~ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ~~~ 進一步分析,線程池既然有生命周期,它的狀態是如何表征的呢? 這里有一個非常有意思的設計,ctl 變量被賦予了雙重角色,通過高低位的不同,既表示線程池狀態,又表示工作線程數目,這是一個典型的高效優化。試想,實際系統中,雖然我們可以指定線程極限為 Integer.MAX\_VALUE,但是因為資源限制,這只是個理論值,所以完全可以將空閑位賦予其他意義。 ~~~ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 真正決定了工作線程數的理論上限 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 線程池狀態,存儲在數字的高位 private static final int RUNNING = -1 << COUNT_BITS; … // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~COUNT_MASK; } private static int workerCountOf(int c) { return c & COUNT_MASK; } private static int ctlOf(int rs, int wc) { return rs | wc; } ~~~ 為了讓你能對線程生命周期有個更加清晰的印象,我這里畫了一個簡單的狀態流轉圖,對線程池的可能狀態和其內部方法之間進行了對應,如果有不理解的方法,請參考 Javadoc。**注意**,實際 Java 代碼中并不存在所謂 Idle 狀態,我添加它僅僅是便于理解。 ![](https://img.kancloud.cn/c5/0c/c50ce5f2ff4ae723c6267185699ccda1_840x626.png) 前面都是對線程池屬性和構建等方面的分析,下面我選擇典型的 execute 方法,來看看其是如何工作的,具體邏輯請參考我添加的注釋,配合代碼更加容易理解。 ~~~ public void execute(Runnable command) { … int c = ctl.get(); // 檢查工作線程數目,低于 corePoolSize 則添加 Worker if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // isRunning 就是檢查線程池是否被 shutdown // 工作隊列可能是有界的,offer 是比較友好的入隊方式 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次進行防御性檢查 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 嘗試添加一個 worker,如果失敗意味著已經飽和或者被 shutdown 了 else if (!addWorker(command, false)) reject(command); } ~~~ **線程池實踐** 線程池雖然為提供了非常強大、方便的功能,但是也不是銀彈,使用不當同樣會導致問題。我這里介紹些典型情況,經過前面的分析,很多方面可以自然的推導出來。 * 避免任務堆積。前面我說過 newFixedThreadPool 是創建指定數目的線程,但是其工作隊列是無界的,如果工作線程數目太少,導致處理跟不上入隊的速度,這就很有可能占用大量系統內存,甚至是出現 OOM。診斷時,你可以使用 jmap 之類的工具,查看是否有大量的任務對象入隊。 * 避免過度擴展線程。我們通常在處理大量短時任務時,使用緩存的線程池,比如在最新的 HTTP/2 client API 中,目前的默認實現就是如此。我們在創建線程池的時候,并不能準確預計任務壓力有多大、數據特征是什么樣子(大部分請求是 1K 、100K 還是 1M 以上?),所以很難明確設定一個線程數目。 * 另外,如果線程數目不斷增長(可以使用 jstack 等工具檢查),也需要警惕另外一種可能性,就是線程泄漏,這種情況往往是因為任務邏輯有問題,導致工作線程遲遲不能被釋放。建議你排查下線程棧,很有可能多個線程都是卡在近似的代碼處。 * 避免死鎖等同步問題,對于死鎖的場景和排查,你可以復習[專欄第 18 講](http://time.geekbang.org/column/article/9266)。 * 盡量避免在使用線程池時操作 ThreadLocal,同樣是[專欄第 17 講](http://time.geekbang.org/column/article/9103)已經分析過的,通過今天的線程池學習,應該更能理解其原因,工作線程的生命周期通常都會超過任務的生命周期。 **線程池大小的選擇策略** 上面我已經介紹過,線程池大小不合適,太多或太少,都會導致麻煩,所以我們需要去考慮一個合適的線程池大小。雖然不能完全確定,但是有一些相對普適的規則和思路。 * 如果我們的任務主要是進行計算,那么就意味著 CPU 的處理能力是稀缺的資源,我們能夠通過大量增加線程數提高計算能力嗎?往往是不能的,如果線程太多,反倒可能導致大量的上下文切換開銷。所以,這種情況下,通常建議按照 CPU 核的數目 N 或者 N+1。 * 如果是需要較多等待的任務,例如 I/O 操作比較多,可以參考 Brain Goetz 推薦的計算方法: ~~~ 線程數 = CPU 核數 × 目標 CPU 利用率 ×(1 + 平均等待時間 / 平均工作時間) ~~~ 這些時間并不能精準預計,需要根據采樣或者概要分析等方式進行計算,然后在實際中驗證和調整。 * 上面是僅僅考慮了 CPU 等限制,實際還可能受各種系統資源限制影響,例如我最近就在 Mac OS X 上遇到了大負載時[ephemeral 端口受限](http://danielmendel.github.io/blog/2013/04/07/benchmarkers-beware-the-ephemeral-port-limit/)的情況。當然,我是通過擴大可用端口范圍解決的,如果我們不能調整資源的容量,那么就只能限制工作線程的數目了。這里的資源可以是文件句柄、內存等。 另外,在實際工作中,不要把解決問題的思路全部指望到調整線程池上,很多時候架構上的改變更能解決問題,比如利用背壓機制的[Reactive Stream](http://www.reactive-streams.org/)、合理的拆分等。 今天,我從 Java 創建的幾種線程池開始,對 Executor 框架的主要組成、線程池結構與生命周期等方面進行了講解和分析,希望對你有所幫助。 ## 一課一練 關于今天我們討論的題目你做到心中有數了嗎?今天的思考題是從邏輯上理解,線程池創建和生命周期。請談一談,如果利用 newSingleThreadExecutor() 創建一個線程池,corePoolSize、maxPoolSize 等都是什么數值?ThreadFactory 可能在線程池生命周期中被使用多少次?怎么驗證自己的判斷?
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看