由于網絡的原因,如何有效的寫大數據在異步框架是一個特殊的問題。因為寫操作是非阻塞的,即便是在數據不能寫出時,只是通知 ChannelFuture 完成了。當這種情況發生時,你必須停止寫操作或面臨內存耗盡的風險。所以寫時,會產生大量的數據,我們需要做好準備來處理的這種情況下的緩慢的連接遠端導致延遲釋放內存的問題你。作為一個例子讓我們考慮寫一個文件的內容到網絡。
在我們的討論傳輸(見4.2節)時,我們提到了 NIO 的“zero-copy(零拷貝)”功能,消除移動一個文件的內容從文件系統到網絡堆棧的復制步驟。所有這一切發生在 Netty 的核心,因此所有所需的應用程序代碼是使用 interface FileRegion 的實現,在網狀的API文檔中定義如下為一個通過 Channel 支持 zero-copy 文件傳輸的文件區域。
下面演示了通過 zero-copy 將文件內容從 FileInputStream 創建 DefaultFileRegion 并寫入 使用 Channel
Listing 8.11 Transferring file contents with FileRegion
~~~
FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2
channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause(); //4
// Do something
}
}
});
~~~
1. 獲取 FileInputStream
2. 創建一個心的 DefaultFileRegion 用于文件的完整長度
3. 發送 DefaultFileRegion 并且注冊一個 ChannelFutureListener
4. 處理發送失敗
只是看到的例子只適用于直接傳輸一個文件的內容,沒有執行的數據應用程序的處理。在相反的情況下,將數據從文件系統復制到用戶內存是必需的,您可以使用 ChunkedWriteHandler。這個類提供了支持異步寫大數據流不引起高內存消耗。
這個關鍵是 interface ChunkedInput,實現如下:
| 名稱 | 描述 |
| --- | --- |
| ChunkedFile | 當你使用平臺不支持 zero-copy 或者你需要轉換數據,從文件中一塊一塊的獲取數據 |
| ChunkedNioFile | 與 ChunkedFile 類似,處理使用了NIOFileChannel |
| ChunkedStream | 從 InputStream 中一塊一塊的轉移內容 |
| ChunkedNioStream | 從 ReadableByteChannel 中一塊一塊的轉移內容 |
清單 8.12 演示了使用 ChunkedStream,實現在實踐中最常用。 所示的類被實例化一個 File 和一個 SslContext。當 initChannel() 被調用來初始化顯示的處理程序鏈的通道。
當通道激活時,WriteStreamHandler 從文件一塊一塊的寫入數據作為ChunkedStream。最后將數據通過 SslHandler 加密后傳播。
Listing 8.12 Transfer file content with FileRegion
~~~
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
pipeline.addLast(new ChunkedWriteHandler());//2
pipeline.addLast(new WriteStreamHandler());//3
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
~~~
1. 添加 SslHandler 到 ChannelPipeline.
2. 添加 ChunkedWriteHandler 用來處理作為 ChunkedInput 傳進的數據
3. 當連接建立時,WriteStreamHandler 開始寫文件的內容
4. 當連接建立時,channelActive() 觸發使用 ChunkedInput 來寫文件的內容 (插圖顯示了 FileInputStream;也可以使用任何 InputStream )
*ChunkedInput*?*所有被要求使用自己的 ChunkedInput 實現,是安裝ChunkedWriteHandler 在管道中*
在本節中,我們討論
* 如何采用zero-copy(零拷貝)功能高效地傳輸文件
* 如何使用 ChunkedWriteHandler 編寫大型數據而避免 OutOfMemoryErrors 錯誤。
在下一節中我們將研究幾種不同方法來序列化 POJO。
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter