為了讓你想象 Transport 如何工作,我會從一個簡單的應用程序開始,這個應用程序什么都不做,只是接受客戶端連接并發送“Hi!”字符串消息到客戶端,發送完了就斷開連接。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#沒有用-netty-實現-io-和-nio)沒有用 Netty 實現 I/O 和 NIO
我們將不用 Netty 實現只用 JDK API 來實現 I/O 和 NIO。下面這個例子,是使用阻塞 IO 實現的例子:
Listing 4.1 Blocking networking without Netty
~~~
public class PlainOioServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); //1
try {
for (;;) {
final Socket clientSocket = socket.accept(); //2
System.out.println("Accepted connection from " + clientSocket);
new Thread(new Runnable() { //3
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4
out.flush();
clientSocket.close(); //5
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start(); //6
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
~~~
1.綁定服務器到指定的端口。
2.接受一個連接。
3.創建一個新的線程來處理連接。
4.將消息發送到連接的客戶端。
5.一旦消息被寫入和刷新時就 關閉連接。
6.啟動線程。
上面的方式可以工作正常,但是這種阻塞模式在大連接數的情況就會有很嚴重的問題,如客戶端連接超時,服務器響應嚴重延遲,性能無法擴展。為了解決這種情況,我們可以使用異步網絡處理所有的并發連接,但問題在于 NIO 和 OIO 的 API 是完全不同的,所以一個用OIO開發的網絡應用程序想要使用NIO重構代碼幾乎是重新開發。
下面代碼是使用 NIO 實現的例子:
Listing 4.2 Asynchronous networking without Netty
~~~
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); //1
Selector selector = Selector.open(); //2
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
selector.select(); //4
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys(); //5
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) { //6
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate()); //7
System.out.println(
"Accepted connection from " + client);
}
if (key.isWritable()) { //8
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { //9
break;
}
}
client.close(); //10
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// 在關閉時忽略
}
}
}
}
}
}
~~~
1.綁定服務器到制定端口
2.打開 selector 處理 channel
3.注冊 ServerSocket 到 ServerSocket ,并指定這是專門意接受 連接。
4.等待新的事件來處理。這將阻塞,直到一個事件是傳入。
5.從收到的所有事件中 獲取 SelectionKey 實例。
6.檢查該事件是一個新的連接準備好接受。
7.接受客戶端,并用 selector 進行注冊。
8.檢查 socket 是否準備好寫數據。
9.將數據寫入到所連接的客戶端。如果網絡飽和,連接是可寫的,那么這個循環將寫入數據,直到該緩沖區是空的。
10.關閉連接。
如你所見,即使它們實現的功能是一樣,但是代碼完全不同。下面我們將用Netty 來實現相同的功能。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#采用-netty-實現-io-和-nio)采用 Netty 實現 I/O 和 NIO
下面代碼是使用Netty作為網絡框架編寫的一個阻塞 IO 例子:
Listing 4.3 Blocking networking with Netty
~~~
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(group) //2
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {//3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
~~~
1.創建一個 ServerBootstrap
2.使用 OioEventLoopGroup 允許阻塞模式(Old-IO)
3.指定 ChannelInitializer 將給每個接受的連接調用
4.添加的 ChannelHandler 攔截事件,并允許他們作出反應
5.寫信息到客戶端,并添加 ChannelFutureListener 當一旦消息寫入就關閉連接
6.綁定服務器來接受連接
7.釋放所有資源
下面代碼是使用 Netty NIO 實現。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/CORE%20FUNCTIONS/Case%20study%20transport%20migration.md#netty-nio-版本)Netty NIO 版本
下面是 Netty NIO 的代碼,只是改變了一行代碼,就從 OIO 傳輸 切換到了 NIO。
Listing 4.4 Asynchronous networking with Netty
~~~
public class NettyNioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { //3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()) //5
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
~~~
1.創建一個 ServerBootstrap
2.使用 OioEventLoopGroup 允許阻塞模式(Old-IO)
3.指定 ChannelInitializer 將給每個接受的連接調用
4.添加的 ChannelInboundHandlerAdapter() 接收事件并進行處理
5.寫信息到客戶端,并添加 ChannelFutureListener 當一旦消息寫入就關閉連接
6.綁定服務器來接受連接
7.釋放所有資源
因為 Netty 使用相同的 API 來實現每個傳輸,它并不關心你使用什么來實現。Netty 通過操作接口Channel 、ChannelPipeline 和 ChannelHandler來實現。
現在你了解到了用 基于 Netty 傳輸的好處。下面就來看下傳輸的 API.
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter