這一節我們編寫一個監視器:EventLogMonitor ,也就是用來接收事件的程序,用來代替 netcat 。EventLogMonitor 做下面事情:
* 接收 LogEventBroadcaster 廣播的 UDP DatagramPacket
* 解碼 LogEvent 消息
* 輸出 LogEvent 消息
和之前一樣,將實現自定義 ChannelHandler 的邏輯。圖13.4描述了LogEventMonitor 的 ChannelPipeline 并表明了 LogEvent 的流經情況。
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.4%20LogEventMonitor.jpg)
Figure 13.4 LogEventMonitor
圖中顯示我們的兩個自定義 ChannelHandlers,LogEventDecoder 和 LogEventHandler。首先是負責將網絡上接收到的 DatagramPacket 解碼到 LogEvent 消息。清單13.6顯示了實現。
Listing 13.6 LogEventDecoder
~~~
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
ByteBuf data = datagramPacket.content(); //1
int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); //2
String filename = data.slice(0, i).toString(CharsetUtil.UTF_8); //3
String logMsg = data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); //4
LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(),
filename,logMsg); //5
out.add(event);
}
}
~~~
1. 獲取 DatagramPacket 中數據的引用
2. 獲取 SEPARATOR 的索引
3. 從數據中讀取文件名
4. 讀取數據中的日志消息
5. 構造新的 LogEvent 對象并將其添加到列表中
第二個 ChannelHandler 將執行一些首先創建的 LogEvent 消息。在這種情況下,我們只會寫入 system.out。在真實的應用程序可能用到一個單獨的日志文件或放到數據庫。
下面的清單顯示了 LogEventHandler。
Listing 13.7 LogEventHandler
~~~
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //1
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); //2
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder(); //3
builder.append(event.getReceivedTimestamp());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
System.out.println(builder.toString()); //4
}
}
~~~
1. 繼承 SimpleChannelInboundHandler 用于處理 LogEvent 消息
2. 在異常時,輸出消息并關閉 channel
3. 建立一個 StringBuilder 并構建輸出
4. 打印出 LogEvent 的數據
LogEventHandler 打印出 LogEvent 的一個易讀的格式,包括以下:
* 收到時間戳以毫秒為單位
* 發送方的 InetSocketAddress,包括IP地址和端口
* LogEvent 生成絕對文件名
* 實際的日志消息,代表在日志文件中一行
現在我們需要安裝處理程序到 ChannelPipeline ,如圖13.4所示。下一個清單顯示了這是如何實現 LogEventMonitor 類的一部分。
Listing 13.8 LogEventMonitor
~~~
public class LogEventMonitor {
private final Bootstrap bootstrap;
private final EventLoopGroup group;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group) //1
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder()); //2
pipeline.addLast(new LogEventHandler());
}
}).localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel(); //3
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); //4
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().await();
} finally {
monitor.stop();
}
}
}
~~~
1. 引導 NioDatagramChannel。設置 SO_BROADCAST socket 選項。
2. 添加 ChannelHandler 到 ChannelPipeline
3. 綁定的通道。注意,在使用 DatagramChannel 是沒有連接,因為這些 無連接
4. 構建一個新的 LogEventMonitor
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter