本節,我們將寫一個廣播器。下圖展示了廣播一個 DatagramPacket 在每個日志實體里面的方法
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.2%20Log%20entries%20sent%20with%20DatagramPackets.jpg)
1. 日志文件
2. 日志文件中的日志實體
3. 一個 DatagramPacket 保持一個單獨的日志實體
Figure 13.2 Log entries sent with DatagramPackets
圖13.3表示一個 LogEventBroadcaster 的 ChannelPipeline 的高級視圖,說明了 LogEvent 是如何流轉的。
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2013.3%20LogEventBroadcaster-ChannelPipeline%20and%20LogEvent%20flow.jpg)
Figure 13.3 LogEventBroadcaster: ChannelPipeline and LogEvent flow
正如我們所看到的,所有的數據傳輸都封裝在 LogEvent 消息里。LogEventBroadcaster 寫這些通過在本地端的管道,發送它們通過ChannelPipeline 轉換(編碼)為一個定制的 ChannelHandler 的DatagramPacket 信息。最后,他們通過 UDP 廣播并被遠程接收。
*編碼器和解碼器*
*編碼器和解碼器將消息從一種格式轉換為另一種,深度探討在第7章中進行。我們探索 Netty 提供的基礎類來簡化和實現自定義 ChannelHandler 如 LogEventEncoder 在這個應用程序中。*
下面展示了 編碼器的實現
Listing 13.2 LogEventEncoder
~~~
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) { //1
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); //2
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR); //3
buf.writeBytes(msg); //4
out.add(new DatagramPacket(buf, remoteAddress)); //5
}
}
~~~
1. LogEventEncoder 創建了 DatagramPacket 消息類發送到指定的 InetSocketAddress
2. 寫文件名到 ByteBuf
3. 添加一個 SEPARATOR
4. 寫一個日志消息到 ByteBuf
5. 添加新的 DatagramPacket 到出站消息
*為什么使用 MessageToMessageEncoder?*
*當然我們可以編寫自己的自定義 ChannelOutboundHandler 來轉換 LogEvent 對象到 DatagramPackets。但是繼承自MessageToMessageEncoder 為我們簡化和做了大部分的工作。*
為了實現 LogEventEncoder,我們只需要定義服務器的運行時配置,我們稱之為“bootstrapping(引導)”。這包括設置各種 ChannelOption 并安裝需要的 ChannelHandler 到 ChannelPipeline 中。完成的 LogEventBroadcaster 類,如清單13.3所示。
Listing 13.3 LogEventBroadcaster
~~~
public class LogEventBroadcaster {
private final Bootstrap bootstrap;
private final File file;
private final EventLoopGroup group;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address)); //1
this.file = file;
}
public void run() throws IOException {
Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); //2
System.out.println("LogEventBroadcaster running");
long pointer = 0;
for (;;) {
long len = file.length();
if (len < pointer) {
// file was reset
pointer = len; //3
} else if (len > pointer) {
// Content was added
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(pointer); //4
String line;
while ((line = raf.readLine()) != null) {
ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); //5
}
pointer = raf.getFilePointer(); //6
raf.close();
}
try {
Thread.sleep(1000); //7
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1])); //8
try {
broadcaster.run();
} finally {
broadcaster.stop();
}
}
}
~~~
1. 引導 NioDatagramChannel 。為了使用廣播,我們設置 SO_BROADCAST 的 socket 選項
2. 綁定管道。注意當使用 Datagram Channel 時,是沒有連接的
3. 如果需要,可以設置文件的指針指向文件的最后字節
4. 設置當前文件的指針,這樣不會把舊的發出去
5. 寫一個 LogEvent 到管道用于保存文件名和文件實體。(我們期望每個日志實體是一行長度)
6. 存儲當前文件的位置,這樣,我們可以稍后繼續
7. 睡 1 秒。如果其他中斷退出循環就重新啟動它。
8. 構造一個新的實例 LogEventBroadcaster 并啟動它
這就是程序的完整的第一部分。可以使用 "netcat" 程序查看程序的結果。在 UNIX/Linux 系統,可以使用 "nc", 在 Windows 環境下,可以在?[http://nmap.org/ncat](http://nmap.org/ncat)找到
Netcat 是完美的第一個測試我們的應用程序;它只是監聽指定的端口上接收并打印所有數據到標準輸出。將其設置為在端口 9999 上監聽 UDP 數據如下:
~~~
$ nc -l -u 9999
~~~
現在我們需要啟動 LogEventBroadcaster。清單13.4顯示了如何使用 mvn 編譯和運行廣播器。pom的配置。pom.xml 配置指向一個文件`/var/log/syslog`(假設是UNIX / Linux環境)和端口設置為 9999。文件中的條目將通過 UDP 廣播到端口,在你開始 netcat 后打印到控制臺。
Listing 13.4 Compile and start the LogEventBroadcaster
~~~
$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------------------------------------------
[INFO] Building netty-in-action 0.1-SNAPSHOT
[INFO] --------------------------------------------------------------------
...
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/
target/netty-in-action-0.1-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running
~~~
當調用 mvn 時,在系統屬性中改變文件和端口值,指定你想要的。清單13.5 設置日志文件 到?`/var/log/mail.log`?和端口 8888。
Listing 13.5 Compile and start the LogEventBroadcaster
~~~
$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster /
-Dlogfile=/var/log/mail.log -Dport=8888 -....
....
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running
~~~
當看到 “LogEventBroadcaster running” 說明程序運行成功了。
netcat 只用于測試,但不適合生產環境中使用。
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter