協議和編解碼是一個網絡應用程序的核心問題之一,客戶端和服務器通過約定的協議來傳輸消息(數據),通過特定的格式來編解碼字節流,並轉化成業務消息,提供給上層框架調用。
Thrift的協議比較簡單,它把協議和編解碼整合在了一起。抽象類TProtocol定義了協議和編解碼的頂層接口。個人感覺採用抽象類而不是接口的方式來定義頂層接口並不好:TProtocol直接持有一個TTransport傳輸對象(protected字段trans_),而不是只提供一個類似getTransport()的抽象方法讓實現類自行決定,導致抽象類的擴展性比接口差。
TProtocol主要做了兩件事情:
1. 關聯TTransport對象
2. 定義一系列讀寫消息的編解碼接口,包括兩類:一類針對複雜數據結構,比如readMessageBegin, readMessageEnd, writeMessageBegin, writeMessageEnd;另一類針對基本數據類型,比如readI32, writeI32, readString, writeString。
~~~
public abstract class TProtocol {
/**
* Transport
*/
protected TTransport trans_;
public abstract void writeMessageBegin(TMessage message) throws TException;
public abstract void writeMessageEnd() throws TException;
public abstract void writeStructBegin(TStruct struct) throws TException;
public abstract void writeStructEnd() throws TException;
public abstract void writeFieldBegin(TField field) throws TException;
public abstract void writeFieldEnd() throws TException;
public abstract void writeFieldStop() throws TException;
public abstract void writeMapBegin(TMap map) throws TException;
public abstract void writeMapEnd() throws TException;
public abstract void writeListBegin(TList list) throws TException;
public abstract void writeListEnd() throws TException;
public abstract void writeSetBegin(TSet set) throws TException;
public abstract void writeSetEnd() throws TException;
public abstract void writeBool(boolean b) throws TException;
public abstract void writeByte(byte b) throws TException;
public abstract void writeI16(short i16) throws TException;
public abstract void writeI32(int i32) throws TException;
public abstract void writeI64(long i64) throws TException;
public abstract void writeDouble(double dub) throws TException;
public abstract void writeString(String str) throws TException;
public abstract void writeBinary(ByteBuffer buf) throws TException;
/**
* Reading methods.
*/
public abstract TMessage readMessageBegin() throws TException;
public abstract void readMessageEnd() throws TException;
public abstract TStruct readStructBegin() throws TException;
public abstract void readStructEnd() throws TException;
public abstract TField readFieldBegin() throws TException;
public abstract void readFieldEnd() throws TException;
public abstract TMap readMapBegin() throws TException;
public abstract void readMapEnd() throws TException;
public abstract TList readListBegin() throws TException;
public abstract void readListEnd() throws TException;
public abstract TSet readSetBegin() throws TException;
public abstract void readSetEnd() throws TException;
public abstract boolean readBool() throws TException;
public abstract byte readByte() throws TException;
public abstract short readI16() throws TException;
public abstract int readI32() throws TException;
public abstract long readI64() throws TException;
public abstract double readDouble() throws TException;
public abstract String readString() throws TException;
public abstract ByteBuffer readBinary() throws TException;
/**
* Reset any internal state back to a blank slate. This method only needs to
* be implemented for stateful protocols.
*/
public void reset() {}
/**
* Scheme accessor
*/
public Class<? extends IScheme> getScheme() {
?? return StandardScheme.class;
}
}
~~~
所謂協議,就是客戶端和服務器端約定傳輸什麼數據、如何解析傳輸的數據。對於一個RPC調用的協議來說,要傳輸的數據主要有:
調用方
1. 方法的名稱,包括類的名稱和方法的名稱
2. 方法的參數,包括類型和參數值
3. 一些附加的數據,比如附件、超時時間、自定義的控制信息等等
返回方
1. 調用的返回碼
2. 返回值
3. 異常信息
從TProtocol的定義我們可以看出Thrift的協議約定如下事情:
1. 先調用writeMessageBegin表示開始傳輸消息,寫消息頭。Message裡面定義了方法名、調用的類型、版本號和消息序列號seqId
2. 接下來是寫方法的參數,實際就是寫消息體。如果參數是一個類(結構體),就調用writeStructBegin
3. 接下來寫字段,調用writeFieldBegin,這個方法會寫接下來的字段的數據類型和順序號。這個順序號是Thrift對要傳輸的字段的一個編碼,從1開始
4. 如果是一個集合就writeListBegin/writeMapBegin,如果是一個基本數據類型,比如int, 就直接writeI32
5. 每個複雜數據類型寫完都調用對應的writeXXXEnd,最後以writeMessageEnd結束
6. 讀消息時根據數據類型讀取相應的長度
大部分writeXXX都採用「頭部+內容」的方式(例如字符串先寫長度再寫內容)。我們來看TBinaryProtocol的實現。
1. writeMessageBegin方法寫了消息頭:在strictWrite_模式下,依次是4字節的版本號和類型信息、字符串類型的方法名、4字節的序列號seqId;非嚴格模式下則依次是方法名、1字節的類型和4字節的seqId
2. writeFieldBegin,寫了1個字節的字段數據類型,和2個字節字段的順序號
3. writeI32,寫了4個字節的字節數組
4. writeString,先寫4字節消息頭表示字符串長度,再寫字符串字節
5. writeBinary,先寫4字節消息頭表示字節數組長度,再寫字節數組內容
6. readMessageBegin時,先讀4字節版本和類型信息,再讀字符串,再讀4字節序列號
7. readFieldBegin,先讀1個字節的字段數據類型,再讀2個字節的字段順序號(如果類型是STOP,表示結構體結束,不再讀順序號)
8. readString時,先讀4字節字符串長度,再讀字符串內容。**字符串統一採用UTF-8編碼**
~~~
/**
 * Writes the message header.
 * Strict mode: a 4-byte word combining VERSION_1 with the message type, the method name,
 * then the 4-byte seqid. Non-strict mode: the method name, a 1-byte type, then the seqid.
 */
