在服務器啟動過程初,我們向ServerBootstrap類傳入了兩個線程池,一個負責處理I/O連接請求,另一個用來處理連接后的讀寫操作。主事件循環主要負責接收客戶端連接,之后創建與客戶端連接的NioSocketChannel,然后將其注冊到子事件循環上面,由子事件循環負責處理子Channel的讀寫操作。
## 7.2.1 Accept事件的注冊
向java的channel注冊Accept事件發生在bind階段(AbstractBootstrap的doBind0方法)結束的最后,bind結束后觸發了Pipeline的fireChannelActive事件,經由NioServerSocketChannel的Pipeline的TailContext傳播到HeadContext,最后由unsafe向線程池提交fireChannelActive任務完成Accept的注冊。
```
// AbstractChannel.java中的AbstractUnsafe的bind()
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
```
Pipeline執行fireChannelActive從HeadContext開始觸發,Head執行readIfIsAutoRead方法
```
// DefaultChannelPipeline.java的HeadContext中
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
```
在readIfIsAutoRead中,調用channel的read(),由于read是outbound方法,最終會調用NioServerSocketChannel的unsafe的beginRead,在里面注冊Accept事件。
```
// AbstractNioChannel.java中的AbstractNioUnsafe類
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
```
## 7.2.2 Accept事件處理器
在服務器啟動完成后,NioServerSocketChannel的Pipeline結構如下:
```
Head[I/O] <--> ServerBootstrapAcceptor[IN] <--> Tail[IN]
```
Pipeline中的ServerBootstrapAcceptor是用來處理連接任務,其邏輯比較簡單:在服務器啟動時調用childHandler方法設置了ServerBootstrap的子Channel的處理器,此時會將childChannelHandler添加到子Channel中(NioSocketChannel會在連接過程中創建);設置子Channel的配置和屬性;最后將子Channel注冊到子線程池組中。
```
// 子Channel是服務器接收到請求后創建與客戶端連接的通道
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler); // 向子Channel添加子處理器
// 設置子Channel的配置和屬性
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {// 將子Channel注冊到子線程池組這
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
```
## 7.2.3 Accept事件的處理
在Nio的事件循環中,如果select到I/O連接或讀時,最終會使用服務器的NioServerSocketChannel內部的NioMessageUnsafe的read()進行處理。
```
private final List<Object> readBuf = new ArrayList<Object>(); // Unsafe內部變量
read(){
try {
do {
int localRead = doReadMessages(readBuf); // accept建立建立,創建客戶端Channel
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading()); // 是否還有讀取的數據
} catch (Throwable t) {
exception = t;
}
// 通知Pipeline
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
allocHandle.readComplete(); 重新計算緩沖池大小
pipeline.fireChannelReadComplete();
```
1. Netty中緩沖區使用RecvByteBufAllocator和RecvByteBufAllocator.Handle來進行分配,在內存管理一節有詳細說明。
2. accept獲得連接成功與客戶端的javaChannel,然后創建NettyChannel,NioSocketChannel(初始化過程與NioServerSocketChannel類似,有channelID,unsafe和Pipeline),也會創建Config和AdaptiveRecvByteBufAllocator。
3. 通知Pipeline,觸發fireChannelRead,參數msg為創建的客戶端通道NioSocketChannel,Head的channelRead沒有實際內容,傳給ServerBootstrapAcceptor,ServerBootstrapAcceptor負責講用戶定義的childHandler加入到子ChannelHandler的Pipeline中。
4. 最后,重新計算緩沖池大小(可能擴容或減小)
5. 觸發Pipeline的fireChannelReadComplete
此時,與客戶端的javaChannel已經建立,并且創建了Netty的客戶端NioSocketChannel,并將其注冊子線程池組中,在子線程池的事件循環中,會處理read事件。