Nio事件循環在NioEventLoop中,主要功能:
* 處理網絡I/O讀寫事件
* 執行系統任務和定時任務
在主循環中我們可以看到netty對I/O任務和提交到事件循環中的系統任務的調度。

## 6.1 I/O事件
1. 由于NIO的I/O讀寫需要使用選擇符,因此,netty在NioEventLoop初始化時,會使用SelectorProvider打開selector。在類加載時,netty會從系統設置中讀取相關配置參數:
* sun.nio.ch.bugLevel 用來修復JDK的NIO在Selector.open()的一個BUG
* io.netty.selectorAutoRebuildThreshold select()多少次數后重建selector
```
static {
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
```
2. NioEventLoop的構造方法中,會調用provider.openSelector()打開Selector;如果設置`io.netty.noKeySetOptimization`為true,則會啟動優化,優化內容是將Selector的selectedKeys和publicSelectedKeys屬性設置為可寫并替換為Netty實現的集合以提供效率。
```
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
// 下面是優化程序,此處省略
...
return selector;
}
```
3. NioEventLoop最核心的地方在于事件循環,具體代碼在NioEventLoop.java在run方法中
* 首先根據默認的選擇策略DefaultSelectStrategy判斷本次循環是否select,具體邏輯為:如果當前有任務則使用selectNow立刻查詢是否有準備就緒的I/O;如果當前沒有任務則返回SelectStrategy.SELECT,并將wakenUp設置為false,并調用select()進行查詢。
```
protected void run() {
for (;;) { // 事件循環
try {
// select策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // select()
if (wakenUp.get()) {
selector.wakeup(); // 喚醒select()的線程
}
default:
// fallthrough
}
.... 后續處理
```
* select()時需要判斷當前是否有scheduledTask(定時任務),如果有則需要計算任務delay的時間,如果定時任務需要立刻執行了,那么必須馬上selectNow()并返回,之后執行任務。如果沒有scheduledTask,會判斷當前是否有任務在等待列表,如果有任務時將wakenUp設置為true并selectNow();如果沒有任務,那么會 selector.select(1000); 阻塞等待1s,直到有I/O就緒,或者有任務等待,或需要喚醒時退出,否則,會繼續循環,直到前面的幾種情況發生后退出。
* 之后,事件循環開始處理IO和任務。如果查詢到有IO事件,會調用processSelectedKeysOptimized(優化的情況下),對SelectionKey進行處理。
```
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime; // io花費的時間
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate計算task的時間
}
}
```
* processSelectedKeysOptimized處理I/O,主要是NIO的select操作,處理相關的事件。
```
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
```
## 6.2 任務處理
* runAllTasks執行提交到EventLoop的任務,首先從scheduledTaskQueue獲取需要執行的任務,加入到taskQueue,然后依次執行taskQueue的任務。
```
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue(); // 獲取定時任務
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
```
* ioRatio不為100時,會調用runAllTasks(ioTime * (100 - ioRatio) / ioRatio),首先計算出I/O處理的事件,然后按照比例為執行task分配事件,內部主要邏輯與runAllTasks()主要邏輯相同。