public void writeMessageBegin(TMessage message) throws TException {
  if (!strictWrite_) {
    // Non-strict (older) header: name first, then the one-byte message type.
    writeString(message.name);
    writeByte(message.type);
    writeI32(message.seqid);
    return;
  }
  // Version and message type share one 32-bit word.
  writeI32(VERSION_1 | message.type);
  writeString(message.name);
  writeI32(message.seqid);
}
public void writeFieldBegin(TField field) throws TException {
?? writeByte(field.type);
?? writeI16(field.id);
}
private byte[] i32out = new byte[4];
public void writeI32(int i32) throws TException {
?? i32out[0] = (byte)(0xff & (i32 >> 24));
?? i32out[1] = (byte)(0xff & (i32 >> 16));
?? i32out[2] = (byte)(0xff & (i32 >> 8));
?? i32out[3] = (byte)(0xff & (i32));
?? trans_.write(i32out, 0, 4);
}
public void writeString(String str) throws TException {
?? try {
???? byte[] dat = str.getBytes("UTF-8");
???? writeI32(dat.length);
???? trans_.write(dat, 0, dat.length);
?? } catch (UnsupportedEncodingException uex) {
???? throw new TException("JVM DOES NOT SUPPORT UTF-8");
?? }
}
public void writeBinary(ByteBuffer bin) throws TException {
?? int length = bin.limit() - bin.position();
?? writeI32(length);
?? trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
}
public TMessage readMessageBegin() throws TException {
?? int size = readI32();
?? if (size < 0) {
???? int version = size & VERSION_MASK;
???? if (version != VERSION_1) {
?????? throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
???? }
???? return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
?? } else {
???? if (strictRead_) {
?????? throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
???? }
???? return new TMessage(readStringBody(size), readByte(), readI32());
?? }
}
public TField readFieldBegin() throws TException {
?? byte type = readByte();
?? short id = type == TType.STOP ? 0 : readI16();
?? return new TField("", type, id);
}
public String readString() throws TException {
?? int size = readI32();
?? if (trans_.getBytesRemainingInBuffer() >= size) {
???? try {
?????? String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
?????? trans_.consumeBuffer(size);
?????? return s;
???? } catch (UnsupportedEncodingException e) {
?????? throw new TException("JVM DOES NOT SUPPORT UTF-8");
???? }
?? }
?? return readStringBody(size);
}
~~~
TProtocol定義了基本的協議信息,包括傳輸什麼數據,以及如何解析傳輸的數據的基本方法。

