[TOC]
# 為什么使用線程池
諸如 Web 服務器、數據庫服務器、文件服務器或郵件服務器之類的許多服務器應用程序都面向處理來自某些遠程來源的大量短小的任務。請求以某種方式到達服務器,這種方式可能是通過網絡協議(例如 HTTP、FTP 或 POP)、通過 JMS 隊列或者可能通過輪詢數據庫。不管請求如何到達,服務器應用程序中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。
構建服務器應用程序的一個簡單模型是:每當一個請求到達就創建一個新線程,然后在新線程中為請求服務。實際上對于原型開發這種方法工作得很好,但如果試圖部署以這種方式運行的服務器應用程序,那么這種方法的嚴重不足就很明顯。每個請求對應一個線程(thread-per-request)方法的不足之一是:為每個請求創建一個新線程的開銷很大;為每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源要比花在處理實際的用戶請求的時間和資源更多。
除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。在一個 JVM 里創建太多的線程可能會導致系統由于過度消耗內存而用完內存或“切換過度”。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目。
線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足。
# 使用線程池的風險
雖然線程池是構建多線程應用程序的強大機制,但使用它并不是沒有風險的。用線程池構建的應用程序容易遭受任何其它多線程應用程序容易遭受的所有并發風險,諸如同步錯誤和死鎖,它還容易遭受特定于線程池的少數其它風險,諸如與池有關的死鎖、資源不足和線程泄漏。
## 死鎖
任何多線程應用程序都有死鎖風險。當一組進程或線程中的每一個都在等待一個只有該組中另一個進程才能引起的事件時,我們就說這組進程或線程 死鎖了。死鎖的最簡單情形是:線程 A 持有對象 X 的獨占鎖,并且在等待對象 Y 的鎖,而線程 B 持有對象 Y 的獨占鎖,卻在等待對象 X 的鎖。除非有某種方法來打破對鎖的等待(Java 鎖定不支持這種方法),否則死鎖的線程將永遠等下去。
雖然任何多線程程序中都有死鎖的風險,但線程池卻引入了另一種死鎖可能,在那種情況下,所有池線程都在執行已阻塞的等待隊列中另一任務的執行結果的任務,但這一任務卻因為沒有未被占用的線程而不能運行。當線程池被用來實現涉及許多交互對象的模擬,被模擬的對象可以相互發送查詢,這些查詢接下來作為排隊的任務執行,查詢對象又同步等待著響應時,會發生這種情況。
## 資源不足
線程池的一個優點在于:相對于其它替代調度機制(有些我們已經討論過)而言,它們通常執行得很好。但只有恰當地調整了線程池大小時才是這樣的。線程消耗包括內存和其它系統資源在內的大量資源。除了 Thread 對象所需的內存之外,每個線程都需要兩個可能很大的執行調用堆棧。除此以外,JVM 可能會為每個 Java 線程創建一個本機線程,這些本機線程將消耗額外的系統資源。最后,雖然線程之間切換的調度開銷很小,但如果有很多線程,環境切換也可能嚴重地影響程序的性能。
如果線程池太大,那么被那些線程消耗的資源可能嚴重地影響系統性能。在線程之間進行切換將會浪費時間,而且使用超出比您實際需要的線程可能會引起資源匱乏問題,因為池線程正在消耗一些資源,而這些資源可能會被其它任務更有效地利用。除了線程自身所使用的資源以外,服務請求時所做的工作可能需要其它資源,例如 JDBC 連接、套接字或文件。這些也都是有限資源,有太多的并發請求也可能引起失效,例如不能分配 JDBC 連接。
## 并發錯誤
線程池和其它排隊機制依靠使用 wait() 和 notify() 方法,這兩個方法都難于使用。如果編碼不正確,那么可能丟失通知,導致線程保持空閑狀態,盡管隊列中有工作要處理。使用這些方法時,必須格外小心。而最好使用現有的、已經知道能工作的實現,例如 util.concurrent 包。
## 線程泄漏
各種類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執行一項任務,而在任務完成后該線程卻沒有返回池時,會發生這種情況。發生線程泄漏的一種情形出現在任務拋出一個 RuntimeException 或一個 Error 時。如果池類沒有捕捉到它們,那么線程只會退出而線程池的大小將會永久減少一個。當這種情況發生的次數足夠多時,線程池最終就為空,而且系統將停止,因為沒有可用的線程來處理任務。
有些任務可能會永遠等待某些資源或來自用戶的輸入,而這些資源又不能保證變得可用,用戶可能也已經回家了,諸如此類的任務會永久停止,而這些停止的任務也會引起和線程泄漏同樣的問題。如果某個線程被這樣一個任務永久地消耗著,那么它實際上就被從池除去了。對于這樣的任務,應該要么只給予它們自己的線程,要么只讓它們等待有限的時間。
## 請求過載
僅僅是請求就壓垮了服務器,這種情況是可能的。在這種情形下,我們可能不想將每個到來的請求都排隊到我們的工作隊列,因為排在隊列中等待執行的任務可能會消耗太多的系統資源并引起資源缺乏。在這種情形下決定如何做取決于您自己;在某些情況下,您可以簡單地拋棄請求,依靠更高級別的協議稍后重試請求,您也可以用一個指出服務器暫時很忙的響應來拒絕請求。
# 線程池的大小設置
調整線程池的大小基本上就是避免兩類錯誤:線程太少或線程太多。幸運的是,對于大多數應用程序來說,太多和太少之間的余地相當寬。
請回憶:在應用程序中使用線程有兩個主要優點,盡管在等待諸如 I/O 的慢操作,但允許繼續進行處理,并且可以利用多處理器。在運行于具有 N 個處理器機器上的計算限制的應用程序中,在線程數目接近 N 時添加額外的線程可能會改善總處理能力,而在線程數目超過 N 時添加額外的線程將不起作用。事實上,太多的線程甚至會降低性能,因為它會導致額外的環境切換開銷。
線程池的最佳大小取決于可用處理器的數目以及工作隊列中的任務的性質。若在一個具有 N 個處理器的系統上只有一個工作隊列,其中全部是計算性質的任務,在線程池具有 N 或 N+1 個線程時一般會獲得最大的 CPU 利用率。
對于那些可能需要等待 I/O 完成的任務(例如,從套接字讀取 HTTP 請求的任務),需要讓池的大小超過可用處理器的數目,因為并不是所有線程都一直在工作。通過使用概要分析,您可以估計某個典型請求的等待時間(WT)與服務時間(ST)之間的比例。如果我們將這一比例稱之為 WT/ST,那么對于一個具有 N 個處理器的系統,需要設置大約 N*(1+WT/ST) 個線程來保持處理器得到充分利用。
處理器利用率不是調整線程池大小過程中的唯一考慮事項。隨著線程池的增長,您可能會碰到調度程序、可用內存方面的限制,或者其它系統資源方面的限制,例如套接字、打開的文件句柄或數據庫連接等的數目
# ThreadPoolExecutor
無論創建那種線程池 必須要調用ThreadPoolExecutor
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:
~~~
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列
handler: 線程池對拒絕任務的處理策略
~~~
一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。
當一個任務通過execute(Runnable)方法欲添加到線程池時:
如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。
如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maximumPoolSize,建新的線程來處理被添加的任務。
如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務。
也就是:處理任務的優先級為:
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
當線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四個選擇:
ThreadPoolExecutor.AbortPolicy()
拋出java.util.concurrent.RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy()
重試添加當前的任務,他會自動重復調用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務
ThreadPoolExecutor.DiscardPolicy()
拋棄當前的任務
當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
# 常用的幾種線程池
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
## newCachedThreadPool
### 分析
CachedThreadPool是一個”無限“容量的線程池,它會根據需要創建新線程。特點是可以根據需要來創建新的線程執行任務,沒有特定的corePool。下面是它的構造方法
~~~
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
~~~
CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximum是無界的。這里keepAliveTime設置為60秒,意味著空閑的線程最多可以等待任務60秒,否則將被回收。
CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味著,如果主線程提交任務的速度高于線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。其運行圖如下:

