[TOC]
# 1.ThreadPoolExecutor
> 1. 在《阿里巴巴java開發手冊》中指出了線程資源必須通過線程池提供,不允許在應用中自行顯示的創建線程,這樣一方面是線程的創建更加規范,可以合理控制開辟線程的數量;
> 2. 線程池不允許使用Executors去創建,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創建線程池的方法,但都有其局限性,不夠靈活;另外由于前面幾種方法內部也是通過ThreadPoolExecutor方式實現,使用ThreadPoolExecutor有助于大家明確線程池的運行規則,創建符合自己的業務場景需要的線程池,避免資源耗盡的風險。
## 1.1 構造函數詳解
```
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
```
**1. corePoolSize:**
指定了線程池中的線程數量,它的數量決定了添加的任務是開辟新的線程去執行,還是放到`workQueue`任務隊列中去;
**2. maximumPoolSize:**
**指定了線程池中的最大線程數量**,這個參數會根據你使用的`workQueue`任務隊列的類型,決定線程池會開辟的最大線程數量;
**3. keepAliveTime:**
當線程池中空閑線程數量超過corePoolSize時,多余的線程會在多長時間內被銷毀;
**4. unit:** keepAliveTime的單位
**5. workQueue:**
任務隊列,被添加到線程池中,但尚未被執行的任務;它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列幾種;
**6. threadFactory:** 線程工廠,用于創建線程,一般用默認即可;
**7. handler:** 拒絕策略;當任務太多來不及處理時,如何拒絕任務;
## 1.2 線程創建規則
1. 待執行任務數小于corePoolSize則創建線程,直到線程數等于corePoolSize
2. 待執行任務書大于corePoolSize,往隊列里塞(如果隊列無界,則線程數量永遠等于corePoolSize)
3. 隊列滿了,創建新線程,直到線程數等于maximumPoolSize
4. 線程數量大于maximumPoolSize,執行拒絕策略
# 2. **使用**
上面我們已經介紹過了,**它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列;**
## 2.1 **直接提交隊列**:
設置為SynchronousQueue隊列,**SynchronousQueue是一個特殊的BlockingQueue,它沒有容量**,沒執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。
如下代碼,最多創建三個線程
~~~
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//maximumPoolSize設置為2 ,拒絕策略為AbortPolic策略,直接拋出異常
pool = new ThreadPoolExecutor(0, 3, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<5;i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable{
public ThreadTask() {
}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
~~~
輸出:
```
[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ javatest ---
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.tuna.test.thread.ThreadTask@33909752 rejected from java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at com.tuna.test.thread.ThreadPool.main(ThreadPool.java:12)
[ERROR] Command execution failed.
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
at org.apache.commons.exec.DefaultExecutor.executeInternal (DefaultExecutor.java:404)
der.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
at org.codehaus.classworlds.Launcher.main (Launcher.java:47)
```
可以看到,當任務隊列為SynchronousQueue,創建的線程數大于maximumPoolSize時,直接執行了拒絕策略拋出異常。
**使用SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。** 如果用于執行任務的線程數量小于maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的并發量有個準確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;
## 2.2 **有界的任務隊列**:
有界的任務隊列可以使用ArrayBlockingQueue實現,如下所示
~~~
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
~~~
1. 使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。
2. 若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大于maximumPoolSize,則執行拒絕策略。
3. 在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。
## 2.3 **無界的任務隊列**
有界任務隊列可以使用LinkedBlockingQueue實現,如下所示
~~~
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
~~~
使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導致一直增長,直到最后資源耗盡的問題。
## 2.4 **優先任務隊列:**
優先任務隊列通過PriorityBlockingQueue實現,下面我們通過一個例子演示下
~~~
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//優先任務隊列
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<20;i++) {
pool.execute(new ThreadTask(i));
}
}
}
public class ThreadTask implements Runnable,Comparable<ThreadTask>{
private int priority;
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public ThreadTask() {
}
public ThreadTask(int priority) {
this.priority = priority;
}
//當前對象和其他對象做比較,當前優先級大就返回-1,優先級小就返回1,值越小優先級越高
public int compareTo(ThreadTask o) {
return this.priority>o.priority?-1:1;
}
public void run() {
try {
//讓線程阻塞,使后續任務進入緩存隊列
Thread.sleep(1000);
System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
~~~
我們來看下執行的結果情況
~~~
priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1
~~~
到除了第一個任務直接創建線程執行外,其他的任務都被放入了優先任務隊列,按優先級進行了重新排列執行,且線程池的線程數一直為corePoolSize,也就是只有一個。
通過運行的代碼我們可以看出PriorityBlockingQueue它其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數也不會超過corePoolSize的數量,只不過其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。
## **2.5 拒絕策略**
一般我們創建線程池時,為防止資源被耗盡,任務隊列都會選擇創建有界任務隊列,但種模式下如果出現任務隊列已滿且線程池創建的線程數達到你設置的最大線程數時,這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler參數即合理的拒絕策略,來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:
**1、AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作;**
**2、CallerRunsPolicy策略:如果線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行;**
**3、DiscardOledestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,也就是當前任務隊列中最先被添加進去的,馬上要被執行的那個任務,并嘗試再次提交;**
**4、DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。當然使用此策略,業務場景中需允許任務的丟失;**
**以上內置的策略均實現了**RejectedExecutionHandler接口,**當然你也可以自己擴展RejectedExecutionHandler接口,定義自己的拒絕策略,我們看下示例代碼:**
如果任務數量超過了 1、corePoolSize 2. 隊列容量 3.最大線程數 ---> 執行拒絕策略:拋異常、丟棄、再次嘗試和自定義策略
線程池最大任務數 maxPoolSize(包含corePoolSize) + queneSize 如下能容納的最大任務數量 7,同時執行的任務數 2
~~~
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定義拒絕策略
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"執行了拒絕策略");
}
});
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
try {
//讓線程阻塞,使后續任務進入緩存隊列
Thread.sleep(1000);
System.out.println("ThreadName:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
~~~
輸出結果:
~~~
com.hhxx.test.ThreadTask@33909752執行了拒絕策略
com.hhxx.test.ThreadTask@55f96302執行了拒絕策略
com.hhxx.test.ThreadTask@3d4eac69執行了拒絕策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
~~~
可以看到由于任務加了休眠阻塞,執行需要花費一定時間,導致會有一定的任務被丟棄,從而執行自定義的拒絕策略;
## **2.6 ThreadFactory自定義線程創建**
?線程池中線程就是通過ThreadPoolExecutor中的ThreadFactory,線程工廠創建的。那么通過自定義ThreadFactory,可以按需要對線程池中創建的線程進行一些特殊的設置,如命名、優先級等,下面代碼我們通過ThreadFactory對線程池中創建的線程進行記錄與命名
~~~
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args )
{
//自定義線程工廠
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("線程"+r.hashCode()+"創建");
//線程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask());
}
}
}
public class ThreadTask implements Runnable{
public void run() {
//輸出執行線程的名稱
System.out.println("ThreadName:"+Thread.currentThread().getName());
}
}
~~~
我們看下輸出結果
~~~
線程118352462創建
線程1550089733創建
線程865113938創建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
線程1442407170創建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170
~~~
可以看到線程池中,每個線程的創建我們都進行了記錄輸出與命名。
# **3. ThreadPoolExecutor擴展**
ThreadPoolExecutor擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口實現的,
**1、beforeExecute:線程池中任務運行前執行**
**2、afterExecute:線程池中任務運行完畢后執行**
**3、terminated:線程池退出后執行**
通過這三個接口我們可以監控每個任務的開始和結束時間,或者其他一些功能。下面我們可以通過代碼實現一下
~~~
public class ThreadPool {
private static ExecutorService pool;
public static void main( String[] args ) throws InterruptedException
{
//實現自定義接口
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("線程"+r.hashCode()+"創建");
//線程命名
Thread th = new Thread(r,"threadPool"+r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy()) {
protected void beforeExecute(Thread t,Runnable r) {
System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
}
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
}
protected void terminated() {
System.out.println("線程池退出");
}
};
for(int i=0;i<10;i++) {
pool.execute(new ThreadTask("Task"+i));
}
pool.shutdown();
}
}
public class ThreadTask implements Runnable{
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//輸出執行線程的名稱
System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
}
}
~~~
輸出結果
~~~
線程118352462創建
線程1550089733創建
準備執行:Task0
準備執行:Task1
TaskNameTask0---ThreadName:threadPool118352462
線程865113938創建
執行完畢:Task0
TaskNameTask1---ThreadName:threadPool1550089733
執行完畢:Task1
準備執行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
執行完畢:Task3
準備執行:Task2
準備執行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
執行完畢:Task4
準備執行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
執行完畢:Task5
準備執行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
執行完畢:Task6
準備執行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
執行完畢:Task8
準備執行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
準備執行:Task7
執行完畢:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
執行完畢:Task7
執行完畢:Task2
線程池退出
~~~
可以看到通過對beforeExecute()、afterExecute()和terminated()的實現,我們對線程池中線程的運行狀態進行了監控,在其執行前后輸出了相關打印信息。另外使用shutdown方法可以比較安全的關閉線程池,?當線程池調用該方法后,線程池中不再接受后續添加的任務。但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。
# **4. 線程池線程數量**
線程吃線程數量的設置沒有一個明確的指標,根據實際情況,只要不是設置的偏大和偏小都問題不大,結合下面這個公式即可
~~~
/**
* Nthreads=CPU數量
* Ucpu=目標CPU的使用率,0<=Ucpu<=1
* W/C=任務等待時間與任務計算時間的比率
*/
Nthreads = Ncpu*Ucpu*(1+W/C)
~~~
- 計算機網絡
- 基礎_01
- tcp/ip
- http轉https
- Let's Encrypt免費ssl證書(基于haproxy負載)
- what's the http?
- 網關
- 網絡IO
- http
- 工具
- Git
- 初始本地倉庫并上傳
- git保存密碼
- Gitflow
- maven
- 1.生命周期命令
- 聚合與繼承
- 插件管理
- assembly
- 資源管理插件
- 依賴范圍
- 分環境打包
- dependencyManagement
- 版本分類
- 找不到主類
- 無法加載主類
- 私服
- svn
- gradle
- 手動引入第三方jar包
- 打包exe文件
- Windows
- java
- 設計模式
- 七大原則
- 1.開閉原則
- 2. 里式替換原則
- 3. 依賴倒置原則
- 4. 單一職責原則
- 單例模式
- 工廠模式
- 簡單工廠
- 工廠方法模式
- 抽象工廠模式
- 觀察者模式
- 適配器模式
- 建造者模式
- 代理模式
- 適配器模式
- 命令模式
- json
- jackson
- poi
- excel
- easy-poi
- 規則
- 模板
- 合并單元格
- word
- 讀取
- java基礎
- 類路徑與jar
- 訪問控制權限
- 類加載
- 注解
- 異常處理
- String不可變
- 跨域
- transient關鍵字
- 二進制編碼
- 泛型1
- 與或非
- final詳解
- Java -jar
- 正則
- 讀取jar
- map
- map計算
- hashcode計算原理
- 枚舉
- 序列化
- URLClassLoader
- 環境變量和系統變量
- java高級
- java8
- 1.Lambda表達式和函數式接口
- 2.接口的默認方法和靜態方法
- 3.方法引用
- 4.重復注解
- 5.類型推斷
- 6.拓寬注解的應用場景
- java7-自動關閉資源機制
- 泛型
- stream
- 時區的正確理解
- StringJoiner字符串拼接
- 注解
- @RequestParam和@RequestBody的區別
- 多線程
- 概念
- 線程實現方法
- 守護線程
- 線程阻塞
- 筆試題
- 類加載
- FutureTask和Future
- 線程池
- 同步與異步
- 高效簡潔的代碼
- IO
- ThreadLocal
- IO
- NIO
- 圖片操作
- KeyTool生成證書
- 壓縮圖片
- restful
- 分布式session
- app保持session
- ClassLoader.getResources 能搜索到的資源路徑
- java開發規范
- jvm
- 高并發
- netty
- 多線程與多路復用
- 異步與事件驅動
- 五種IO模型
- copy on write
- code style
- 布隆過濾器
- 筆試
- 數據庫
- mybatis
- mybatis與springboot整合配置
- pagehelper
- 分頁數據重復問題
- Java與數據庫之間映射
- 攔截器
- 攔截器應用
- jvm
- 堆內存測試
- 線程棧
- 直接內存
- 內存結構
- 內存模型
- 垃圾回收
- 調優
- 符號引用
- 運行參數
- 方法區
- 分帶回收理論
- 快捷開發
- idea插件
- 注釋模板
- git
- pull沖突
- push沖突
- Excel處理
- 圖片處理
- 合并單元格
- easypoi
- 模板處理
- 響應式編程
- reactor
- reactor基礎
- jingyan
- 規范
- 數據庫