An RPC call is essentially network programming: the client sends a message to the server, and the server acts on it after receiving it. What makes an RPC message special is that it encapsulates a method invocation, including the method name and the method arguments. After decoding the message, the server uses a method invocation model to dispatch to the actual server-side business method.
This article walks through Thrift's method invocation model. The model is simple: each method name is registered together with the class that actually implements it, and dispatch is done by looking up that registration; no reflection or class-loading machinery is involved.
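Stripped of Thrift specifics, the dispatch is just a map from method name to a handler object that is registered up front. A toy version of the idea (the names below are made up for illustration and are not Thrift classes) could look like this:
~~~
import java.util.HashMap;
import java.util.Map;

// Toy illustration of name-based dispatch without reflection:
// handlers are registered under a method name, and an incoming
// call is routed by a plain map lookup.
interface Handler {
  Object handle(Object[] args) throws Exception;
}

class Dispatcher {
  private final Map<String, Handler> handlers = new HashMap<String, Handler>();

  void register(String methodName, Handler handler) {
    handlers.put(methodName, handler);
  }

  Object dispatch(String methodName, Object[] args) throws Exception {
    Handler h = handlers.get(methodName);
    if (h == null) {
      throw new IllegalArgumentException("Invalid method name: " + methodName);
    }
    return h.handle(args);
  }
}
~~~
Thrift's generated code follows the same pattern, just with protocol decoding and typed argument/result structs layered on top.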
The core classes involved in method invocation are:
1. The generated Iface interface, the top-level interface for the remote methods
2. The generated Processor class and its parents, including the TProcessor interface and the TBaseProcessor abstract class
3. The ProcessFunction abstract class, which models a single method invocation: it carries the method name and the abstract steps of performing the call
4. TNonblockingServer, the default NIO server implementation, configured with the Processor and other settings through its Args parameter
5. The FrameBuffer class, the server-side NIO buffer; once a complete frame has been received and decoded, it calls the Processor to perform the actual method invocation
6. The server-side implementation class of the methods, which implements the Iface interface

Let's look at these classes one by one.
The Iface interface is generated and describes the remote method signatures. The server-side service provider implements DemoService.Iface:
~~~
public class DemoService {
  public interface Iface {
    public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException;
  }
}

public class DemoServiceImpl implements DemoService.Iface {
  @Override
  public int demoMethod(String param1, Parameter param2,
      Map<String, String> param3) throws TException {
    return 0;
  }
}
~~~
Now look at the TProcessor-related classes and interface:
1. TProcessor defines the single top-level process method, whose parameters are the input and output protocols
2. The abstract class TBaseProcessor provides the default implementation of TProcessor's process: it first reads the message header to get the name of the method being called, then uses that name to look up a ProcessFunction object in the map it maintains. The ProcessFunction object is the abstraction of a concrete method; calling its process method is what ultimately invokes the real method
3. The Processor class is generated; it depends on the Iface interface and is responsible for associating each actual method implementation with its method-name key and putting those entries into the map
~~~
public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;
  private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }

  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }
}
~~~
~~~
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
  public Processor(I iface) {
    super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
  }

  protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
    super(iface, getProcessMap(processMap));
  }

  private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
    processMap.put("demoMethod", new demoMethod());
    return processMap;
  }

  private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> {
    public demoMethod() {
      super("demoMethod");
    }

    protected demoMethod_args getEmptyArgsInstance() {
      return new demoMethod_args();
    }

    protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException {
      demoMethod_result result = new demoMethod_result();
      result.success = iface.demoMethod(args.param1, args.param2, args.param3);
      result.setSuccessIsSet(true);
      return result;
    }
  }
}
~~~
The generated demoMethod class extends ProcessFunction. It is responsible for tying together the abstract pieces of a call, the argument struct, the iface, and the result struct, so that the actual method can be invoked through this abstract model. The actual method is supplied by whoever implements the Iface interface.
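ProcessFunction's own process method is not shown above. Conceptually it does roughly the following (a simplified sketch, not the exact Thrift source; error handling is omitted):
~~~
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TMessageType;
import org.apache.thrift.protocol.TProtocol;

// Simplified sketch of ProcessFunction: decode the argument struct,
// call the real implementation through iface, then write the result
// struct back as a REPLY message with the same sequence id.
public abstract class ProcessFunction<I, T extends TBase> {
  private final String methodName;

  protected ProcessFunction(String methodName) {
    this.methodName = methodName;
  }

  public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    T args = getEmptyArgsInstance();        // e.g. demoMethod_args
    args.read(iprot);                       // decode the arguments from the input protocol
    iprot.readMessageEnd();
    TBase result = getResult(iface, args);  // invoke the real implementation via iface
    oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
    result.write(oprot);                    // encode the result back to the caller
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
  }

  public String getMethodName() {
    return methodName;
  }

  protected abstract T getEmptyArgsInstance();

  protected abstract TBase getResult(I iface, T args) throws TException;
}
~~~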
TNonblockingServer is the NIO server implementation. It uses a Selector to check IO readiness and then works on the corresponding Channel. As far as method invocation is concerned, what matters is the read event, which is handled further by handleRead():
~~~
private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

protected void handleRead(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }

  // if the buffer's frame read is complete, invoke the method.
  if (buffer.isFrameFullyRead()) {
    if (!requestInvoke(buffer)) {
      cleanupSelectionKey(key);
    }
  }
}

protected boolean requestInvoke(FrameBuffer frameBuffer) {
  frameBuffer.invoke();
  return true;
}
~~~
NIO servers built on non-blocking, synchronous IO all use buffers to hold the intermediate state of reads and writes. FrameBuffer is exactly such a buffer; and because it sits where method invocation happens, it also provides an invoke() method that performs the call into the Processor.
~~~
public void invoke() {
  TTransport inTrans = getInputTransport();
  TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
  TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

  try {
    processorFactory_.getProcessor(inTrans).process(inProt, outProt);
    responseReady();
    return;
  } catch (TException te) {
    LOGGER.warn("Exception while invoking!", te);
  } catch (Throwable t) {
    LOGGER.error("Unexpected throwable while invoking!", t);
  }
  // This will only be reached when there is a throwable.
  state_ = FrameBufferState.AWAITING_CLOSE;
  requestSelectInterestChange();
}
~~~
FrameBuffer obtains the Processor through processorFactory_. The TProcessorFactory is passed in when the server is created and is just a thin wrapper around the Processor.
~~~
protected TServer(AbstractServerArgs args) {
  processorFactory_ = args.processorFactory;
  serverTransport_ = args.serverTransport;
  inputTransportFactory_ = args.inputTransportFactory;
  outputTransportFactory_ = args.outputTransportFactory;
  inputProtocolFactory_ = args.inputProtocolFactory;
  outputProtocolFactory_ = args.outputProtocolFactory;
}

public class TProcessorFactory {
  private final TProcessor processor_;

  public TProcessorFactory(TProcessor processor) {
    processor_ = processor;
  }

  public TProcessor getProcessor(TTransport trans) {
    return processor_;
  }
}

public T processor(TProcessor processor) {
  this.processorFactory = new TProcessorFactory(processor);
  return (T) this;
}
~~~
Below is a practical TNonblockingServer configuration example.
Besides configuring the basic runtime parameters, the most important step is handing the actual service provider to TNonblockingServer as a Processor through the server args, so that FrameBuffer can invoke it.
~~~
public class DemoServiceImpl implements DemoService.Iface {
  @Override
  public int demoMethod(String param1, Parameter param2,
      Map<String, String> param3) throws TException {
    return 0;
  }

  public static void main(String[] args) {
    TNonblockingServerSocket socket;
    try {
      socket = new TNonblockingServerSocket(9090);
      TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
      TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
      options.processor(processor);
      options.protocolFactory(new TCompactProtocol.Factory());
      TServer server = new TNonblockingServer(options);
      server.serve();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
~~~
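For completeness, here is a sketch of a matching client (host, port, and argument values are assumptions). The client must use a framed transport, because TNonblockingServer reads frame-delimited messages, and the protocol must match the server's TCompactProtocol factory:
~~~
import java.util.HashMap;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class DemoClient {
  public static void main(String[] args) {
    TTransport transport = null;
    try {
      // TNonblockingServer expects framed messages, so wrap the socket in a framed transport
      transport = new TFramedTransport(new TSocket("localhost", 9090));
      transport.open();

      // the protocol must match the server's protocolFactory (TCompactProtocol above)
      TProtocol protocol = new TCompactProtocol(transport);
      DemoService.Client client = new DemoService.Client(protocol);

      int result = client.demoMethod("param1", new Parameter(), new HashMap<String, String>());
      System.out.println("demoMethod returned " + result);
    } catch (TException e) {
      throw new RuntimeException(e);
    } finally {
      if (transport != null) {
        transport.close();
      }
    }
  }
}
~~~
On the wire, client.demoMethod writes a CALL message named "demoMethod"; on the server, TBaseProcessor looks that name up in its map, and the registered demoMethod ProcessFunction ends up invoking DemoServiceImpl.demoMethod.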