## 3.3 Netty的EventLoop與線程池
Netty的事件循環和事件循環組的實現中,類的層級關系比較復雜,其底層是Java線程池的實現,不過在實際使用中還是比較簡單的,我們只需要使用如下的代碼即可,
```
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)//設置事件循環組
```
Netty的事件循環機制有兩個基本接口:EventLoop和EventLoopGroup。前者是事件循環,后者是由多個事件循環組成的組。每個EventLoop被包裝為一個Task放在在線程池中運行,但其本身也可以看做一個線程池,如Nio的事件循環會不斷select后獲取任務并執行。Nio的事件循環在實現時,使用死循環的方式不斷select(),然后處理提交給EventLoop的系統任務。因此,我們可以將NioEventLoop當做線程池,EventLoopGroup作為線程池組,線程池組的意義是采用給的的策略選取一個EventLoop并提交任務。
EventLoop的定義如下,其繼承了一個順序執行的線程池接口和EventLoopGroup,也就是說EventLoop之間有父子關系,通過parent();返回任務循環組,通過next()選取一個事件循環。線程池組的register用于將Netty的Channel注冊到線程池中。
```
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel channel);
}
```
### NioEventLoopGroup
NioEventLoopGroup除了處理網絡的異步I/O任務,還用于完成異步提交的系統任務。NioEventLoopGroup初始化時,有如下幾個參數可以配置,主要用于設置線程池的相關配置。
* nThreads 子線程池數量
* Executor executor 用來執行任務的線程池
* chooserFactory :next()時選擇線程池的策略
* selectorProvider 用于打開selector
* selectStrategyFactory 用來控制select循環行為的策略
* RejectedExecutionHandlers 線程池執行的異常處理策略
```
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
```
NioEventLoopGroup初始化過程為:
1. 如果傳入的executor 為空,會默認使用`ThreadPerTaskExecutor`,該線程池針對每個任務會創建一個線程,創建線程方式使用`DefaultThreadFactory`提供的newThread方法。
2. 初始化開始,首先會根據創建nThread個子線程池,保存在childrens變量中,創建邏輯比較簡單,將初始化NioEventLoopGroup時設置的參數傳遞給NioEventLoop對象。在創建子線程池NioEventLoop的過程中,如果一旦有失敗的,就需要關閉已經創建的所有子線程池并等待這些線程池結束。
3. 之后,使用chooserFactory創建`chooser`,用來在next()選擇事件循環時從childrens變量選擇一個返回。默認使用2的倍數的策略,也可以設置為順序依次選擇。
4. 向組中所有的事件循環的`terminationFuture`注冊事件,目的是等待所有事件循環結束后將事件循環組的`terminatedChildren`設置為成功完成。
5. 最后,將children復制保存為一個只讀的集合,保存在變量`readonlyChildren`中。
至此,NioEventLoopGroup的初始化過程就結束了。我們可以看到,NioEventLoopGroup主要的用來聚合多個線程池,對其進行調度。
### NioEventLoop
在NioEventLoopGroup的初始化過程中,會創建多個NioEventLoop,NioEventLoop用來執行實際的事件循環,初始化時有如下幾個屬性:
* NioEventLoopGroup parent 線程池所在的Group
* Executor executor 執行任務的線程池,默認是ThreadPerTaskExecutor
* SelectorProvider selectorProvider 用來打開selector
* SelectStrategy strategy 用來控制select循環行為的策略
* RejectedExecutionHandlers 線程池執行的異常處理策略
* addTaskWakesUp addTask(Runnable)添加任務時是否喚醒線程池,默認是false
* maxPendingTasks 線程池中等待任務的最大數量
* scheduledTaskQueue 保存定時任務的QUeue
* tailTasks :保存任務的Queue,netty選擇使用jctools的MpscChunkedArrayQueue,原因是為了提高效率,因為Nio線程池的線程消費者只有一個,就是一直進行的select循環,而生產者可能有多個。具體實現參見 http://blog.csdn.net/youaremoon/article/details/50351929
### 提交任務
NioEventLoop初始化時,會創建/設置其包含的屬性,最重要的是打開selector和創建tailTasks兩個步驟;這時,由于沒有任何任務,NioEventLoop不會啟動線程。在netty中,向線程池提交任務可以使用下面的方法:
```
EventLoopGroup loop = new NioEventLoopGroup();
loop.next().submit(Callable<T> task)
loop.next().submit(Runnable task)
loop.next().execute(Runnable command);
```
也可以直接通過EventLoopGroup提交任務,只是EventLoopGroup內部會調用next()后再執行相關的方法。
```
EventLoopGroup loop = new NioEventLoopGroup();
loop.submit(Callable<T> task)
loop.submit(Runnable task)
loop.execute(Runnable command);
```
submit方法的內部會將Callable或Runnable包裝后交給execute方法執行。
```
// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 包裝task為 ftask
execute(ftask);
return ftask;
}
```
execute方法被NioEventLoop的父類SingleThreadEventExecutor覆蓋,程序如下:
```
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task); // 添加到任務隊列
} else {
startThread(); // 啟動線程,向EventLoop內部的線程池提交任務,會執行NioEventLoop run
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
```
1. 判斷當前線程(提交任務的線程)與當前線程池是同一個線程,也就是說是如果是當前線程池提交的任務,則直接將任務加入線程池隊列即可;
2. 如果不是,則需要啟動線程后添加任務。啟動線程的過程是,向NioEventLoop內部包含的executor提交一個任務,任務內部執行NioEventLoop的run方法(executor是實際使用的線程池,初始化是傳入,默認是ThreadPerTaskExecutor)。
3. 最后根據addTaskWakesUp標志和任務是否實現了NonWakeupRunnable判斷是否需要喚醒,喚醒的方法是提交一個默認的空任務WAKEUP_TASK。
NioEventLoop的run方法內部是一個死循環,會一直執行select()查詢準備就緒的I/O描述符并做相應的I/O處理,還會對提交到NioEventLoop的任務進行處理。

當我們向線程池組提交任務時,group先選擇一個EventLoop(通過next()),如果EventLoop未啟動則向線程池提交一個任務執行EventLoop的run,然后將任務加入到該線程池的隊列中,等待事件EventLoop下次處理。