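I recently worked on a distributed call-tracing project that needed to add call-chain context to Thrift-based RPC calls, so that the call chain of each RPC invocation could be reconstructed. This post explains how to extend the Thrift framework so that extra attachment information travels with an RPC call without intruding on business code.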
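The Thrift framework itself offers several extension mechanisms, for example:
1. Extend TProtocol to implement a custom serialization class
2. Extend TTransport to implement a custom stream
3. Decorate the Processor (decorator pattern) to insert custom logic before and after the service implementation method is called
4. When building the Client and Server, pass the custom Protocol, Transport and Processor in as arguments (via the Args objects), so that these custom classes handle the requests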
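The figure below shows the main components involved in a Thrift RPC call. RPC frameworks are much alike and share roughly the same basic structure. The green parts are the extension points: wrapping the Client side adds distributed-cluster features such as service addressing and load balancing, while wrapping the Server side enables server-side configuration management, monitoring and so on.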

In this simplified example, extending only TProtocol and the Processor is enough to add an extra attachment to RPC calls.
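TProtocol represents the serialization process of an RPC call; for more detail, see [Thrift Source Code Analysis (2): Protocols and Encoding/Decoding](http://blog.csdn.net/iter_zc/article/details/39497863). TProtocol splits serialization into several steps:
1. write/read Message: reads/writes the message header, which contains the method name, the sequence number and similar information
2. write/read Struct: the RPC method's arguments/return value are wrapped in a struct, so reading/writing the struct means reading/writing the method's arguments
3. write/read Field: each argument is abstracted as a Field, which mainly carries the field's index (id) and its type
4. write/read Type: reads/writes the concrete data values
TBinaryProtocol, a protocol based on a binary stream, is one of the most commonly used; it implements all of the write/read methods above.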
~~~
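// Excerpt from TBinaryProtocol
public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
        // strict mode: version word (VERSION_1 | message type), then name and seqid
        int version = VERSION_1 | message.type;
        writeI32(version);
        writeString(message.name);
        writeI32(message.seqid);
    } else {
        // non-strict mode: name, message type byte, seqid
        writeString(message.name);
        writeByte(message.type);
        writeI32(message.seqid);
    }
}

public void writeMessageEnd() {}

// Struct begin/end write nothing: fields follow the message header directly
public void writeStructBegin(TStruct struct) {}

public void writeStructEnd() {}

// A field header is just the type byte followed by the i16 field id
public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
}

public void writeFieldEnd() {}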
~~~
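Looking at these TBinaryProtocol methods, you can see that write/read Struct are empty implementations: right after the Message header is written, the Fields are written directly. The code Thrift generates for a service contains all the structural information of each service method. For example, a TField object is created for every argument, the TFields are numbered from 1 upward (following the ids declared in the IDL), and a method is generated that serializes each concrete argument; see [Thrift Source Code Analysis (3): IDL and Generated Code](http://blog.csdn.net/iter_zc/article/details/39522531).
So the byte stream of an RPC call under TBinaryProtocol looks roughly like the figure below: the message header (method name, message type, seqid), then one field per argument (a type byte, the i16 field id, the value), and finally a STOP byte that ends the argument struct.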
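To make the layout concrete, here is a minimal JDK-only sketch (not Thrift code) that hand-encodes a call the way TBinaryProtocol does with strictWrite enabled, following the writeMessageBegin/writeFieldBegin excerpt above. The class BinaryLayoutDemo and the two-argument method are hypothetical; the constants are the values Thrift uses for TBinaryProtocol.VERSION_1, TMessageType.CALL, TType.I32 and TType.STOP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustration only: hand-encodes a call in TBinaryProtocol's strict layout. */
final class BinaryLayoutDemo {
    static final int VERSION_1 = 0x80010000; // TBinaryProtocol.VERSION_1
    static final byte CALL = 1;              // TMessageType.CALL
    static final byte STOP = 0;              // TType.STOP
    static final byte I32 = 8;               // TType.I32

    /** Encodes a call to a hypothetical method(1: i32 a, 2: i32 b). */
    static byte[] encodeCall(String method, int seqid, int a, int b) {
        byte[] name = method.getBytes(StandardCharsets.UTF_8);
        // header: 4 + 4 + name + 4; two i32 fields: 2 * (1 + 2 + 4); STOP: 1
        ByteBuffer out = ByteBuffer.allocate(12 + name.length + 14 + 1); // big-endian by default
        // writeMessageBegin with strictWrite: version|type, name (i32 length + bytes), seqid
        out.putInt(VERSION_1 | CALL);
        out.putInt(name.length).put(name);
        out.putInt(seqid);
        // writeStructBegin writes nothing; each writeFieldBegin writes type byte + i16 id
        out.put(I32).putShort((short) 1).putInt(a);
        out.put(I32).putShort((short) 2).putInt(b);
        // writeFieldStop: a single STOP byte ends the argument struct
        out.put(STOP);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] call = encodeCall("add", 7, 1, 2);
        StringBuilder hex = new StringBuilder();
        for (byte x : call) {
            hex.append(String.format("%02x ", x));
        }
        System.out.println(call.length + " bytes: " + hex.toString().trim());
    }
}
```

Running main prints `30 bytes: 80 01 00 01 00 00 00 03 61 64 64 00 00 00 07 08 00 01 00 00 00 01 08 00 02 00 00 00 02 00`: the 15-byte header, two 7-byte fields and the STOP byte.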

The code Thrift generates to read this byte stream follows roughly this flow:
~~~
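// Simplified flow: in the real code the processor reads the message header,
// and the generated *_args struct's read() method reads the fields
TMessage message = iprot.readMessageBegin();
iprot.readStructBegin();
while (true) {
    TField field = iprot.readFieldBegin();
    if (field.type == TType.STOP) {
        break;
    }
    switch (field.id) {
        case 1:
            // read argument 1 with the matching readXxx() call
            break;
        // ... one case per argument id, up to n
        default:
            // unknown field id: skip its value according to its type
            TProtocolUtil.skip(iprot, field.type);
    }
    iprot.readFieldEnd();
}
iprot.readStructEnd();
iprot.readMessageEnd();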
~~~
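A few things follow from this flow:
1. When reading the byte stream, the generated code dispatches on the TField id and reads each value accordingly
2. Thrift provides a skip mechanism (for unknown fields) and a stop mechanism (for the end of a struct) when parsing Fields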
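The skip mechanism is what makes an extra field harmless. The JDK-only sketch below (not Thrift's generated code) hand-rolls the same loop for a hypothetical method add(1: i32 a, 2: i32 b): it dispatches on the field id, and anything it does not recognize, such as a map field with id 0, is skipped by walking over its bytes in the spirit of TProtocolUtil.skip. The class, the encodeWithAttachment helper and the "traceId" key are illustrative assumptions; the type codes are Thrift's TType values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: mimics the generated read loop for add(1: i32 a, 2: i32 b). */
final class SkipUnknownFieldDemo {
    // Thrift TType codes used in this sketch
    static final byte STOP = 0, I32 = 8, STRING = 11, MAP = 13;

    /** Reads the arguments after a strict message header and returns a + b. */
    static int readAddArgs(ByteBuffer in) {
        in.getInt();                                  // version | message type
        int nameLength = in.getInt();
        in.position(in.position() + nameLength);      // skip the method name
        in.getInt();                                  // seqid
        int a = 0;
        int b = 0;
        while (true) {
            byte type = in.get();
            if (type == STOP) {
                break;                                // end of the argument struct
            }
            short id = in.getShort();
            if (id == 1 && type == I32) {
                a = in.getInt();
            } else if (id == 2 && type == I32) {
                b = in.getInt();
            } else {
                skip(in, type);                       // unknown id (e.g. field 0): skip its value
            }
        }
        return a + b;
    }

    /** Skips one value of the given type (only the types this sketch needs). */
    static void skip(ByteBuffer in, byte type) {
        switch (type) {
            case I32: {
                in.getInt();
                break;
            }
            case STRING: {
                int length = in.getInt();
                in.position(in.position() + length);
                break;
            }
            case MAP: {
                byte keyType = in.get();
                byte valueType = in.get();
                int size = in.getInt();
                for (int i = 0; i < size; i++) {
                    skip(in, keyType);
                    skip(in, valueType);
                }
                break;
            }
            default:
                throw new IllegalArgumentException("type not handled in this sketch: " + type);
        }
    }

    /** Builds a CALL for "add" with an extra map<string,string> field 0 before the arguments. */
    static byte[] encodeWithAttachment(Map<String, String> attachment, int a, int b) {
        ByteBuffer out = ByteBuffer.allocate(1024);   // fixed size: enough for this demo only
        out.putInt(0x80010000 | 1);                   // VERSION_1 | CALL
        putString(out, "add");
        out.putInt(1);                                // seqid
        out.put(MAP).putShort((short) 0);             // field 0 header
        out.put(STRING).put(STRING).putInt(attachment.size());
        for (Map.Entry<String, String> e : attachment.entrySet()) {
            putString(out, e.getKey());
            putString(out, e.getValue());
        }
        out.put(I32).putShort((short) 1).putInt(a);   // field 1
        out.put(I32).putShort((short) 2).putInt(b);   // field 2
        out.put(STOP);
        return Arrays.copyOf(out.array(), out.position());
    }

    static void putString(ByteBuffer out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.putInt(bytes.length).put(bytes);
    }

    public static void main(String[] args) {
        Map<String, String> attachment = new LinkedHashMap<>();
        attachment.put("traceId", "abc");             // hypothetical key and value
        byte[] request = encodeWithAttachment(attachment, 40, 2);
        System.out.println(readAddArgs(ByteBuffer.wrap(request))); // prints 42
    }
}
```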
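We can therefore build on the TField id and add the attachment as follows:
1. Extend TBinaryProtocol to write the data we want to inject into the byte stream under a dedicated field id
2. Before the byte stream is parsed normally, first read out the TField with that dedicated id
3. Reset the byte stream and hand it to the downstream Processor
Steps 2 and 3 are both handled in the decorated Processor.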
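The resulting byte stream is shown below: the attachment map sits as field 0 between the message header and the regular argument fields.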

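First, the implementation of AttachableBinaryProtocol:
1. It defines a private Map field, attachment, to hold key-value attachments
2. It overrides writeMessageBegin: after writing the message header it checks whether there is an attachment, and if so calls writeFieldZero to write it into the byte stream
3. writeFieldZero writes the attachment into the byte stream as field 0, using Thrift's own encoding for the map type
4. readFieldZero reads the map-typed field with id 0 from the byte stream into attachment
5. resetTFramedTransport rewinds the byte stream. With the NIO-based Thrift servers, TFramedTransport is used as the transport by default. TFramedTransport is a buffered transport that stores the bytes it has read in an internal TMemoryInputTransport, and TMemoryInputTransport provides a reset method that moves the stream position back.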
~~~
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TMap;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TType;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TTransport;

public class AttachableBinaryProtocol extends TBinaryProtocol {
    // Key-value attachment, carried as field 0 right after the message header
    private Map<String, String> attachment;

    public AttachableBinaryProtocol(TTransport trans) {
        super(trans);
        attachment = new HashMap<String, String>();
    }

    public AttachableBinaryProtocol(TTransport trans, boolean strictRead,
            boolean strictWrite) {
        super(trans, strictRead, strictWrite);
        attachment = new HashMap<String, String>();
    }

    /**
     * Factory, mirroring TBinaryProtocol.Factory, so that clients and servers
     * can be configured to create AttachableBinaryProtocol instances
     */
    public static class Factory implements TProtocolFactory {
        protected boolean strictRead_ = false;
        protected boolean strictWrite_ = true;
        protected int readLength_;

        public Factory() {
            this(false, true);
        }

        public Factory(boolean strictRead, boolean strictWrite) {
            this(strictRead, strictWrite, 0);
        }

        public Factory(boolean strictRead, boolean strictWrite, int readLength) {
            strictRead_ = strictRead;
            strictWrite_ = strictWrite;
            readLength_ = readLength;
        }

        public TProtocol getProtocol(TTransport trans) {
            AttachableBinaryProtocol proto = new AttachableBinaryProtocol(
                    trans, strictRead_, strictWrite_);
            if (readLength_ != 0) {
                proto.setReadLength(readLength_);
            }
            return proto;
        }
    }

    @Override
    public void writeMessageBegin(TMessage message) throws TException {
        super.writeMessageBegin(message);
        // Only messages that actually carry an attachment get the extra field 0
        if (!attachment.isEmpty()) {
            writeFieldZero();
        }
    }

    // Writes the attachment as field 0 of type map<string, string>
    public void writeFieldZero() throws TException {
        TField RTRACE_ATTACHMENT = new TField("rtraceAttachment", TType.MAP, (short) 0);
        this.writeFieldBegin(RTRACE_ATTACHMENT);
        this.writeMapBegin(new TMap(TType.STRING, TType.STRING, attachment.size()));
        for (Map.Entry<String, String> entry : attachment.entrySet()) {
            this.writeString(entry.getKey());
            this.writeString(entry.getValue());
        }
        this.writeMapEnd();
        this.writeFieldEnd();
    }

    // Call right after readMessageBegin(): reads field 0 if it is present and
    // returns true when a non-empty attachment was found. It consumes bytes
    // either way, so the stream must be reset afterwards.
    public boolean readFieldZero() throws Exception {
        // Drop any attachment left over from an earlier request on this connection
        attachment = new HashMap<String, String>();
        TField schemeField = this.readFieldBegin();
        if (schemeField.id == 0 && schemeField.type == TType.MAP) {
            TMap _map = this.readMapBegin();
            attachment = new HashMap<String, String>(2 * _map.size);
            for (int i = 0; i < _map.size; ++i) {
                String key = this.readString();
                String value = this.readString();
                attachment.put(key, value);
            }
            this.readMapEnd();
        }
        this.readFieldEnd();
        return !attachment.isEmpty();
    }

    public Map<String, String> getAttachment() {
        return attachment;
    }

    /*
     * Rewinds the TFramedTransport's read buffer to the start of the current
     * frame, so the normal Thrift flow is unaffected. Assumes the input
     * transport is a TFramedTransport.
     */
    public void resetTFramedTransport(TProtocol in) {
        try {
            Field readBuffer_ = TFramedTransportFieldsCache.getInstance()
                    .getTFramedTransportReadBuffer();
            Field buf_ = TFramedTransportFieldsCache.getInstance()
                    .getTMemoryInputTransportBuf();
            if (readBuffer_ == null || buf_ == null) {
                return;
            }
            TMemoryInputTransport stream = (TMemoryInputTransport) readBuffer_
                    .get(in.getTransport());
            byte[] buf = (byte[]) (buf_.get(stream));
            stream.reset(buf, 0, buf.length);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Caches the reflected private fields so they are looked up only once
    private static class TFramedTransportFieldsCache {
        // volatile is required for double-checked locking to be safe
        private static volatile TFramedTransportFieldsCache instance;
        private final Field readBuffer_;
        private final Field buf_;
        private final String TFramedTransport_readBuffer_ = "readBuffer_";
        private final String TMemoryInputTransport_buf_ = "buf_";

        private TFramedTransportFieldsCache() throws Exception {
            readBuffer_ = org.apache.thrift.transport.TFramedTransport.class
                    .getDeclaredField(TFramedTransport_readBuffer_);
            readBuffer_.setAccessible(true);
            buf_ = org.apache.thrift.transport.TMemoryInputTransport.class
                    .getDeclaredField(TMemoryInputTransport_buf_);
            buf_.setAccessible(true);
        }

        public static TFramedTransportFieldsCache getInstance()
                throws Exception {
            if (instance == null) {
                synchronized (TFramedTransportFieldsCache.class) {
                    if (instance == null) {
                        instance = new TFramedTransportFieldsCache();
                    }
                }
            }
            return instance;
        }

        public Field getTFramedTransportReadBuffer() {
            return readBuffer_;
        }

        public Field getTMemoryInputTransportBuf() {
            return buf_;
        }
    }
}
~~~
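The writeMessageBegin override and writeFieldZero boil down to a subclass hook: call the parent's writeMessageBegin, then append field 0 only when there is something to send. The JDK-only sketch below isolates that idea with hypothetical BinaryWriter/AttachableWriter classes standing in for TBinaryProtocol and AttachableBinaryProtocol (they are not Thrift APIs). The bytes follow TBinaryProtocol's layout: a MAP (13) field header with id 0, the key and value types STRING (11), an i32 entry count, and length-prefixed UTF-8 strings.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: the writeMessageBegin hook, without Thrift on the classpath. */
final class AttachmentWriterDemo {
    /** Hypothetical stand-in for TBinaryProtocol's write side (big-endian, strict header). */
    static class BinaryWriter {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        void writeByte(int b) { out.write(b); }                    // writes the low 8 bits
        void writeI16(int v) { writeByte(v >>> 8); writeByte(v); }
        void writeI32(int v) { writeI16(v >>> 16); writeI16(v); }

        void writeString(String s) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            writeI32(bytes.length);
            out.write(bytes, 0, bytes.length);
        }

        void writeMessageBegin(String name, byte type, int seqid) {
            writeI32(0x80010000 | type);                           // VERSION_1 | message type
            writeString(name);
            writeI32(seqid);
        }

        byte[] toByteArray() { return out.toByteArray(); }
    }

    /** Hypothetical stand-in for AttachableBinaryProtocol: field 0 follows the header. */
    static class AttachableWriter extends BinaryWriter {
        final Map<String, String> attachment = new LinkedHashMap<>();

        @Override
        void writeMessageBegin(String name, byte type, int seqid) {
            super.writeMessageBegin(name, type, seqid);
            if (attachment.isEmpty()) {
                return;                                            // same bytes as the base writer
            }
            writeByte(13);                                         // TType.MAP
            writeI16(0);                                           // field id 0
            writeByte(11);                                         // key type: TType.STRING
            writeByte(11);                                         // value type: TType.STRING
            writeI32(attachment.size());
            for (Map.Entry<String, String> e : attachment.entrySet()) {
                writeString(e.getKey());
                writeString(e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        AttachableWriter writer = new AttachableWriter();
        writer.attachment.put("traceId", "abc");                   // hypothetical key
        writer.writeMessageBegin("add", (byte) 1, 7);
        System.out.println(writer.toByteArray().length + " bytes");
    }
}
```

With an empty attachment the output is byte-for-byte what the base writer produces, which is why requests that carry no trace context go over the wire exactly as before.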
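Let's look at resetTFramedTransport in more detail. It uses reflection to rewind the byte stream behind the TProtocol that is passed in. The TMemoryInputTransport is held in TFramedTransport's private field readBuffer_, so it can only be reached through reflection. The actual bytes live in TMemoryInputTransport's private field buf_, which takes a second reflective access; TMemoryInputTransport's reset method, on the other hand, is public and can be called directly.
resetTFramedTransport also demonstrates how to read an object's private fields through reflection. A cached Field on which setAccessible(true) has been called can safely be used from multiple threads: in the JDK versions of the time (e.g. JDK 8 on HotSpot), Field.get ultimately delegates to the Unsafe class, whose getObject method reads the value directly from memory given the target object and the field's offset. Since JDK 18 (JEP 416), core reflection is implemented with method handles instead.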
~~~
public void resetTFramedTransport(TProtocol in) {
    try {
        // TFramedTransport.readBuffer_ (private): the TMemoryInputTransport holding the frame
        Field readBuffer_ = TFramedTransportFieldsCache.getInstance()
                .getTFramedTransportReadBuffer();
        // TMemoryInputTransport.buf_ (private): the bytes of the frame
        Field buf_ = TFramedTransportFieldsCache.getInstance()
                .getTMemoryInputTransportBuf();
        if (readBuffer_ == null || buf_ == null) {
            return;
        }
        TMemoryInputTransport stream = (TMemoryInputTransport) readBuffer_
                .get(in.getTransport());
        byte[] buf = (byte[]) (buf_.get(stream));
        // Public reset(): move the read position back to the start of the frame
        stream.reset(buf, 0, buf.length);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// Relevant parts of Thrift's own classes (other members omitted):
public class TFramedTransport extends TTransport {
    private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
    // ...
}

public final class TMemoryInputTransport extends TTransport {
    private byte[] buf_;
    private int pos_;
    private int endPos_;

    public TMemoryInputTransport() {
    }

    public TMemoryInputTransport(byte[] buf) {
        reset(buf);
    }

    public TMemoryInputTransport(byte[] buf, int offset, int length) {
        reset(buf, offset, length);
    }

    public void reset(byte[] buf) {
        reset(buf, 0, buf.length);
    }

    public void reset(byte[] buf, int offset, int length) {
        buf_ = buf;
        pos_ = offset;
        endPos_ = offset + length;
    }
    // ...
}
~~~
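The same reflective rewind can be tried without Thrift on the classpath. In the sketch below, Framed and MemoryInput are hypothetical stand-ins that copy the private field names readBuffer_ and buf_ and the public reset(byte[], int, int) from the excerpt above; they are not the Thrift classes. The Field objects are looked up once into static final fields, which gives thread-safe one-time initialization through class loading, an alternative to the double-checked locking in TFramedTransportFieldsCache.

```java
import java.lang.reflect.Field;

/** Illustration only: the reflective rewind of resetTFramedTransport on stand-in classes. */
final class ReflectiveResetDemo {
    /** Hypothetical stand-in for TMemoryInputTransport. */
    static final class MemoryInput {
        private byte[] buf_;
        private int pos_;
        private int endPos_;

        MemoryInput(byte[] buf) {
            reset(buf, 0, buf.length);
        }

        public void reset(byte[] buf, int offset, int length) {
            buf_ = buf;
            pos_ = offset;
            endPos_ = offset + length;
        }

        int read() {
            return pos_ < endPos_ ? buf_[pos_++] & 0xff : -1;
        }
    }

    /** Hypothetical stand-in for TFramedTransport: readBuffer_ is private, with no getter. */
    static final class Framed {
        private final MemoryInput readBuffer_;

        Framed(byte[] frame) {
            readBuffer_ = new MemoryInput(frame);
        }

        int read() {
            return readBuffer_.read();
        }
    }

    // Looked up once and cached, like TFramedTransportFieldsCache
    private static final Field READ_BUFFER = accessible(Framed.class, "readBuffer_");
    private static final Field BUF = accessible(MemoryInput.class, "buf_");

    private static Field accessible(Class<?> owner, String name) {
        try {
            Field field = owner.getDeclaredField(name);
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("no field " + name + " in " + owner.getName(), e);
        }
    }

    /** Moves the read position back to the start of the current frame. */
    static void rewind(Framed framed) throws IllegalAccessException {
        MemoryInput stream = (MemoryInput) READ_BUFFER.get(framed);
        byte[] buf = (byte[]) BUF.get(stream);
        stream.reset(buf, 0, buf.length);
    }

    public static void main(String[] args) throws IllegalAccessException {
        Framed framed = new Framed(new byte[] {10, 20, 30});
        System.out.println(framed.read() + " " + framed.read()); // consumes the first two bytes
        rewind(framed);
        System.out.println(framed.read());                        // the first byte again
    }
}
```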
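Next, the decorating Processor, TraceProcessor. It is a textbook decorator: it implements the TProcessor interface and holds another TProcessor.
1. In process, it casts the input protocol to AttachableBinaryProtocol, reads the message header with readMessageBegin, and then calls readFieldZero to read the map field with id 0.
2. It calls resetTFramedTransport to rewind the input stream and then hands the stream to the actual realProcessor, which eventually invokes the Thrift service implementation class.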
~~~
import java.util.Map;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;

public class TraceProcessor implements TProcessor {
    private TProcessor realProcessor;
    private String serviceId;
    private String serviceName;
    private int port;

    public TraceProcessor(TProcessor realProcessor, int port) {
        this(realProcessor, "", "", port);
    }

    public TraceProcessor(TProcessor realProcessor, String serviceName, int port) {
        this(realProcessor, serviceName, serviceName, port);
    }

    public TraceProcessor(TProcessor realProcessor, String serviceId, String serviceName, int port) {
        this.realProcessor = realProcessor;
        this.serviceId = serviceId;
        this.serviceName = serviceName;
        this.port = port;
    }

    @Override
    public boolean process(TProtocol in, TProtocol out) throws TException {
        Map<String, String> attachment = null;
        if (in instanceof AttachableBinaryProtocol) {
            AttachableBinaryProtocol inProtocol = (AttachableBinaryProtocol) in;
            // Read the message header first, so that field 0 (the attachment) comes next
            TMessage message = inProtocol.readMessageBegin();
            boolean isAttachableRequest = false;
            try {
                isAttachableRequest = inProtocol.readFieldZero();
            } catch (Exception e) {
                // No readable attachment: fall through and process the request normally
            }
            // Rewind the TFramedTransport's internal buffer so the real processor
            // re-reads the request from the message header, as if nothing had happened
            inProtocol.resetTFramedTransport(in);
            if (isAttachableRequest) {
                attachment = inProtocol.getAttachment();
                // Extract the trace context here; the concrete keys are elided in the original:
                // XXXX = attachment.get(XXXX);
                // XXXX = attachment.get(XXXX);
            }
        }
        // The real processor eventually invokes the Thrift service implementation
        boolean result = realProcessor.process(in, out);
        return result;
    }
}
~~~
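Stripped of Thrift, TraceProcessor is a peek-rewind-delegate decorator. The JDK-only sketch below shows that flow over a ByteBuffer: RequestProcessor, TracingProcessor, the onAttachment callback and the request helper are hypothetical names, and `in.position(start)` plays the role of resetTFramedTransport. Like AttachableBinaryProtocol, it assumes the attachment map's keys and values are strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Illustration only: the peek-rewind-delegate flow of TraceProcessor over a ByteBuffer. */
final class PeekAndRewindDemo {
    /** Hypothetical stand-in for TProcessor. */
    interface RequestProcessor {
        void process(ByteBuffer in);
    }

    /** Decorator: read field 0 if present, rewind, then hand the whole request on. */
    static final class TracingProcessor implements RequestProcessor {
        private final RequestProcessor real;
        private final Consumer<Map<String, String>> onAttachment;

        TracingProcessor(RequestProcessor real, Consumer<Map<String, String>> onAttachment) {
            this.real = real;
            this.onAttachment = onAttachment;
        }

        @Override
        public void process(ByteBuffer in) {
            int start = in.position();
            in.getInt();                                  // version | message type
            int nameLength = in.getInt();
            in.position(in.position() + nameLength);      // method name
            in.getInt();                                  // seqid
            Map<String, String> attachment = new HashMap<>();
            byte type = in.get();
            if (type == 13 && in.getShort() == 0) {       // TType.MAP with field id 0
                in.get();                                 // key type (assumed STRING)
                in.get();                                 // value type (assumed STRING)
                int size = in.getInt();
                for (int i = 0; i < size; i++) {
                    String key = readString(in);
                    String value = readString(in);
                    attachment.put(key, value);
                }
            }
            if (!attachment.isEmpty()) {
                onAttachment.accept(attachment);
            }
            in.position(start);                           // rewind, like resetTFramedTransport
            real.process(in);
        }
    }

    static String readString(ByteBuffer in) {
        byte[] bytes = new byte[in.getInt()];
        in.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static void putString(ByteBuffer out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.putInt(bytes.length).put(bytes);
    }

    /** Demo input: a "ping" CALL whose only field is the field-0 attachment. */
    static ByteBuffer request(String key, String value) {
        ByteBuffer out = ByteBuffer.allocate(256);        // fixed size: enough for this demo
        out.putInt(0x80010000 | 1);                       // VERSION_1 | CALL
        putString(out, "ping");
        out.putInt(1);                                    // seqid
        out.put((byte) 13).putShort((short) 0);           // field 0: MAP
        out.put((byte) 11).put((byte) 11).putInt(1);      // string -> string, one entry
        putString(out, key);
        putString(out, value);
        out.put((byte) 0);                                // STOP
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        RequestProcessor real = in -> System.out.println("real processor starts at " + in.position());
        RequestProcessor traced = new TracingProcessor(real, a -> System.out.println("attachment " + a));
        traced.process(request("traceId", "abc"));        // hypothetical key
    }
}
```

Because the decorator rewinds before delegating, the wrapped processor needs no changes, which is the point of decorating rather than editing the generated code.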
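Inserting a field with a dedicated id into the byte stream Thrift generates has the advantage of good compatibility: when Thrift deserializes an object it reads fields by the ids it was generated with, and as soon as it meets an id it does not recognize, it skips that field and moves on to the next one. Thrift's original serialization mechanism is therefore unaffected.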