阿里的面試官問了個問題,如果corePolllSize=10,MaxPollSize=20,如果來了25個線程 怎么辦,
?先 達到 corePoolSize,然后 優先放入隊列,然后在到MaxPollSize;然后拒絕;
答案:
~~~
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,再有新的線程,開始增加線程池的線程數量處理新的線程,直到maximumPoolSize;
4、 如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
~~~
**當線程數小于corePoolSize時,提交一個任務創建一個線程(即使這時有空閑線程)來執行該任務。**
**當線程數大于等于corePoolSize,首選將任務添加等待隊列workQueue中(這里的workQueue是上面的BlockingQueue),等有空閑線程時,讓空閑線程從隊列中取任務。**
**當等待隊列滿時,如果線程數量小于maximumPoolSize則創建新的線程,否則使用拒絕線程處理器來處理提交的任務。**
慢慢的啟動到10,然后把剩下的15個放到阻塞隊列里面,并開始在線程池里面創建線程,直到最大MaximumPoolSize;
當然是先放在阻塞隊列(如果數量為0,就一直等待,LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列,兩邊都可以進出的,那種,
參考:[聊聊并發(七)——Java中的阻塞隊列](http://www.cnblogs.com/aspirant/p/8628779.html))里面了,BlockingQueue,面試官想知道具體的處理流程,我掌握的不深,于是下定決心好好查查:
尤其是那個車間里工人的例子,好好看看,理解線程很有用:
在上一章中我們概述了一下線程池,這一章我們看一下創建newFixedThreadPool的源碼。例子還是我們在上一章中寫的那個例子。
## 創建newFixedThreadPool的方法:
~~~
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
~~~
~~~
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
~~~
上面這兩個方法是創建固定數量的線程池的兩種方法,兩者的區別是:第二種創建方法多了一個線程工廠的方法。我們繼續看ThreadPoolExecutor這個類中的構造函數:
## ThreadPoolExecutor的構造函數:
~~~
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
~~~
~~~
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
~~~
ThreadPollExecutor中的所有的構造函數最終都會調用上面這個構造函數,接下來我們來分析一下這些參數的含義:?
### corePoolSize:
線程池啟動后,在池中保持的線程的最小數量。需要說明的是線程數量是逐步到達corePoolSize值的。例如corePoolSize被設置為10,而任務數量只有5,則線程池中最多會啟動5個線程,而不是一次性地啟動10個線程。
### maxinumPoolSize:
線程池中能容納的最大線程數量,如果超出,則使用RejectedExecutionHandler拒絕策略處理。?
### keepAliveTime:
線程的最大生命周期。這里的生命周期有兩個約束條件:一:該參數針對的是超過corePoolSize數量的線程;二:處于非運行狀態的線程。舉個例子:如果corePoolSize(最小線程數)為10,maxinumPoolSize(最大線程數)為20,而此時線程池中有15個線程在運行,過了一段時間后,其中有3個線程處于等待狀態的時間超過keepAliveTime指定的時間,則結束這3個線程,此時線程池中則還有12個線程正在運行。
### unit:
這是keepAliveTime的時間單位,可以是納秒,毫秒,秒,分鐘等。
### workQueue:?
任務隊列。當線程池中的線程都處于運行狀態,而此時任務數量繼續增加,則需要一個容器來容納這些任務,這就是任務隊列。這個任務隊列是一個阻塞式的單端隊列。?
**newFixedThreadPool**和**newSingleThreadExector使用的是LinkedBlockingQueue的無界模式(美團面試題目)。**
### threadFactory:
定義如何啟動一個線程,可以設置線程的名稱,并且可以確定是否是后臺線程等。
### handler:
拒絕任務處理器。由于超出線程數量和隊列容量而對繼續增加的任務進行處理的程序。
OK,ThreadPoolExecutor中的主要參數介紹完了。我們再說一下線程的管理過程:**首先創建一個線程池,然后根據任務的數量逐步將線程增大到corePoolSize,如果此時仍有任務增加,則放置到workQueue中,**直到workQueue爆滿為止,然后繼續增加池中的線程數量(增強處理能力),最終達到maxinumPoolSize。那如果此時還有任務要增加進來呢?這就需要handler來處理了,**或者丟棄新任務,或者拒絕新任務,或者擠占已有的任務(拒絕策略,美團面試)**。在任務隊列和線程池都飽和的情況下,一旦有線程處于等待(任務處理完畢,沒有新任務)狀態的時間超過keepAliveTime,則該線程終止,也就是說池中的線程數量會逐漸降低,直至為corePoolSize數量為止。
總結:
~~~
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
RejectedExecutionHandler handler)
corePoolSize: 線程池維護線程的最少線程數,也是核心線程數,包括空閑線程
maximumPoolSize: 線程池維護線程的最大線程數
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列
handler: 線程池對拒絕任務的處理策略
~~~
在《編寫高質量代碼 改善Java程序的151個建議》這本書里舉的這個例子很形象:

OK,接下來我們來看一下怎么往任務隊里中放入線程任務:在java.util.concurrent.AbstractExecutorService這個類的submit方法
## submit方法
~~~
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);//執行任務
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);//執行任務
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);//執行任務
return ftask;
}
~~~
這是三個重載方法,分別對應Runnable、帶結果的Runnable接口和Callable回調函數。其中的newTaskFor也是一個重載的方法,它通過層層的包裝,把Runnable接口包裝成了適配RunnableFuture的實現類,底層實現如下:
~~~
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
~~~
~~~
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
~~~
~~~
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
~~~
在submit中最重要的是execute這個方法,這個方法也是我們分析的重點
## execute方法:
~~~
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
~~~
在這個方法中分為三部分
1、如果少于corePoolSize數量的線程在運行,則啟動一個新的線程并把傳進來的Runnable做為第一個任務。然后會檢查線程的運行狀態和worker的數量,阻止不符合要求的任務添加到線程中
2、如果一個任務成功的放入到了隊列中,我們仍然需要二次檢查我們是否應該添加線程或者停止。因此我們重新檢查線程狀態,是否需要回滾隊列,或者是停止或者是啟動一個新的線程
3、如果我們不能添加隊列任務了,但是仍然在往隊列中添加任務,如果添加失敗的話,用拒絕策略來處理。
這里最主要的是addWorker這個方法:
~~~
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
~~~
我們在這個方法里創建一個線程,注意這個線程不是我們的任務線程,而是經過包裝的Worker線程。所以這里的run方法是Worker這個類中的run方法。execute方法是通過Worker類啟動的一個工作線程,執行的是我們的第一個任務,然后該線程通過getTask方法從任務隊列總獲取任務,之后再繼續執行。這個任務隊列是一個BlockingQueue,是一個阻塞式的,也就是說如果該隊列元素為0,則保持等待狀態。直到有任務進入為止。
?
# Java中的線程池
> 我們一般將任務(Task)提交到線程池中運行,對于一個線程池而言,需要關注的內容有以下幾點:
> 在什么樣的線程中執行任務
> 任務按照什么順序來執行(FIFO,LIFO,優先級)
> 最多有多少個任務能并發執行
> 最多有多個任務等待執行
> 如果系統過載則需要拒絕一個任務,如何通知任務被拒絕?
> 在執行一個任務之前或之后需要進行哪些操作
> **圍繞上面的問題,我們來研究一下java中的線程池**
## 線程池的創建
**Exectors.newFixedThreadPool(int size):**創建一個固定大小的線程池。 每來一個任務創建一個線程,當線程數量為size將會停止創建。當線程池中的線程已滿,繼續提交任務,如果有空閑線程那么空閑線程去執行任務,否則將任務添加到一個無界的等待隊列中。
**Exectors.newCachedThreadPool():**創建一個可緩存的線程池。對線程池的規模沒有限制,當線程池的當前規模超過處理需求時(比如線程池中有10個線程,而需要處理的任務只有5個),那么將回收空閑線程。當需求增加時則會添加新的線程。
**Exectors.newSingleThreadExcutor():**創建一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,它會創建另一個線程來代替。
**Exectors.newScheduledThreadPool():**創建一個固定長度的線程池,而且以延遲或定時的方式來執行任務。
上面都是通過工廠方法來創建線程池,其實它們內部都是通過創建ThreadPoolExector對象來創建線程池的。下面是ThreadPoolExctor的構造函數。
~~~
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
~~~
我們看到構造函數是public類型的,所以我們也可以自定義自己的線程池。
## 在什么樣的線程中執行任務?
java中對于任務的描述有兩種,一種是Runnable型的任務,一種是Callable型的任務。前者運行結束后不會返回任何東西,而后者可以返回我們需要的計算結果,甚至異常。
##### 在沒有返回值的線程中運行
創建一個線程池,然后調用其execute方法,并將一個Runnable對象傳遞進去即可。
~~~
ExectorService exector = Exectors.newCachedThreadPool();
exector.execute(new Runnable(){
public void run(){
System.out.println("running...");
}
});
~~~
##### 在有返回值的線程中運行
~~~
ExectorService exector = Exectors.newCachedThreadPool();
Callable<Result> task = new Callable<Result>() {
public Result call() {
return new Computor().compute();
}
};
Future<Result> future = exector.submit(task);
result = future.get(); //改方法會一直阻塞,直到提交的任務被運行完畢
~~~
## 任務按照什么順序來執行(FIFO,優先級)
如果任務按照某種順序來執行的話,則任務一定是串行執行的。我們可以看到在ThreadPoolExecutor中第四個參數是BlockingQueue,提交的任務都先放到該隊列中。如果傳入不同的BlockQueue就可以實現不同的執行順序。傳入LinkedBlockingQueue則表示先來先服務,傳入PriorityBlockingQueue則使用優先級來處理任務
Exectors.newSingleThreadExcutor()使用的是先來先服務策略
## 最多有多少個任務能并發執行
線程池中的線程會不斷從workQueue中取任務來執行,如果沒任務可執行,則線程處于空閑狀態。
在ThreadPoolExecutor中有兩個參數corePoolSize和maximumPoolSize,前者被稱為基本大小,表示一個線程池初始化時,里面應該有的一定數量的線程。但是默認情況下,ThreadPoolExecutor在初始化是并不會馬上創建corePoolSize個線程對象,它使用的是懶加載模式。
* 當線程數小于corePoolSize時,提交一個任務創建一個線程(即使這時有空閑線程)來執行該任務。
* 當線程數大于等于corePoolSize,首選將任務添加等待隊列workQueue中(這里的workQueue是上面的BlockingQueue),等有空閑線程時,讓空閑線程從隊列中取任務。
* 當等待隊列滿時,如果線程數量小于maximumPoolSize則創建新的線程,否則使用拒絕線程處理器來處理提交的任務。
## 最多有多少的任務等待執行
這個問題和BlockingQueue相關。 BlockingQueue有三個子類,一個是ArrayBlockingQueue(有界隊列),一個是LinkedBlockingQueue(默認無界,但可以配置為有界),PriorityBlockingQueue(默認無界,可配置為有界)。所以,對于有多少個任務等待執行與傳入的阻塞隊列有關。
**newFixedThreadPool**和**newSingleThreadExector**使用的是LinkedBlockingQueue的無界模式。而**newCachedThreadPool**使用的是SynchronousQueue,這種情況下線程是不需要排隊等待的,SynchronousQueue適用于線程池規模無界。
## 如果系統過載則需要拒絕一個任務,如何通知任務被拒絕?
當有界隊列被填滿或者某個任務被提交到一個已關閉的Executor時將會啟動飽和策略,即使用RejectedExecutionHandler來處理。JDK中提供了幾種不同的RejectedExecutionHandler的實現:AbortPolicy,CallerRunsPolicy, DiscardPolicy和DiscardOldestPolicy。
**AbortPolicy:**默認的飽和策略。該策略將拋出未檢查的**RejectedExcutionException**,調用者可以捕獲這個異常,然后根據自己的需求來處理。
**DiscardPolicy:**該策略將會拋棄提交的任務
**DiscardOldestPolicy:**該策略將會拋棄下一個將被執行的任務(處于隊頭的任務),然后嘗試重新提交該任務到等待隊列
**CallerRunsPolicy:**該策略既不會拋棄任務也不會拋出異常,而是在調用execute()的線程中運行任務。比如我們在主線程中調用了execute(task)方法,但是這時workQueue已經滿了,并且也不會創建的新的線程了。這時候將會在主線程中直接運行execute中的task。
## 在執行一個任務之前或之后需要進行哪些操作
**ThreadPoolExecutor**是可擴展的,它提供了幾個可以重載的方法:**beforeExecute**,**afterExecute**和**terminated**,這里用到了面向的切面編程的思想。無論任務是從run中正常返回,還是拋出異常而返回,**afterExectue**都會被調用。如果?**beforeExecute**中拋出了一個?**RunntimeException**,那么任務將不會被執行,并且?**afterExecute**也不會被調用。
~~~
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void main(String[] args) {
TimingThreadPool executor = new TimingThreadPool(5, 10, 1,
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 5; i++)
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("running1....");
}
});
executor.shutdown();
}
}
class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public TimingThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
System.out.println(String.format("Terminated: arg time = %d",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
~~~
上面的代碼統計任務平均執行時間,在每個線程中beforeExecute和afertExecute都會執行一次,而terminated等線程池關閉的時候執行
- 線程參數含義
- Inoddb索引實現
- 為什么是B+tree
- Redis使用,分布式鎖的實現
- 操作系統虛擬內存換頁的過程
- TCP三次握手
- Volatile關鍵字的作用
- 樂觀鎖,悲觀鎖
- HashMap結構,是否線程安全
- ConcurrentHashMap如何保證線程安全
- 說一下B樹和B+樹的區別
- HashMap的實現,擴容機制,擴容時如何保證可操作?
- Spring AOP的原理
- Spring IoC的原理,如何實現,如何解決循環依賴?
- 兩線程對變量i進行加1操作,結果如何?為什么?怎么解決?
- CAS概念、原子類實現原理
- synchronize底層實現,如何實現Lock?
- AQS有什么特點?
- 介紹各種網絡協議。
- DNS在網絡層用哪個協議,為什么。
- 介紹HTTPS協議,詳述SSL建立連接過程。
- 反轉單鏈表
- 復雜鏈表復制
- 數組a,先單調地址再單調遞減,輸出數組中不同元素個數
- 說一下Java垃圾回收機制
- 64匹馬,8個賽道,找最快的4匹馬
- 64匹馬,8個賽道,找最快的8匹馬
- 給出兩個升序數組A、B和長度m、n,求第k個大的
- 講一下多線程與多進程區別
- JVM中什么時候會進行垃圾回收?什么樣的對象是可以回收的?
- Spring主要思想是什么?