執行過程如下:
1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行。,execute()方法執行成功,否則執行步驟2
2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,配對失敗,將沒有線程執行SynchronousQueue.poll操作。這種情況下,線程池會創建一個新的線程執行任務。
3.在創建完新的線程以后,將會執行poll操作。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。
SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer操作時必須等待poll操作,否則不能繼續添加元素。
### 代碼實現
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。示例代碼如下:
~~~
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0; i<10; i++) {
final int index = 1;
try{
Thread.sleep(index*1000);
}catch (Exception e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
~~~
這種類型的線程池特點是:
* 工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
* 如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鐘),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創建一個工作線程。
* 在使用CachedThreadPool時,一定要注意控制任務的數量,否則,由于大量線程同時運行,很有會造成系統癱瘓。
## newFixedThreadPool
### 分析
這個線程池可以創建固定線程數的線程池。特點就是可以**重用固定數量線程**的線程池。它的構造源碼如下:
~~~
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
~~~
* FixedThreadPool的corePoolSize和maxiumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。
* 0L則表示當線程池中的線程數量操作核心線程的數量時,多余的線程將被立即停止
* 最后一個參數表示FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的做工隊列,由于是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的參數,并且運行中的線程池并不會拒絕任務
FixedThreadPool運行圖如下

執行過程如下:
1. 如果當前工作中的線程數量少于corePool的數量,就創建新的線程來執行任務。
2. 當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3. 線程執行完1中的任務后會從隊列中去任務。
注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池
### 代碼實現
創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。示例代碼如下:
~~~
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i=0; i<10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(20000);
}catch (Exception e) {
e.printStackTrace();
}
}
});
}
~~~
因為線程池大小為3,每個任務輸出index后sleep 2秒,所以每兩秒打印3個數字。
定長線程池的大小最好根據系統資源進行設置
如cpu個數
~~~
int cpuNums =Runtime.getRuntime().availableProcessors();
~~~
## newSingleThreadExecutor
### 分析
SingleThreadExecutor是使用單個worker線程的Executor。特點是使用單個工作線程執行任務。它的構造源碼如下:
~~~
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
~~~
SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。
其他參數均與FixedThreadPool相同,其運行圖如下

執行過程如下:
1. 如果當前工作中的線程數量少于corePool的數量,就創建一個新的線程來執行任務。
2. 當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3. 線程執行完1中的任務后會從隊列中去任務。
注意:由于在線程池中只有一個工作線程,所以任務可以按照添加順序執行。
### 代碼實現
創建一個單線程化的Executor,即只創建唯一的工作者線程來執行任務,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。如果這個線程異常結束,會有另一個取代它,保證順序執行。單工作線程最大的特點是可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的。
~~~
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i=0; i<10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try{
System.out.println(index);
Thread.sleep(2000);
}catch (Exception e) {
e.printStackTrace();
}
}
});
}
~~~
## newScheduleThreadPool
### 代碼實現
創建一個定長的線程池,而且支持定時的以及周期性的任務執行,支持定時及周期性任務執行。
延遲3秒執行,延遲執行示例代碼如下:
~~~
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
~~~
表示延遲1秒后每3秒執行一次,定期執行示例代碼如下:
~~~
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds,and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
~~~
## newSingleThreadScheduledExecutor
Single Thread Scheduled Pool : 只有一個線程,用來調度執行將來的任務,代碼:`Executors.newSingleThreadScheduledExecutor()`
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介