<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                RPC即遠程過程調用,它的提出旨在消除通信細節、屏蔽繁雜且易錯的底層網絡通信操作,像調用本地服務一般地調用遠程服務,讓業務開發者更多關注業務開發而不必考慮網絡、硬件、系統的異構復雜環境。 先看看集群中RPC的整個通信過程,假設從節點node1開始一個RPC調用,①先將待傳遞的數據放到NIO集群通信框架(這里使用的是tribes框架)中;②由于使用的是NIO模式,線程無需阻塞直接返回;③由于與集群其他節點通信需要花銷若干時間,為了提高CPU使用率當前線程應該放棄CPU的使用權進行等待操作;④NIO集群通信框架tribes接收到node2節點的響應消息,并將消息封裝成Response對象保存至響應數組;⑤tribes接收到node4節點的響應消息,由于是使用了并行通信,所以node4可能比node3先返回消息,并將消息封裝成Response對象保存至響應數組;⑥tribes最后接收到node3節點的響應消息,并將消息封裝成Response對象保存至響應數組;⑦現在所有節點的響應都已經收集完畢,是時候通知剛剛被阻塞的那條線程了,原來的線程被notify醒后拿到所有節點的響應Response[]進行處理,至此完成了整個集群RPC過程。 ?![](https://box.kancloud.cn/2016-01-15_5698bd91574dc.jpg) 上面整個過程是在只有一條線程的情況下,一切看起來沒什么問題,但如果有多條線程并發調用則會導致一個問題:線程與響應的對應關系將被打亂,無法確定哪個線程對應哪幾個響應。因為NIO通信框架不會每個線程都獨自使用一個socket通道,為提高性能一般都是使用長連接,所有線程公用一個socket通道,這時就算線程一比線程二先放入tribes也不能保證響應一比響應二先接收到,所以接收到響應一后不知道該通知線程一還是線程二。只有解決了這個問題才能保證RPC調用的正確性。 要解決線程與響應對應的問題就需要維護一個線程響應關系列表,響應從關系列表中就能查找對應的線程,如下圖,在發送之前生成一個UUID標識,此標識要保證同socket中唯一,再把UUID與線程對象關系對應起來,可使用Map數據結構實現,UUID的值作為key,線程對應的鎖對象為value。接著制定一個協議報文,UUID作為報文的其中一部分,報文發往另一個節點node2后將響應信息message放入報文中并返回,node1對接收到的報文進行解包根據UUID去查找并喚起對應的線程,告訴它“你要的消息已經收到,往下處理吧”。但在集群環境下,我們更希望是集群中所有節點的消息都接收到了才往下處理,如下圖下半部分,一個UUID1的請求報文會發往node2、node3和node4三個節點,這時假如只接收到一個響應則不喚起線程,直到node2、node3對應UUID1的響應報文都接收到后才喚起對應線程往下執行。同樣地,UUID2、UUID3的報文消息都是如此處理,最后集群中對應的響應都能正確回到各自線程上。 ?![](https://box.kancloud.cn/2016-01-15_5698bd916e0fc.jpg) 用簡單代碼實現一個RPC例子,選擇一個集群通信框架負責底層通信,這里使用tribes,接著往下: ①定義一個RPC接口,這些方法是預留提供給上層具體邏輯處理的入口,replyRequest方法用于處理響應邏輯,leftOver方法用于殘留請求的邏輯處理。 ~~~ public?interface?RpcCallback?{ ????public?Serializable?replyRequest(Serializable?msg,?Member?sender); ????public?void?leftOver(Serializable?msg,?Member?sender); } ~~~ ②定義通信消息協議,實現Externalizable接口自定義序列化和反序列化,message用于存放響應消息,uuid標識用于關聯線程,rpcId用于標識RPC實例,reply表示是否回復。 ~~~ public?class?RpcMessage?implements?Externalizable?{ protected?Serializable?message; protected?byte[]?uuid; protected?byte[]?rpcId; protected?boolean?reply?=?false; public?RpcMessage()?{ } public?RpcMessage(byte[]?rpcId,?byte[]?uuid,?Serializable?message)?{ this.rpcId?=?rpcId; this.uuid?=?uuid; this.message?=?message; } ~~~ @Override ~~~ public?void?readExternal(ObjectInput?in)?throws?IOException,ClassNotFoundException?{ reply?=?in.readBoolean(); int?length?=?in.readInt(); uuid?=?new?byte[length]; in.readFully(uuid); length?=?in.readInt(); rpcId?=?new?byte[length]; in.readFully(rpcId); message?=?(Serializable)?in.readObject(); } ~~~ @Override ~~~ public?void?writeExternal(ObjectOutput?out)?throws?IOException?{ out.writeBoolean(reply); out.writeInt(uuid.length); out.write(uuid,?0,?uuid.length); out.writeInt(rpcId.length); out.write(rpcId,?0,?rpcId.length); out.writeObject(message); } } ~~~ ③響應類型,提供多種喚起線程的條件,一共四種類型,分別表示接收到第一個響應就喚起線程、接收到集群中大多數節點的響應就喚起線程、接收到集群中所有節點的響應才喚起線程、無需等待響應的無響應模式。 ~~~ public?class?RpcResponseType?{ public?static?final?int?FIRST_REPLY?=?1; public?static?final?int?MAJORITY_REPLY?=?2; public?static?final?int?ALL_REPLY?=?3; public?static?final?int?NO_REPLY?=?4; } ~~~ ④響應對象,用于封裝接收到的消息,Member在通信框架tribes是節點的抽象,這里用來表示來源節點。 ~~~ public?class?RpcResponse?{ private?Member?source; private?Serializable?message; public?RpcResponse()?{ } public?RpcResponse(Member?source,?Serializable?message)?{ this.source?=?source; this.message?=?message; } public?void?setSource(Member?source)?{ this.source?=?source; } public?void?setMessage(Serializable?message)?{ this.message?=?message; } public?Member?getSource()?{ return?source; } public?Serializable?getMessage()?{ return?message; } } ~~~ ⑤RPC響應集,用于存放同個UUID的所有響應。 ~~~ public?class?RpcCollector?{ ????public?ArrayList?responses?=?new?ArrayList();? ????public?byte[]?key; ????public?int?options; ????public?int?destcnt; ????public?RpcCollector(byte[]?key,?int?options,?int?destcnt)?{ ????????this.key?=?key; ????????this.options?=?options; ????????this.destcnt?=?destcnt; ????} ????public?void?addResponse(Serializable?message,?Member?sender){ ???? RpcResponse?resp?=?new?RpcResponse(sender,message); ????????responses.add(resp); ????} ????public?boolean?isComplete()?{ ????????if?(?destcnt?<=?0?)?return?true; ????????switch?(options)?{ ????????????case?RpcResponseType.ALL_REPLY: ????????????????return?destcnt?==?responses.size(); ????????????case?RpcResponseType.MAJORITY_REPLY: ????????????{ ????????????????float?perc?=?((float)responses.size())?/?((float)destcnt); ????????????????return?perc?>=?0.50f; ????????????} ????????????case?RpcResponseType.FIRST_REPLY: ????????????????return?responses.size()>0; ????????????default: ????????????????return?false; ????????} ????} ????public?RpcResponse[]?getResponses()?{ ????????return?responses.toArray(new?RpcResponse[responses.size()]); ????} } ~~~ ⑥RPC核心類,是整個RPC的抽象,它要實現tribes框架的ChannelListener接口,在messageReceived方法中處理接收到的消息。因為所有的消息都會通過此方法,所以它必須要根據key去處理對應的線程,同時它也要負責調用RpcCallback接口定義的相關的方法,例如響應請求的replyRequest方法和處理殘留的響應leftOver方法,殘留響應是指有時我們在接收到第一個響應后就喚起線程。 ~~~ public?class?RpcChannel?implements?ChannelListener?{ private?Channel?channel; private?RpcCallback?callback; private?byte[]?rpcId; private?int?replyMessageOptions?=?0; private?HashMap?responseMap?=?new?HashMap(); public?RpcChannel(byte[]?rpcId,?Channel?channel,?RpcCallback?callback)?{ this.rpcId?=?rpcId; this.channel?=?channel; this.callback?=?callback; channel.addChannelListener(this); } public?RpcResponse[]?send(Member[]?destination,?Serializable?message, int?rpcOptions,?int?channelOptions,?long?timeout) throws?ChannelException?{ int?sendOptions?=?channelOptions&?~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; byte[]?key?=?UUIDGenerator.randomUUID(false); RpcCollector?collector?=?new?RpcCollector(key,?rpcOptions, destination.length); try?{ synchronized?(collector)?{ if?(rpcOptions?!=?RpcResponseType.NO_REPLY) responseMap.put(key,?collector); RpcMessage?rmsg?=?new?RpcMessage(rpcId,?key,?message); channel.send(destination,?rmsg,?sendOptions); if?(rpcOptions?!=?RpcResponseType.NO_REPLY) collector.wait(timeout); } }?catch?(InterruptedException?ix)?{ Thread.currentThread().interrupt(); }?finally?{ responseMap.remove(key); } return?collector.getResponses(); } ~~~ @Override ~~~ public?void?messageReceived(Serializable?msg,?Member?sender)?{ RpcMessage?rmsg?=?(RpcMessage)?msg; byte[]?key?=?rmsg.uuid; if?(rmsg.reply)?{ RpcCollector?collector?=?responseMap.get(key); if?(collector?==?null)?{ callback.leftOver(rmsg.message,?sender); }?else?{ synchronized?(collector)?{ if?(responseMap.containsKey(key))?{ collector.addResponse(rmsg.message,?sender); if?(collector.isComplete()) collector.notifyAll(); }?else?{ callback.leftOver(rmsg.message,?sender); } } } }?else?{ Serializable?reply?=?callback.replyRequest(rmsg.message,?sender); rmsg.reply?=?true; rmsg.message?=?reply; try?{ channel.send(new?Member[]?{?sender?},?rmsg,?replyMessageOptions &?~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); }?catch?(Exception?x)?{ } } } ~~~ @Override ~~~ public?boolean?accept(Serializable?msg,?Member?sender)?{ if?(msg?instanceof?RpcMessage)?{ RpcMessage?rmsg?=?(RpcMessage)?msg; return?Arrays.equals(rmsg.rpcId,?rpcId); }?else return?false; } } ~~~ ⑦自定義一個RPC,它要實現RpcCallback接口,分別對請求處理和殘留響應處理,這里請求處理僅僅是簡單返回“hello,response?for?you!”作為響應消息,殘留響應處理則是簡單輸出“receive?a?leftover?message!”。假如整個集群有五個節點,由于接收模式設置成了FIRST_REPLY,所以每個只會接受一個響應消息,其他的響應都被當做殘留響應處理。 ~~~ public?class?MyRPC?implements?RpcCallback?{ @Override public?Serializable?replyRequest(Serializable?msg,?Member?sender)?{ RpcMessage?mapmsg?=?(RpcMessage)?msg; mapmsg.message?=?"hello,response?for?you!"; return?mapmsg; } @Override public?void?leftOver(Serializable?msg,?Member?sender)?{ System.out.println("receive?a?leftover?message!"); } public?static?void?main(String[]?args)?{ MyRPC?myRPC?=?new?MyRPC(); byte[]?rpcId?=?new?byte[]?{?1,?1,?1,?1?}; byte[]?key?=?new?byte[]?{?0,?0,?0,?0?}; String?message?=?"hello"; int?sendOptions?=?Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |?Channel.SEND_OPTIONS_USE_ACK; RpcMessage?msg?=?new?RpcMessage(rpcId,?key,?(Serializable)?message); RpcChannel?rpcChannel?=?new?RpcChannel(rpcId,?channel,?myRPC); RpcResponse[]?resp?=?rpcChannel.send(channel.getMembers(),?msg, RpcResponseType.FIRST_REPLY,?sendOptions,?3000); ???????while(true) Thread.currentThread().sleep(1000); } } ~~~ 可以看到通過上面的RPC封裝后,上層可以把更多的精力關注到消息邏輯處理上面了,而不必關注具體的網絡IO如何實現,屏蔽了繁雜重復的網絡傳輸操作,為上層提供了很大的方便。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看