<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## 流程 injvm協議的過程是dubbo中最簡單的協議,但除了沒有注冊中心外,還是可以對其基本流程進行全面的了解。 ![dubbo流程](http://www.uxiaowo.com/dubbo/dubbo.png) ## 1.發布到dubbo協議 在發布流程中,我們已經知道了服務要發布為dubbo協議時,不同點在發布Invoker的不同。非injvm協議都使用了RegistryProtocol的export()來發布服務,RegistryProtocol的內部變量bounds中保存了`<服務,協議>`對應的Exporter,每次發布后會保存到這個map中。 發布的過程如下: ``` // originInvoker是發布服務的公共流程中生成的Invoker對象 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 使用dubbo協議發布 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 獲取注冊中心 final Registry registry = getRegistry(originInvoker); // 通過invoker的url 獲取 providerUrl的地址 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // 通知注冊中心發布服務 registry.register(registedProviderUrl); // 訂閱override數據 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保證每次export都返回一個新的exporter實例 return new Exporter<T>() { public Invoker<T> getInvoker() { } public void unexport() { } }; } ``` ### 1.1 使用dubbo協議發布 dubbo協議發布服務時,會根據發布時生成的Invoker,構建InvokerFilterChain,并添加監聽事件,最后,打開協議指定的服務器,等待客戶端連接后處理調用。 doLocalExport(originInvoker);中首先根據服務名在bounds之后查找對應的Exporter,如果找到,說明已經發不過了;如果沒有找到則使用DubboProtocol協議發布Invoker。在發布之前,會將發布之前生成的Invoker包裝為InvokerDelegete對象,這是因為originInvoker的url是注冊中心協議的url`registry://xxxx/xxx?xx`;而dubboProtocol發布時需要改為`dubbo://xxx/xx?xxx` ``` private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; } ``` 接下來,繼續分析dubboProtocol的export,protocol變量與之前相同,會根據協議名稱獲取協議鏈: ``` |- ProtocolFilterWrapper |- ProtocolListenerWrapper |- DubboProtocol ``` ProtocolFilterWrapper:主要用來生成調用鏈,內部的buildInvokerChain方法會查找Filter的實現類,查找group為provider的,并根據order排序,將這些Filter連接成一個調用鏈 InvokerFilterChain,最終調用上一步生成的InvokerDelegete ``` EchoFilter -> ClassloaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter -> InvokerDelegete ``` ProtocolListenerWrapper:主要用來添加監聽事件。 DubboProtocol:首先調用DubboProtocol的export,內部將InvokerFilterChain的頭節點保存到DubboExporter中,最后打開服務器,最終返回DubboExporter。打開服務器的過程見3.1節。 ### 1.2 獲取注冊中心 在1.1中雖然創建了Dubbo協議的Invoker,但還需要發布到注冊中心,發布之前需要獲取注冊中心,以Zookeeper注冊中心為例,獲取注冊中心時會根據url中的`registry=zookeeper`參數獲取RegistryFactory,再由工廠獲取注冊中心 ``` private Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = originInvoker.getUrl(); // 目的是根據修改registry參數值修改url if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } return registryFactory.getRegistry(registryUrl);// 此次協議為zookeeper://xxx/xxx?xxx } ``` RegistryFactory也是通過ExtensionLoader機制獲取的,由于時zookeeper協議,會返回ZookeeperRegistryFactory對象 ``` public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } } ``` zookeeperTransporter是對zookeeper節點操作的抽象,可以使用`CuratorZookeeperTransporter`或`ZkclientZookeeperTransporter`,創建注冊中心對象最終會獲得ZookeeperRegistry對象的實例。 ### 1.3 注冊服務 注冊服務是向注冊中心注冊,首先使用getRegistedProviderUrl獲取服務提供者的url:`dubbo://IP:PORT/接口名?param`,并使用注冊中心的register方法注冊服務,Zookeeper注冊中心會創建`/dubbo/接口名/providers/dubbo://xxxx/xxx?xxx`節點,providers下面都是提供了該服務的協議。 ### 1.4 訂閱服務 使用getSubscribedOverrideUrl方法獲取訂閱服務的url`provider://xxx/xxx?xxx`,這個過程中,會向url中添加參數category;category用來指定該url關心的變化,如configurations、routers、providers和consumers等等; Zookeeper注冊中心根據url中category的值configurators創建Zookeeper的節點`/dubbo/接口名/configurators`,并監聽該節點,其子節點發生變化后調用ZookeeperRegistry的notify;最后,觸發一遍notify,執行OverrideListener事件。 OverrideListener是一個監聽事件,在zookeeper的節點發生變化后會調用notify(),目的是服務url發生更新后能夠協議的Invoker。 ## 2.引用服務 ### 2.1 創建Invoker 根據第四章引用服務流程中創建Invoker的描述并未看到DubboProtocol創建Invoker的過程,這是因為`RegistryProtocol`的refer中創建了RegistryDirectory對象,并使用`cluster.join(directory)`方法返回了一個失敗重試的Invoker就返回了。 其實DubboProtocol refer創建Invoker的過程正是在RegistryDirectory的監聽函數中,RegistryDirectory的subscribe結束后會收到觸發一遍,由于providers節點下有提供者,refreshInvoker中會使用DubboProtocol引用服務 ``` invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); ``` protocol與前面章節的類似,會有協議鏈,ProtocolFilterWrapper用來構建InvokerFilterChain,與發布服務不同的是此時使用的是`@Activate(group = Constants.CONSUMER)`的Filter實現類;ProtocolListenerWrapper用來添加監聽函數。 ``` |- ProtocolFilterWrapper |- ProtocolListenerWrapper |- DubboProtocol ``` DubboProtocol的refer時,首先要獲取客戶端,如果不存在的話就需要打開客戶端了,`<dubbo:reference/>`標簽中可以設置客戶端連接數量,打開客戶端時也會根據`connections`中設置的值初始化多個客戶端,返回ExchangeClient數組。 之后創建DubboInvoker對象并返回。 客戶端的應用中執行服務的方法時,最終會調用DubboInvoker的doInvoke,內部會使用currentClient.request(inv, timeout).get();想服務器發生請求。 ## 3.底層通信 ### 3.1 打開服務器流程 在DubboProtocol協議發布服務時,會打開服務器,DubboProtocol中有一個變量serverMap保存了`<ip:dubboPort>`對應的ExchangeServer,如果map中不存在,需要創建服務器createServer()。 底層通信分為兩個層:信息交換層和傳輸層,傳輸層封裝了netty、mina等服務器的實現。 ``` |- Exchanger 信息交換層 header |- Transporters 傳輸層 netty |- NettyServer |- HeaderExchangeServer ``` 1. 創建服務器時,`dubbo:protocol`標簽中可以設置協議的服務器端實現類型,比如:dubbo協議的mina,netty等,http協議的jetty,servlet等,dubbo協議默認的服務器是netty。 2. Exchangers.bind會根據url啟動服務器,首先根據url獲取Exchanger,默認的exchanger是header,對應的是HeaderExchanger,exchanger是信息交換層 ``` public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } } ``` 4.Exchanger在執行bind方法時會使用Transporters的bind ``` getTransporter().bind(url, handler); public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } ``` getTransporter會獲取url獲取Transporter的實現類,默認的是NettyTransporter,使用的是netty3版本,也可以配置成netty4 ``` <dubbo:protocol name="dubbo" port="20880" transporter="netty4"/> ``` 下面以netty4為例說明, ``` public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } ``` 此時的listener是 ``` |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` 在NettyServer中會對這個ChannelHandler鏈進行再包裝 ``` public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } ``` Dispatcher是線程池配置,默認的是AllDispatcher,內部會返回AllChannelHandler,此時,ChannelHandler-Chain通道處理鏈如下: ``` |-MultiMessageHandler |-HeartbeatHandler |-AllChannelHandler |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` netty的open中會設置netty的子Channel的ChannnelHandler并啟動服務器,NettyChannel鏈為 ``` decoder - NettyCodecAdapter.getDecoder() encoder - NettyCodecAdapter.getEncoder() handler - NettyServerHandler ``` 最后,將NettyServer包裝為HeaderExchangeServer并返回。 ### 3.2 打開客戶端流程 創建DubboInvoker時會打開客戶端,會使用Exchangers的connect方法 ``` client = Exchangers.connect(url, requestHandler); public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler); } ``` 默認的Exchanger是HeaderExchanger,內部的connect使用Transporters的connect方法 ``` public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } ``` 使用時,構建了dubboChannel鏈 ``` |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` 在創建NettyClient中,跟打開服務器一樣會構建DubboChannel鏈 ``` |-MultiMessageHandler |-HeartbeatHandler |-AllChannelHandler |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` Netty Channel的鏈為: ``` decoder - NettyCodecAdapter.getDecoder() encoder - NettyCodecAdapter.getEncoder() handler - NettyClientHandler ``` ### 3.2 codec 在打開netty服務器和客戶端的過程中,都使用了NettyCodecAdapter獲取編解碼的ChannelHandler,在NettyCodecAdapter中有一個變量Codec2,會根據url中codec的值獲取,默認為dubbo,返回的是DubboCountCodec, ``` protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } } ``` DubboCountCodec內部有一個編解碼的ChannelHandler,內部調用DubboCodec的編解碼方法,這個類實現了dubbo協議傳輸時的byte數組的數據結構,主要會根據使用序列化參數`serialization`的值,默認是`hessian2`獲取到序列化對象,根據序列化對象來序列化Request。 與發送類似,服務器端收到請求后,會首先進入decode方法,根據序列化方式講Request對象序列化出來,然 ### 3.3 數據傳輸 客戶端調用SpringContext返回的代碼類的方法時,最終會進入DubboInvoker的doInvoker方法,在內部會使用ExchangeClient發送請求,內部將Invocation包裝為Request對象,發送出去,最終會通過codec編碼然后傳遞給 ``` public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } ``` 服務器端收到Request對象后,會根據選擇的序列化方法decode解析出Request對象,然后,在AllChannelHandler中提交給線程池處理,最終調用到DubboProtocol中的requestHandler的reply方法來處理請求,DubboProtocol中的exporterMap保存了服務名與Invoker的映射關系,最后使用Invoker的doInvoker調用Wrapper的invokeMethod方法從而完成調用。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看