還存在一個問題:服務器端如何知道客戶端發送過來的數據是怎麼組合的,比如第一個字段是字符串類型,第二個字段是int。這個信息是由IDL編譯生成的代碼提供的。Thrift生成的代碼提供了讀寫參數的方法,這兩個方法是一一對應的,包括字段的序號、類型等等。客戶端使用寫參數的方法,服務器端使用讀參數的方法。
關於IDL生成的客戶端代碼會在後面的文章具體描述。下面簡單看一下自動生成的代碼:
1. 方法的調用從writeMessageBegin開始,發送了消息頭信息
2. 寫方法的參數,也就是寫消息體。方法參數由一個統一的接口TBase描述,提供了read和write的統一接口。自動生成的代碼提供了read, write方法參數的具體實現
3. 寫完參數後調用writeMessageEnd結束
~~~
/**
 * Serializes one {@code handle} call: a CALL message header (sequence id 0), the call
 * arguments packed into a handle_args struct, and the message terminator.
 */
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
  org.apache.thrift.protocol.TMessage header =
      new org.apache.thrift.protocol.TMessage("handle", org.apache.thrift.protocol.TMessageType.CALL, 0);
  prot.writeMessageBegin(header);

  // Message body: copy every call argument into the generated struct, then let it write itself.
  handle_args request = new handle_args();
  request.setIdentity(identity);
  request.setUid(uid);
  request.setSid(sid);
  request.setType(type);
  request.setMessage(message);
  request.setParams(params);
  request.write(prot);

  prot.writeMessageEnd();
}
public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,? Serializable {
public void read(TProtocol iprot) throws TException;
public void write(TProtocol oprot) throws TException;
}
public static class handle_args <strong>implements org.apache.thrift.TBase</strong><handle_args, handle_args._Fields>, java.io.Serializable, Cloneable?? {
?? private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("handle_args");
?? private static final org.apache.thrift.protocol.TField IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("identity", org.apache.thrift.protocol.TType.STRING, (short)1);
?? private static final org.apache.thrift.protocol.TField UID_FIELD_DESC = new org.apache.thrift.protocol.TField("uid", org.apache.thrift.protocol.TType.I64, (short)2);
?? private static final org.apache.thrift.protocol.TField SID_FIELD_DESC = new org.apache.thrift.protocol.TField("sid", org.apache.thrift.protocol.TType.STRING, (short)3);
?? private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)4);
?? private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)5);
?? private static final org.apache.thrift.protocol.TField PARAMS_FIELD_DESC = new org.apache.thrift.protocol.TField("params", org.apache.thrift.protocol.TType.MAP, (short)6);
?? private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
?? static {
???? schemes.put(StandardScheme.class, new handle_argsStandardSchemeFactory());
???? schemes.put(TupleScheme.class, new handle_argsTupleSchemeFactory());
?? }
?? public String identity; // required
?? public long uid; // required
?? public String sid; // required
?? public int type; // required
?? public String message; // required
?? public Map<String,String> params; // required
?? /**The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
?? public enum _Fields implements org.apache.thrift.TFieldIdEnum {
???? IDENTITY((short)1, "identity"),
???? UID((short)2, "uid"),
???? SID((short)3, "sid"),
???? TYPE((short)4, "type"),
???? MESSAGE((short)5, "message"),
???? PARAMS((short)6, "params");
}
// 自動生成的寫方法參數的方法,按照字段順序寫,給客戶端代碼使用
???? public void write(org.apache.thrift.protocol.TProtocol oprot, handle_args struct) throws org.apache.thrift.TException {
?????? struct.validate();
?????? oprot.writeStructBegin(STRUCT_DESC);
?????? if (struct.identity != null) {
???????? oprot.writeFieldBegin(IDENTITY_FIELD_DESC);
???????? oprot.writeString(struct.identity);
???????? oprot.writeFieldEnd();
?????? }
?????? oprot.writeFieldBegin(UID_FIELD_DESC);
?????? oprot.writeI64(struct.uid);
?????? oprot.writeFieldEnd();
?????? if (struct.sid != null) {
???????? oprot.writeFieldBegin(SID_FIELD_DESC);
???????? oprot.writeString(struct.sid);
???????? oprot.writeFieldEnd();
?????? }
?????? oprot.writeFieldBegin(TYPE_FIELD_DESC);
?????? oprot.writeI32(struct.type);
?????? oprot.writeFieldEnd();
?????? if (struct.message != null) {
???????? oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
???????? oprot.writeString(struct.message);
???????? oprot.writeFieldEnd();
?????? }
}
~~~
下面是自動生成的讀方法參數的方法,根據字段順序號讀取,給服務器端代碼使用:
~~~
public void read(org.apache.thrift.protocol.TProtocol iprot, handle_args struct) throws org.apache.thrift.TException {
?????? org.apache.thrift.protocol.TField schemeField;
?????? iprot.readStructBegin();
?????? while (true)
?????? {
???????? schemeField = iprot.readFieldBegin();
???????? if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
?????????? break;
???????? }
???????? switch (schemeField.id) {
?????????? case 1: // IDENTITY
???????????? if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
?????????????? struct.identity = iprot.readString();
?????????????? struct.setIdentityIsSet(true);
???????????? } else {
?????????????? org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
???????????? }
???????????? break;
?????????? case 2: // UID
???????????? if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
?????????????? struct.uid = iprot.readI64();
?????????????? struct.setUidIsSet(true);
???????????? } else {
?????????????? org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
???????????? }
???????????? break;
?????????? case 3: // SID
???????????? if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
?????????????? struct.sid = iprot.readString();
?????????????? struct.setSidIsSet(true);
???????????? } else {
?????????????? org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
???????????? }
???????????? break;
?????????? case 4: // TYPE
???????????? if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
?????????????? struct.type = iprot.readI32();
?????????????? struct.setTypeIsSet(true);
???????????? } else {
?????????????? org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
???????????? }
???????????? break;
}
~~~