<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                RPC調用本質上就是一種網絡編程,客戶端向服務器發送消息,服務器拿到消息之后做后續動作。只是RPC這種消息比較特殊,它封裝了方法調用,包括方法名,方法參數。服務端拿到這個消息之后,解碼消息,然后要通過方法調用模型來完成實際服務器端業務方法的調用。 這篇講講Thrfit的方法調用模型。Thrift的方法調用模型很簡單,就是通過方法名和實際方法實現類的注冊完成,沒有使用反射機制,類加載機制。 和方法調用相關的幾個核心類: 1. 自動生成的Iface接口,是遠程方法的頂層接口 2. 自動生成的Processor類及相關父類,包括TProcessor接口,TBaseProcess抽象類 3. ProcessFunction抽象類,抽象了一個具體的方法調用,包含了方法名信息,調用方法的抽象過程等 4. TNonblcokingServer,是NIO服務器的默認實現,通過Args參數來配置Processor等信息 5. FrameBuffer類,服務器NIO的緩沖區對象,這個對象在服務器端收到全包并解碼后,會調用Processor去完成實際的方法調用 6. 服務器端的方法的具體實現類,實現Iface接口 ![](https://box.kancloud.cn/2016-02-19_56c6c62b7baa8.jpg) 下面逐個來分析相關的類。 Iface接口是自動生成的,描述了方法的接口。 服務器端服務提供方DemoService要實現Iface接口 ~~~ public class DemoService { public interface Iface { public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException; } } public class DemoServiceImpl implements DemoService.Iface{ ?@Override ?public int demoMethod(String param1, Parameter param2, ??? ??? ?Map<String, String> param3) throws TException { ??? ? ??? ?return 0; ?} } ~~~ 來看TProcess相關類和接口 1. TProcessor就定義了一個頂層的調用方法process,參數是輸入流和輸出流 2. 抽象類TBaseProcessor提供了TProcessor的process的默認實現,先讀消息頭,拿到要調用的方法名,然后從維護的一個Map中取ProcessFunction對象。ProcessFunction對象是實際方法的抽象,調用它的process方法,實際是調用了實際的方法。 3. Processor類是自動生成了,它依賴Iface接口,負責把實際的方法實現和方法的key關聯起來,放到Map中維護 ~~~ public interface TProcessor { public boolean process(TProtocol in, TProtocol out) throws TException; } public abstract class TBaseProcessor<I> implements TProcessor { private final I iface; private final Map<String,ProcessFunction<I, ? extends TBase>> processMap; protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) { ?? this.iface = iface; ?? this.processMap = processFunctionMap; } @Override public boolean process(TProtocol in, TProtocol out) throws TException { ?? TMessage msg = in.readMessageBegin(); ?? ProcessFunction fn = processMap.get(msg.name); ?? if (fn == null) { ???? TProtocolUtil.skip(in, TType.STRUCT); ???? in.readMessageEnd(); ???? TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); ???? out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); ???? x.write(out); ???? out.writeMessageEnd(); ???? out.getTransport().flush(); ???? return true; ?? } ?? fn.process(msg.seqid, in, out, iface); ?? return true; } } ~~~ ~~~ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("demoMethod", new demoMethod()); return processMap; } private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> { public demoMethod() { super("demoMethod"); } protected demoMethod_args getEmptyArgsInstance() { return new demoMethod_args(); } protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException { demoMethod_result result = new demoMethod_result(); result.success = iface.demoMethod(args.param1, args.param2, args.param3); result.setSuccessIsSet(true); return result; } } } ~~~ 自動生成的demoMethod類繼承了ProcessFunction類,它負載把方法參數,iface, 方法返回值這些抽象的概念組合在一起,通過抽象模型來完成實際方法的調用。實際方法的實現者實現了Iface接口。 TNonblockingServer是NIO服務器的實現,它通過Selector來檢查IO就緒狀態,進而調用相關的Channel。就方法調用而言,它處理的是讀事件,用handelRead()來進一步處理 ~~~ private void select() { try { // wait for io events. selector.select(); // process the io events we received Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // skip if not valid if (!key.isValid()) { cleanupSelectionKey(key); continue; } // if the key is marked Accept, then it has to be the server // transport. if (key.isAcceptable()) { handleAccept(); } else if (key.isReadable()) { // deal with reads handleRead(key); } else if (key.isWritable()) { // deal with writes handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!", e); } } protected void handleRead(SelectionKey key) { ???? FrameBuffer buffer = (FrameBuffer) key.attachment(); ???? if (!buffer.read()) { ?????? cleanupSelectionKey(key); ?????? return; ???? } ???? // if the buffer's frame read is complete, invoke the method. ???? <strong>if (buffer.isFrameFullyRead()) { ?????? if (!requestInvoke(buffer)) { ???????? cleanupSelectionKey(key); ?????? } ???? }</strong> ?? } protected boolean requestInvoke(FrameBuffer frameBuffer) { ?? frameBuffer.invoke(); ?? return true; } ~~~ 非阻塞同步IO的NIO服務器都會使用緩沖區來存放讀寫的中間狀態。FrameBuffer就是這樣的一個緩沖區,它由于涉及到方法調用,所以提供了invoke()方法來實現對Processor的調用。 ~~~ public void invoke() { TTransport inTrans = getInputTransport(); TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); try { processorFactory_.getProcessor(inTrans).process(inProt, outProt); responseReady(); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); } catch (Throwable t) { LOGGER.error("Unexpected throwable while invoking!", t); } // This will only be reached when there is a throwable. state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); } ~~~ FrameBuffer使用了processorFactory來獲得Processor。ProcessorFactory是在創建服務器的時候傳遞過來的,只是對Processor的簡單封裝。 ~~~ protected TServer(AbstractServerArgs args) { processorFactory_ = args.processorFactory; serverTransport_ = args.serverTransport; inputTransportFactory_ = args.inputTransportFactory; outputTransportFactory_ = args.outputTransportFactory; inputProtocolFactory_ = args.inputProtocolFactory; outputProtocolFactory_ = args.outputProtocolFactory; } public class TProcessorFactory { private final TProcessor processor_; public TProcessorFactory(TProcessor processor) { ?? processor_ = processor; } public TProcessor getProcessor(TTransport trans) { ?? return processor_; } } public T processor(TProcessor processor) { ???? this.processorFactory = new TProcessorFactory(processor); ???? return (T) this; ?? } ~~~ 下面是一個實際的TNonblockingServer的配置實例 除了配置服務器運行的基本參數,最重要的就是把實際的服務提供者通過服務器參數的方式作為Processor傳遞給TNonblockingServer,供FrameBuffer調用。 ~~~ public class DemoServiceImpl implements DemoService.Iface{ @Override public int demoMethod(String param1, Parameter param2, Map<String, String> param3) throws TException { return 0; } public static void main(String[] args){ TNonblockingServerSocket socket; try { socket = new TNonblockingServerSocket(9090); TNonblockingServer.Args options = new TNonblockingServer.Args(socket); TProcessor processor = new DemoService.Processor(new DemoServiceImpl()); options.processor(processor); options.protocolFactory(new TCompactProtocol.Factory()); TServer server = new TNonblockingServer(options); server.serve(); } catch (Exception e) { throw new RuntimeException(e); } } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看