## 9.2.1 ChannelPipeline機制
Pipleline中可以講ChannelHandler維護成一個雙向鏈表,實現上通過將ChannelHandler包裝為AbstractChannelHandlerContext,然后將各個Context連接起來。
**初始化Pipeline**
初始化時機:在Netty啟動時,創建Channel的構造方法中會初始化一個默認的DefaultChannelPipeline
```
// DefaultChannelPipeline的構造方法
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
```
TailContext繼承了AbstractChannelHandlerContext并實現ChannelInboundHandler,可以看做是一個處理輸入的處理器;而HeadContext最靠近java的Channel,因此需要實現in/outbound,處理輸入和輸出。通過代碼可以看到,HeadContext會調用Channel的unsafe處理所有的I/O操作。
```
class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler
```
***PendingHandlerCallback鏈表**
DefaultChannelPipeline內部還維護了一個PendingHandlerCallback鏈表,通過名稱我們可以看出來這個鏈表上面的ChannelHandler等待被回調。
在Netty服務器啟動過程中,Channel創建并初始化完成后,才會同時進行注冊和綁定。由于初始化過程中需要在注冊完成后向鏈表加入ServerBootstrapAcceptor用來處理連接操作,因此,在addLast()時只能先加入到了這個等待鏈表中,注冊完成后,會遍歷這個鏈表,執行ChannelHandler的initChannel方法。初始化階段中注冊后的操作寫在了initChannel里面。
```
// ServerBootstrap的init() 初始化階段加入的初始化Handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
//addLast中,若未注冊,則加入PendingHandlerCallback鏈表
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
```
**通知機制**
在Netty的啟動過程以及后續的I/O操作中,很多階段都會通知Pipeline上面的ChannelHandler。
例如,在啟動過程在,注冊完成后,調用pipeline.fireChannelRegistered();綁定完成后調用pipeline.fireChannelActive();
我們以fireChannelRead為例,看看如何實現的按照鏈表通知。
```
//DefaultChannelPipeline.java
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// AbstractChannelHandlerContext.java
void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// ReferenceCountUtil 引用計數
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 調用invokeChannelRead
next.invokeChannelRead(m);
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) { // 判斷是否已經添加完成
try {
// 調用ChannelHandler的channelRead
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg); // 未完成繼續下一個
}
}
```
上面的程序中,從head出發查找ChannelInboundHandler,調用其channelRead,如果需要繼續調用鏈表后面的channelRead,需要調用ctx.fireChannelRead(msg);繼續通知,在今后自己定義channelRead時需要注意,需要手動繼續傳遞消息。