WebSocket 使用一種被稱作“[Upgrade handshake](https://developer.mozilla.org/en-US/docs/HTTP/Protocol_upgrade_mechanism)(升級握手)”的機制將標準的 HTTP 或HTTPS 協議轉為 WebSocket。因此,使用 WebSocket 的應用程序將始終以 HTTP/S 開始,然后進行升級。這種升級發生在什么時候取決于具體的應用;可以在應用啟動的時候,或者當一個特定的 URL 被請求的時候。
在我們的應用中,僅當 URL 請求以“/ws”結束時,我們才升級協議為WebSocket。否則,服務器將使用基本的 HTTP/S。一旦連接升級,之后的數據傳輸都將使用 WebSocket 。
下面看下服務器的邏輯圖
Figure 11.2 Server logic
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2011.2%20Server%20logic.jpg)
#1客戶端/用戶連接到服務器并加入聊天
#2 HTTP 請求頁面或 WebSocket 升級握手
#3服務器處理所有客戶端/用戶
#4響應 URI “/”的請求,轉到 index.html
#5如果訪問的是 URI“/ws” ,處理 WebSocket 升級握手
#6升級握手完成后 ,通過 WebSocket 發送聊天消息
### [](https://github.com/waylau/essential-netty-in-action/blob/master/NETTY%20BY%20EXAMPLE/Adding%20WebSockets%20support.md#處理-http-請求)處理 HTTP 請求
本節我們將實現此應用中用于處理 HTTP 請求的組件,這個組件托管著可供客戶端訪問的聊天室頁面,并且顯示客戶端發送的消息。
下面就是這個 HttpRequestHandler 的代碼,它是一個用來處理 FullHttpRequest 消息的 ChannelInboundHandler 的實現類。注意看它是怎么實現忽略符合 "/ws" 格式的 URI 請求的。
Listing 11.1 HTTPRequestHandler
~~~
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain()); //2
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx); //3
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) { //5
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); //6
if (ctx.pipeline().get(SslHandler.class) == null) { //7
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE); //9
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
~~~
1.擴展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息
2.如果請求是一次升級了的 WebSocket 請求,則遞增引用計數器(retain)并且將它傳遞給在 ChannelPipeline 中的下個 ChannelInboundHandler
3.處理符合 HTTP 1.1的 "100 Continue" 請求
4.讀取 index.html
5.判斷 keepalive 是否在請求頭里面
6.寫 HttpResponse 到客戶端
7.寫 index.html 到客戶端,根據 ChannelPipeline 中是否有 SslHandler 來決定使用 DefaultFileRegion 還是 ChunkedNioFile
8.寫并刷新 LastHttpContent 到客戶端,標記響應完成
9.如果 請求頭中不包含 keepalive,當寫完成時,關閉 Channel
HttpRequestHandler 做了下面幾件事,
* 如果該 HTTP 請求被發送到URI “/ws”,則調用 FullHttpRequest 上的 retain(),并通過調用 fireChannelRead(msg) 轉發到下一個 ChannelInboundHandler。retain() 的調用是必要的,因為 channelRead() 完成后,它會調用 FullHttpRequest 上的 release() 來釋放其資源。 (請參考我們先前在第6章中關于 SimpleChannelInboundHandler 的討論)
* 如果客戶端發送的 HTTP 1.1 頭是“Expect: 100-continue” ,則發送“100 Continue”的響應。
* 在 頭被設置后,寫一個 HttpResponse 返回給客戶端。注意,這不是 FullHttpResponse,這只是響應的第一部分。另外,這里我們也不使用 writeAndFlush(), 這個是在留在最后完成。
* 如果傳輸過程既沒有要求加密也沒有要求壓縮,那么把 index.html 的內容存儲在一個 DefaultFileRegion 里就可以達到最好的效率。這將利用零拷貝來執行傳輸。出于這個原因,我們要檢查 ChannelPipeline 中是否有一個 SslHandler。如果是的話,我們就使用 ChunkedNioFile。
* 寫 LastHttpContent 來標記響應的結束,并終止它
* 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對象的最后寫入,并關閉連接。注意,這里我們調用 writeAndFlush() 來刷新所有以前寫的信息。
這里展示了應用程序的第一部分,用來處理純的 HTTP 請求和響應。接下來我們將處理 WebSocket 的 frame(幀),用來發送聊天消息。
*WebSocket frame*
*WebSockets 在“幀”里面來發送數據,其中每一個都代表了一個消息的一部分。一個完整的消息可以利用了多個幀。*
### [](https://github.com/waylau/essential-netty-in-action/blob/master/NETTY%20BY%20EXAMPLE/Adding%20WebSockets%20support.md#處理-websocket-frame)處理 WebSocket frame
WebSocket "Request for Comments" (RFC) 定義了六種不同的 frame; Netty 給他們每個都提供了一個 POJO 實現 ,見下表:
Table 11.1 WebSocketFrame types
| 名稱 | 描述 |
| --- | --- |
| BinaryWebSocketFrame | contains binary data |
| TextWebSocketFrame | contains text data |
| ContinuationWebSocketFrame | contains text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame |
| CloseWebSocketFrame | represents a CLOSE request and contains close status code and a phrase |
| PingWebSocketFrame | requests the transmission of a PongWebSocketFrame |
| PongWebSocketFrame | sent as a response to a PingWebSocketFrame |
我們的程序只需要使用下面4個幀類型:
* CloseWebSocketFrame
* PingWebSocketFrame
* PongWebSocketFrame
* TextWebSocketFrame
在這里我們只需要處理 TextWebSocketFrame,其他的會由 WebSocketServerProtocolHandler 自動處理。
下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時也將跟蹤在 ChannelGroup 中所有活動的 WebSocket 連接
Listing 11.2 Handles Text frames
~~~
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //1
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //2
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class); //3
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));//4
group.add(ctx.channel()); //5
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
group.writeAndFlush(msg.retain()); //6
}
}
~~~
1.擴展 SimpleChannelInboundHandler 用于處理 TextWebSocketFrame 信息
2.覆寫userEventTriggered() 方法來處理自定義事件
3.如果接收的事件表明握手成功,就從 ChannelPipeline 中刪除HttpRequestHandler ,因為接下來不會接受 HTTP 消息了
4.寫一條消息給所有的已連接 WebSocket 客戶端,通知它們建立了一個新的 Channel 連接
5.添加新連接的 WebSocket Channel 到 ChannelGroup 中,這樣它就能收到所有的信息
6.保留收到的消息,并通過 writeAndFlush() 傳遞給所有連接的客戶端。
上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:
* 當WebSocket 與新客戶端已成功握手完成,通過寫入信息到 ChannelGroup 中的 Channel 來通知所有連接的客戶端,然后添加新 Channel 到 ChannelGroup
* 如果接收到 TextWebSocketFrame,調用 retain() ,并將其寫、刷新到 ChannelGroup,使所有連接的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因為當 channelRead0()返回時,TextWebSocketFrame 的引用計數將遞減。由于所有操作都是異步的,writeAndFlush() 可能會在以后完成,我們不希望它訪問無效的引用。
由于 Netty 在其內部處理了其余大部分功能,唯一剩下的需要我們去做的就是為每一個新創建的 Channel 初始化 ChannelPipeline 。要完成這個,我們需要一個ChannelInitializer
### [](https://github.com/waylau/essential-netty-in-action/blob/master/NETTY%20BY%20EXAMPLE/Adding%20WebSockets%20support.md#初始化-channelpipeline)初始化 ChannelPipeline
接下來,我們需要安裝我們上面實現的兩個 ChannelHandler 到 ChannelPipeline。為此,我們需要繼承 ChannelInitializer 并且實現 initChannel()。看下面 ChatServerInitializer 的代碼實現
Listing 11.3 Init the ChannelPipeline
~~~
public class ChatServerInitializer extends ChannelInitializer<Channel> { //1
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
protected void initChannel(Channel ch) throws Exception { //2
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
~~~
1.擴展 ChannelInitializer
2.添加 ChannelHandler 到 ChannelPipeline
initChannel() 方法用于設置所有新注冊的 Channel 的ChannelPipeline,安裝所有需要的 ChannelHandler。總結如下:
Table 11.2 ChannelHandlers for the WebSockets Chat server
| ChannelHandler | 職責 |
| --- | --- |
| HttpServerCodec | Decode bytes to HttpRequest, HttpContent, LastHttpContent.Encode HttpRequest, HttpContent, LastHttpContent to bytes. |
| ChunkedWriteHandler | Write the contents of a file. |
| HttpObjectAggregator | This ChannelHandler aggregates an HttpMessage and its following HttpContents into a single FullHttpRequest or FullHttpResponse (depending on whether it is being used to handle requests or responses).With this installed the next ChannelHandler in the pipeline will receive only full HTTP requests. |
| HttpRequestHandler | Handle FullHttpRequests (those not sent to "/ws" URI). |
| WebSocketServerProtocolHandler | As required by the WebSockets specification, handle the WebSocket Upgrade handshake, PingWebSocketFrames,PongWebSocketFrames and CloseWebSocketFrames. |
| TextWebSocketFrameHandler | Handles TextWebSocketFrames and handshake completion events |
該 WebSocketServerProtocolHandler 處理所有規定的 WebSocket 幀類型和升級握手本身。如果握手成功所需的 ChannelHandler 被添加到管道,而那些不再需要的則被去除。管道升級之前的狀態如下圖。這代表了 ChannelPipeline 剛剛經過 ChatServerInitializer 初始化。
Figure 11.3 ChannelPipeline before WebSockets Upgrade
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2011.3%20ChannelPipeline%20before%20WebSockets%20Upgrade.jpg)
握手升級成功后 WebSocketServerProtocolHandler 替換HttpRequestDecoder 為 WebSocketFrameDecoder,HttpResponseEncoder 為WebSocketFrameEncoder。 為了最大化性能,WebSocket 連接不需要的 ChannelHandler 將會被移除。其中就包括了 HttpObjectAggregator 和 HttpRequestHandler
下圖,展示了 ChannelPipeline 經過這個操作完成后的情況。注意 Netty 目前支持四個版本 WebSocket 協議,每個通過其自身的方式實現類。選擇正確的版本WebSocketFrameDecoder 和 WebSocketFrameEncoder 是自動進行的,這取決于在客戶端(在這里指瀏覽器)的支持(在這個例子中,我們假設使用版本是 13 的 WebSocket 協議,從而圖中顯示的是 WebSocketFrameDecoder13 和 WebSocketFrameEncoder13)。
Figure 11.4 ChannelPipeline after WebSockets Upgrade
[](https://github.com/waylau/essential-netty-in-action/blob/master/images/Figure%2011.4%20ChannelPipeline%20after%20WebSockets%20Upgrade.jpg)
### [](https://github.com/waylau/essential-netty-in-action/blob/master/NETTY%20BY%20EXAMPLE/Adding%20WebSockets%20support.md#引導)引導
最后一步是 引導服務器,設置 ChannelInitializer
~~~
public class ChatServer {
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);//1
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap(); //2
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) { //3
return new ChatServerInitializer(group);
}
public void destroy() { //4
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception{
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
~~~
1.創建 DefaultChannelGroup 用來 保存所有連接的的 WebSocket channel
2.引導 服務器
3.創建 ChannelInitializer
4.處理服務器關閉,包括釋放所有資源
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter