RPC即遠程過程調用,它的提出旨在消除通信細節、屏蔽繁雜且易錯的底層網絡通信操作,像調用本地服務一般地調用遠程服務,讓業務開發者更多關注業務開發而不必考慮網絡、硬件、系統的異構復雜環境。
先看看集群中RPC的整個通信過程,假設從節點node1開始一個RPC調用,①先將待傳遞的數據放到NIO集群通信框架(這里使用的是tribes框架)中;②由于使用的是NIO模式,線程無需阻塞直接返回;③由于與集群其他節點通信需要花銷若干時間,為了提高CPU使用率當前線程應該放棄CPU的使用權進行等待操作;④NIO集群通信框架tribes接收到node2節點的響應消息,并將消息封裝成Response對象保存至響應數組;⑤tribes接收到node4節點的響應消息,由于是使用了并行通信,所以node4可能比node3先返回消息,并將消息封裝成Response對象保存至響應數組;⑥tribes最后接收到node3節點的響應消息,并將消息封裝成Response對象保存至響應數組;⑦現在所有節點的響應都已經收集完畢,是時候通知剛剛被阻塞的那條線程了,原來的線程被notify醒后拿到所有節點的響應Response[]進行處理,至此完成了整個集群RPC過程。
?
上面整個過程是在只有一條線程的情況下,一切看起來沒什么問題,但如果有多條線程并發調用則會導致一個問題:線程與響應的對應關系將被打亂,無法確定哪個線程對應哪幾個響應。因為NIO通信框架不會每個線程都獨自使用一個socket通道,為提高性能一般都是使用長連接,所有線程公用一個socket通道,這時就算線程一比線程二先放入tribes也不能保證響應一比響應二先接收到,所以接收到響應一后不知道該通知線程一還是線程二。只有解決了這個問題才能保證RPC調用的正確性。
要解決線程與響應對應的問題就需要維護一個線程響應關系列表,響應從關系列表中就能查找對應的線程,如下圖,在發送之前生成一個UUID標識,此標識要保證同socket中唯一,再把UUID與線程對象關系對應起來,可使用Map數據結構實現,UUID的值作為key,線程對應的鎖對象為value。接著制定一個協議報文,UUID作為報文的其中一部分,報文發往另一個節點node2后將響應信息message放入報文中并返回,node1對接收到的報文進行解包根據UUID去查找并喚起對應的線程,告訴它“你要的消息已經收到,往下處理吧”。但在集群環境下,我們更希望是集群中所有節點的消息都接收到了才往下處理,如下圖下半部分,一個UUID1的請求報文會發往node2、node3和node4三個節點,這時假如只接收到一個響應則不喚起線程,直到node2、node3對應UUID1的響應報文都接收到后才喚起對應線程往下執行。同樣地,UUID2、UUID3的報文消息都是如此處理,最后集群中對應的響應都能正確回到各自線程上。
?
用簡單代碼實現一個RPC例子,選擇一個集群通信框架負責底層通信,這里使用tribes,接著往下:
①定義一個RPC接口,這些方法是預留提供給上層具體邏輯處理的入口,replyRequest方法用于處理響應邏輯,leftOver方法用于殘留請求的邏輯處理。
~~~
public?interface?RpcCallback?{
????public?Serializable?replyRequest(Serializable?msg,?Member?sender);
????public?void?leftOver(Serializable?msg,?Member?sender);
}
~~~
②定義通信消息協議,實現Externalizable接口自定義序列化和反序列化,message用于存放響應消息,uuid標識用于關聯線程,rpcId用于標識RPC實例,reply表示是否回復。
~~~
public?class?RpcMessage?implements?Externalizable?{
protected?Serializable?message;
protected?byte[]?uuid;
protected?byte[]?rpcId;
protected?boolean?reply?=?false;
public?RpcMessage()?{
}
public?RpcMessage(byte[]?rpcId,?byte[]?uuid,?Serializable?message)?{
this.rpcId?=?rpcId;
this.uuid?=?uuid;
this.message?=?message;
}
~~~
@Override
~~~
public?void?readExternal(ObjectInput?in)?throws?IOException,ClassNotFoundException?{
reply?=?in.readBoolean();
int?length?=?in.readInt();
uuid?=?new?byte[length];
in.readFully(uuid);
length?=?in.readInt();
rpcId?=?new?byte[length];
in.readFully(rpcId);
message?=?(Serializable)?in.readObject();
}
~~~
@Override
~~~
public?void?writeExternal(ObjectOutput?out)?throws?IOException?{
out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid,?0,?uuid.length);
out.writeInt(rpcId.length);
out.write(rpcId,?0,?rpcId.length);
out.writeObject(message);
}
}
~~~
③響應類型,提供多種喚起線程的條件,一共四種類型,分別表示接收到第一個響應就喚起線程、接收到集群中大多數節點的響應就喚起線程、接收到集群中所有節點的響應才喚起線程、無需等待響應的無響應模式。
~~~
public?class?RpcResponseType?{
public?static?final?int?FIRST_REPLY?=?1;
public?static?final?int?MAJORITY_REPLY?=?2;
public?static?final?int?ALL_REPLY?=?3;
public?static?final?int?NO_REPLY?=?4;
}
~~~
④響應對象,用于封裝接收到的消息,Member在通信框架tribes是節點的抽象,這里用來表示來源節點。
~~~
public?class?RpcResponse?{
private?Member?source;
private?Serializable?message;
public?RpcResponse()?{
}
public?RpcResponse(Member?source,?Serializable?message)?{
this.source?=?source;
this.message?=?message;
}
public?void?setSource(Member?source)?{
this.source?=?source;
}
public?void?setMessage(Serializable?message)?{
this.message?=?message;
}
public?Member?getSource()?{
return?source;
}
public?Serializable?getMessage()?{
return?message;
}
}
~~~
⑤RPC響應集,用于存放同個UUID的所有響應。
~~~
public?class?RpcCollector?{
????public?ArrayList?responses?=?new?ArrayList();?
????public?byte[]?key;
????public?int?options;
????public?int?destcnt;
????public?RpcCollector(byte[]?key,?int?options,?int?destcnt)?{
????????this.key?=?key;
????????this.options?=?options;
????????this.destcnt?=?destcnt;
????}
????public?void?addResponse(Serializable?message,?Member?sender){
???? RpcResponse?resp?=?new?RpcResponse(sender,message);
????????responses.add(resp);
????}
????public?boolean?isComplete()?{
????????if?(?destcnt?<=?0?)?return?true;
????????switch?(options)?{
????????????case?RpcResponseType.ALL_REPLY:
????????????????return?destcnt?==?responses.size();
????????????case?RpcResponseType.MAJORITY_REPLY:
????????????{
????????????????float?perc?=?((float)responses.size())?/?((float)destcnt);
????????????????return?perc?>=?0.50f;
????????????}
????????????case?RpcResponseType.FIRST_REPLY:
????????????????return?responses.size()>0;
????????????default:
????????????????return?false;
????????}
????}
????public?RpcResponse[]?getResponses()?{
????????return?responses.toArray(new?RpcResponse[responses.size()]);
????}
}
~~~
⑥RPC核心類,是整個RPC的抽象,它要實現tribes框架的ChannelListener接口,在messageReceived方法中處理接收到的消息。因為所有的消息都會通過此方法,所以它必須要根據key去處理對應的線程,同時它也要負責調用RpcCallback接口定義的相關的方法,例如響應請求的replyRequest方法和處理殘留的響應leftOver方法,殘留響應是指有時我們在接收到第一個響應后就喚起線程。
~~~
public?class?RpcChannel?implements?ChannelListener?{
private?Channel?channel;
private?RpcCallback?callback;
private?byte[]?rpcId;
private?int?replyMessageOptions?=?0;
private?HashMap?responseMap?=?new?HashMap();
public?RpcChannel(byte[]?rpcId,?Channel?channel,?RpcCallback?callback)?{
this.rpcId?=?rpcId;
this.channel?=?channel;
this.callback?=?callback;
channel.addChannelListener(this);
}
public?RpcResponse[]?send(Member[]?destination,?Serializable?message,
int?rpcOptions,?int?channelOptions,?long?timeout)
throws?ChannelException?{
int?sendOptions?=?channelOptions&?~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
byte[]?key?=?UUIDGenerator.randomUUID(false);
RpcCollector?collector?=?new?RpcCollector(key,?rpcOptions,
destination.length);
try?{
synchronized?(collector)?{
if?(rpcOptions?!=?RpcResponseType.NO_REPLY)
responseMap.put(key,?collector);
RpcMessage?rmsg?=?new?RpcMessage(rpcId,?key,?message);
channel.send(destination,?rmsg,?sendOptions);
if?(rpcOptions?!=?RpcResponseType.NO_REPLY)
collector.wait(timeout);
}
}?catch?(InterruptedException?ix)?{
Thread.currentThread().interrupt();
}?finally?{
responseMap.remove(key);
}
return?collector.getResponses();
}
~~~
@Override
~~~
public?void?messageReceived(Serializable?msg,?Member?sender)?{
RpcMessage?rmsg?=?(RpcMessage)?msg;
byte[]?key?=?rmsg.uuid;
if?(rmsg.reply)?{
RpcCollector?collector?=?responseMap.get(key);
if?(collector?==?null)?{
callback.leftOver(rmsg.message,?sender);
}?else?{
synchronized?(collector)?{
if?(responseMap.containsKey(key))?{
collector.addResponse(rmsg.message,?sender);
if?(collector.isComplete())
collector.notifyAll();
}?else?{
callback.leftOver(rmsg.message,?sender);
}
}
}
}?else?{
Serializable?reply?=?callback.replyRequest(rmsg.message,?sender);
rmsg.reply?=?true;
rmsg.message?=?reply;
try?{
channel.send(new?Member[]?{?sender?},?rmsg,?replyMessageOptions
&?~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
}?catch?(Exception?x)?{
}
}
}
~~~
@Override
~~~
public?boolean?accept(Serializable?msg,?Member?sender)?{
if?(msg?instanceof?RpcMessage)?{
RpcMessage?rmsg?=?(RpcMessage)?msg;
return?Arrays.equals(rmsg.rpcId,?rpcId);
}?else
return?false;
}
}
~~~
⑦自定義一個RPC,它要實現RpcCallback接口,分別對請求處理和殘留響應處理,這里請求處理僅僅是簡單返回“hello,response?for?you!”作為響應消息,殘留響應處理則是簡單輸出“receive?a?leftover?message!”。假如整個集群有五個節點,由于接收模式設置成了FIRST_REPLY,所以每個只會接受一個響應消息,其他的響應都被當做殘留響應處理。
~~~
public?class?MyRPC?implements?RpcCallback?{
@Override
public?Serializable?replyRequest(Serializable?msg,?Member?sender)?{
RpcMessage?mapmsg?=?(RpcMessage)?msg;
mapmsg.message?=?"hello,response?for?you!";
return?mapmsg;
}
@Override
public?void?leftOver(Serializable?msg,?Member?sender)?{
System.out.println("receive?a?leftover?message!");
}
public?static?void?main(String[]?args)?{
MyRPC?myRPC?=?new?MyRPC();
byte[]?rpcId?=?new?byte[]?{?1,?1,?1,?1?};
byte[]?key?=?new?byte[]?{?0,?0,?0,?0?};
String?message?=?"hello";
int?sendOptions?=?Channel.SEND_OPTIONS_SYNCHRONIZED_ACK
|?Channel.SEND_OPTIONS_USE_ACK;
RpcMessage?msg?=?new?RpcMessage(rpcId,?key,?(Serializable)?message);
RpcChannel?rpcChannel?=?new?RpcChannel(rpcId,?channel,?myRPC);
RpcResponse[]?resp?=?rpcChannel.send(channel.getMembers(),?msg,
RpcResponseType.FIRST_REPLY,?sendOptions,?3000);
???????while(true)
Thread.currentThread().sleep(1000);
}
}
~~~
可以看到通過上面的RPC封裝后,上層可以把更多的精力關注到消息邏輯處理上面了,而不必關注具體的網絡IO如何實現,屏蔽了繁雜重復的網絡傳輸操作,為上層提供了很大的方便。