Netty 的是一個復雜和先進的框架,但它不虛幻。當我們請求設置一些 key 為給定值,我們現在知道,Request 類的一個實例被創建來代表這個請求。但 Netty 并不知道 Request 對象是如何轉成 Memcached 所期望的。Memcached 所期望的是字節序列;忽略使用的協議,數據在網絡上傳輸永遠是字節序列。
將 Request 對象轉為 Memcached 所需的字節序列,Netty 需要用 MemcachedRequest 來編碼成另外一種格式。
Netty 提供了一個抽象類稱為MessageToByteEncoder。它提供了一個抽象方法,將一條消息(在本例中我們 MemcachedRequest 對象)轉為字節。你顯示什么信息實現通過使用 Java 泛型可以處理;例如 , MessageToByteEncoder 說這個編碼器要編碼的對象類型是 MemcachedRequest
*MessageToByteEncoder 和 Java 泛型*
*使用 MessageToByteEncoder 可以綁定特定的參數類型。如果你有多個不同的消息類型,在相同的編碼器里,也可以使用MessageToByteEncoder,注意檢查消息示例的類型即可*
*
這也適用于解碼器,除了解碼器將一系列字節轉換回一個對象。 這個 Netty 的提供了 ByteToMessageDecoder 類,而不是提供一個編碼方法用來實現解碼。在接下來的兩個部分你看看如何實現一個 Memcached 解碼器和編碼器。在你做之前,然而,它的重要的意識到在使用 Netty 時,你不總是需要提供自己的編碼器和解碼器。只是現在因為沒有 Netty 這樣對 Memcached 的內置支持。
*編碼器和解碼器*
*記住,編碼器處理出站和譯碼器處理入站。這基本上意味著編碼器將編碼數據,寫入遠端。譯碼器將從遠端讀取處理數據。重要的是要記住,出站和入站是兩個不同的方向。*
請注意,我們的編碼器和譯碼器不檢查任何值最大大小保持實現簡單。在實際實現中你最有可能想放入一些驗證檢查,使用 EncoderException 或DecoderException(或一個子類)如果檢測到違反協議。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Netty%20encoders%20and%20decoders.md#實現-memcached-編碼器)實現 Memcached 編碼器
本節我們將簡要介紹編碼器的實現。正如我們提到的,編碼器負責編碼消息為一系列字節。這些字節可以通過網絡發送到遠端。為了發送請求,我們首先創建 MemcachedRequest 類,稍后編碼器實現會編碼為一系列字節。下面的清單顯示了我們的 MemcachedRequest 類
Listing 14.1 Implementation of a Memcached request
~~~
public class MemcachedRequest { //1
private static final Random rand = new Random();
private final int magic = 0x80;//fixed so hard coded
private final byte opCode; //the operation e.g. set or get
private final String key; //the key to delete, get or set
private final int flags = 0xdeadbeef; //random
private final int expires; //0 = item never expires
private final String body; //if opCode is set, the value
private final int id = rand.nextInt(); //Opaque
private final long cas = 0; //data version check...not used
private final boolean hasExtras; //not all ops have extras
public MemcachedRequest(byte opcode, String key, String value) {
this.opCode = opcode;
this.key = key;
this.body = value == null ? "" : value;
this.expires = 0;
//only set command has extras in our example
hasExtras = opcode == Opcode.SET;
}
public MemcachedRequest(byte opCode, String key) {
this(opCode, key, null);
}
public int magic() { //2
return magic;
}
public int opCode() { //3
return opCode;
}
public String key() { //4
return key;
}
public int flags() { //5
return flags;
}
public int expires() { //6
return expires;
}
public String body() { //7
return body;
}
public int id() { //8
return id;
}
public long cas() { //9
return cas;
}
public boolean hasExtras() { //10
return hasExtras;
}
}
~~~
1. 這個類將會發送請求到 Memcached server
2. 幻數,它可以用來標記文件或者協議的格式
3. opCode,反應了響應的操作已經創建了
4. 執行操作的 key
5. 使用的額外的 flag
6. 表明到期時間
7. body
8. 請求的 id。這個id將在響應中回顯。
9. compare-and-check 的值
10. 如果有額外的使用,將返回 true
你如果想實現 Memcached 的其余部分協議,你只需要將 client.op*(op * 任何新的操作添加)轉換為其中一個方法請求。我們需要兩個更多的支持類,在下一個清單所示
Listing 14.2 Possible Memcached operation codes and response statuses
~~~
public class Status {
public static final short NO_ERROR = 0x0000;
public static final short KEY_NOT_FOUND = 0x0001;
public static final short KEY_EXISTS = 0x0002;
public static final short VALUE_TOO_LARGE = 0x0003;
public static final short INVALID_ARGUMENTS = 0x0004;
public static final short ITEM_NOT_STORED = 0x0005;
public static final short INC_DEC_NON_NUM_VAL = 0x0006;
}
public class Opcode {
public static final byte GET = 0x00;
public static final byte SET = 0x01;
public static final byte DELETE = 0x04;
}
~~~
一個 Opcode 告訴 Memcached 要執行哪些操作。每個操作都由一個字節表示。同樣的,當 Memcached 響應一個請求,響應頭中包含兩個字節代表響應狀態。狀態和 Opcode 類表示這些 Memcached 的構造。這些操作碼可以使用當你構建一個新的 MemcachedRequest 指定哪個行動應該由它引發的。
但現在可以集中精力在編碼器上:
Listing 14.3 MemcachedRequestEncoder implementation
~~~
public class MemcachedRequestEncoder extends
MessageToByteEncoder<MemcachedRequest> { //1
@Override
protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg,
ByteBuf out) throws Exception { //2
byte[] key = msg.key().getBytes(CharsetUtil.UTF_8);
byte[] body = msg.body().getBytes(CharsetUtil.UTF_8);
//total size of the body = key size + content size + extras size //3
int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0);
//write magic byte //4
out.writeByte(msg.magic());
//write opcode byte //5
out.writeByte(msg.opCode());
//write key length (2 byte) //6
out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7
//write extras length (1 byte)
int extraSize = msg.hasExtras() ? 0x08 : 0x0;
out.writeByte(extraSize);
//byte is the data type, not currently implemented in Memcached but required //8
out.writeByte(0);
//next two bytes are reserved, not currently implemented but are required //9
out.writeShort(0);
//write total body length ( 4 bytes - 32 bit int) //10
out.writeInt(bodySize);
//write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11
out.writeInt(msg.id());
//write CAS ( 8 bytes)
out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12
if (msg.hasExtras()) {
//write extras (flags and expiry, 4 bytes each) - 8 bytes total //13
out.writeInt(msg.flags());
out.writeInt(msg.expires());
}
//write key //14
out.writeBytes(key);
//write value //15
out.writeBytes(body);
}
}
~~~
1. 該類是負責編碼 MemachedRequest 為一系列字節
2. 轉換的 key 和實際請求的 body 到字節數組
3. 計算 body 大小
4. 寫幻數到 ByteBuf 字節
5. 寫 opCode 作為字節
6. 寫 key 長度z作為 short
7. 編寫額外的長度作為字節
8. 寫數據類型,這總是0,因為目前不是在 Memcached,但可用于使用 后來的版本
9. 為保留字節寫為 short ,后面的 Memcached 版本可能使用
10. 寫 body 的大小作為 long
11. 寫 opaque 作為 int
12. 寫 cas 作為 long。這個是頭文件的最后部分,在 body 的開始
13. 編寫額外的 flag 和到期時間為 int
14. 寫 key
15. 這個請求完成后 寫 body。
總結,編碼器 使用 Netty 的 ByteBuf 處理請求,編碼 MemcachedRequest 成一套正確排序的字節。詳細步驟為:
* 寫幻數字節。
* 寫 opcode 字節。
* 寫 key 長度(2字節)。
* 寫額外的長度(1字節)。
* 寫數據類型(1字節)。
* 為保留字節寫 null 字節(2字節)。
* 寫 body 長度(4字節- 32位整數)。
* 寫 opaque(4個字節,一個32位整數在響應中返回)。
* 寫 CAS(8個字節)。
* 寫 額外的(flag 和 到期,4字節)= 8個字節
* 寫 key
* 寫 值
無論你放入什么到輸出緩沖區( 調用 ByteBuf) Netty 的將向服務器發送被寫入請求。下一節將展示如何進行反向通過解碼器工作。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Netty%20encoders%20and%20decoders.md#實現-memcached-編碼器-1)實現 Memcached 編碼器
將 MemcachedRequest 對象轉為 字節序列,Memcached 僅需將字節轉到響應對象返回即可。
先見一個 POJO:
Listing 14.7 Implementation of a MemcachedResponse
~~~
public class MemcachedResponse { //1
private final byte magic;
private final byte opCode;
private byte dataType;
private final short status;
private final int id;
private final long cas;
private final int flags;
private final int expires;
private final String key;
private final String data;
public MemcachedResponse(byte magic, byte opCode,
byte dataType, short status,
int id, long cas,
int flags, int expires, String key, String data) {
this.magic = magic;
this.opCode = opCode;
this.dataType = dataType;
this.status = status;
this.id = id;
this.cas = cas;
this.flags = flags;
this.expires = expires;
this.key = key;
this.data = data;
}
public byte magic() { //2
return magic;
}
public byte opCode() { //3
return opCode;
}
public byte dataType() { //4
return dataType;
}
public short status() { //5
return status;
}
public int id() { //6
return id;
}
public long cas() { //7
return cas;
}
public int flags() { //8
return flags;
}
public int expires() { //9
return expires;
}
public String key() { //10
return key;
}
public String data() { //11
return data;
}
}
~~~
1. 該類,代表從 Memcached 服務器返回的響應
2. 幻數
3. opCode,這反映了創建操作的響應
4. 數據類型,這表明這個是基于二進制還是文本
5. 響應的狀態,這表明如果請求是成功的
6. 惟一的 id
7. compare-and-set 值
8. 使用額外的 flag
9. 表示該值存儲的一個有效期
10. 響應創建的 key
11. 實際數據
下面為 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基類,用于將 字節序列轉為 MemcachedResponse
Listing 14.4 MemcachedResponseDecoder class
~~~
public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1
private enum State { //2
Header,
Body
}
private State state = State.Header;
private int totalBodySize;
private byte magic;
private byte opCode;
private short keyLength;
private byte extraLength;
private short status;
private int id;
private long cas;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
switch (state) { //3
case Header:
if (in.readableBytes() < 24) {
return;//response header is 24 bytes //4
}
magic = in.readByte(); //5
opCode = in.readByte();
keyLength = in.readShort();
extraLength = in.readByte();
in.skipBytes(1);
status = in.readShort();
totalBodySize = in.readInt();
id = in.readInt(); //referred to in the protocol spec as opaque
cas = in.readLong();
state = State.Body;
case Body:
if (in.readableBytes() < totalBodySize) {
return; //until we have the entire payload return //6
}
int flags = 0, expires = 0;
int actualBodySize = totalBodySize;
if (extraLength > 0) { //7
flags = in.readInt();
actualBodySize -= 4;
}
if (extraLength > 4) { //8
expires = in.readInt();
actualBodySize -= 4;
}
String key = "";
if (keyLength > 0) { //9
ByteBuf keyBytes = in.readBytes(keyLength);
key = keyBytes.toString(CharsetUtil.UTF_8);
actualBodySize -= keyLength;
}
ByteBuf body = in.readBytes(actualBodySize); //10
String data = body.toString(CharsetUtil.UTF_8);
out.add(new MemcachedResponse( //1
magic,
opCode,
status,
id,
cas,
flags,
expires,
key,
data
));
state = State.Header;
}
}
}
~~~
1. 類負責創建的 MemcachedResponse 讀取字節
2. 代表當前解析狀態,這意味著我們需要解析的頭或 body
3. 根據解析狀態切換
4. 如果不是至少24個字節是可讀的,它不可能讀整個頭部,所以返回這里,等待再通知一次數據準備閱讀
5. 閱讀所有頭的字段
6. 檢查是否足夠的數據是可讀用來讀取完整的響應的 body。長度是從頭讀取
7. 檢查如果有任何額外的 flag 用于讀,如果是這樣做
8. 檢查如果響應包含一個 expire 字段,有就讀它
9. 檢查響應是否包含一個 key ,有就讀它
10. 讀實際的 body 的 payload
11. 從前面讀取字段和數據構造一個新的 MemachedResponse
所以在實現發生了什么事?我們知道一個 Memcached 響應有24位頭;我們不知道是否所有數據,響應將被包含在輸入 ByteBuf ,當解碼方法調用時。這是因為底層網絡堆棧可能將數據分解成塊。所以確保我們只解碼當我們有足夠的數據,這段代碼檢查是否可用可讀的字節的數量至少是24。一旦我們有24個字節,我們可以確定整個消息有多大,因為這個信息包含在24位頭。
當我們解碼整個消息,我們創建一個 MemcachedResponse 并將其添加到輸出列表。任何對象添加到該列表將被轉發到下一個ChannelInboundHandler 在 ChannelPipeline,因此允許處理。
*
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter