## 流程
injvm協議的過程是dubbo中最簡單的協議,但除了沒有注冊中心外,還是可以對其基本流程進行全面的了解。

## 1.發布到dubbo協議
在發布流程中,我們已經知道了服務要發布為dubbo協議時,不同點在發布Invoker的不同。非injvm協議都使用了RegistryProtocol的export()來發布服務,RegistryProtocol的內部變量bounds中保存了`<服務,協議>`對應的Exporter,每次發布后會保存到這個map中。
發布的過程如下:
```
// originInvoker是發布服務的公共流程中生成的Invoker對象
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 使用dubbo協議發布
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 獲取注冊中心
final Registry registry = getRegistry(originInvoker);
// 通過invoker的url 獲取 providerUrl的地址
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 通知注冊中心發布服務
registry.register(registedProviderUrl);
// 訂閱override數據
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保證每次export都返回一個新的exporter實例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
}
public void unexport() {
}
};
}
```
### 1.1 使用dubbo協議發布
dubbo協議發布服務時,會根據發布時生成的Invoker,構建InvokerFilterChain,并添加監聽事件,最后,打開協議指定的服務器,等待客戶端連接后處理調用。
doLocalExport(originInvoker);中首先根據服務名在bounds之后查找對應的Exporter,如果找到,說明已經發不過了;如果沒有找到則使用DubboProtocol協議發布Invoker。在發布之前,會將發布之前生成的Invoker包裝為InvokerDelegete對象,這是因為originInvoker的url是注冊中心協議的url`registry://xxxx/xxx?xx`;而dubboProtocol發布時需要改為`dubbo://xxx/xx?xxx`
```
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
```
接下來,繼續分析dubboProtocol的export,protocol變量與之前相同,會根據協議名稱獲取協議鏈:
```
|- ProtocolFilterWrapper
|- ProtocolListenerWrapper
|- DubboProtocol
```
ProtocolFilterWrapper:主要用來生成調用鏈,內部的buildInvokerChain方法會查找Filter的實現類,查找group為provider的,并根據order排序,將這些Filter連接成一個調用鏈 InvokerFilterChain,最終調用上一步生成的InvokerDelegete
```
EchoFilter -> ClassloaderFilter -> GenericFilter ->
ContextFilter -> TraceFilter -> TimeoutFilter ->
MonitorFilter -> ExceptionFilter -> InvokerDelegete
```
ProtocolListenerWrapper:主要用來添加監聽事件。
DubboProtocol:首先調用DubboProtocol的export,內部將InvokerFilterChain的頭節點保存到DubboExporter中,最后打開服務器,最終返回DubboExporter。打開服務器的過程見3.1節。
### 1.2 獲取注冊中心
在1.1中雖然創建了Dubbo協議的Invoker,但還需要發布到注冊中心,發布之前需要獲取注冊中心,以Zookeeper注冊中心為例,獲取注冊中心時會根據url中的`registry=zookeeper`參數獲取RegistryFactory,再由工廠獲取注冊中心
```
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
// 目的是根據修改registry參數值修改url
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);// 此次協議為zookeeper://xxx/xxx?xxx
}
```
RegistryFactory也是通過ExtensionLoader機制獲取的,由于時zookeeper協議,會返回ZookeeperRegistryFactory對象
```
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
```
zookeeperTransporter是對zookeeper節點操作的抽象,可以使用`CuratorZookeeperTransporter`或`ZkclientZookeeperTransporter`,創建注冊中心對象最終會獲得ZookeeperRegistry對象的實例。
### 1.3 注冊服務
注冊服務是向注冊中心注冊,首先使用getRegistedProviderUrl獲取服務提供者的url:`dubbo://IP:PORT/接口名?param`,并使用注冊中心的register方法注冊服務,Zookeeper注冊中心會創建`/dubbo/接口名/providers/dubbo://xxxx/xxx?xxx`節點,providers下面都是提供了該服務的協議。
### 1.4 訂閱服務
使用getSubscribedOverrideUrl方法獲取訂閱服務的url`provider://xxx/xxx?xxx`,這個過程中,會向url中添加參數category;category用來指定該url關心的變化,如configurations、routers、providers和consumers等等;
Zookeeper注冊中心根據url中category的值configurators創建Zookeeper的節點`/dubbo/接口名/configurators`,并監聽該節點,其子節點發生變化后調用ZookeeperRegistry的notify;最后,觸發一遍notify,執行OverrideListener事件。
OverrideListener是一個監聽事件,在zookeeper的節點發生變化后會調用notify(),目的是服務url發生更新后能夠協議的Invoker。
## 2.引用服務
### 2.1 創建Invoker
根據第四章引用服務流程中創建Invoker的描述并未看到DubboProtocol創建Invoker的過程,這是因為`RegistryProtocol`的refer中創建了RegistryDirectory對象,并使用`cluster.join(directory)`方法返回了一個失敗重試的Invoker就返回了。
其實DubboProtocol refer創建Invoker的過程正是在RegistryDirectory的監聽函數中,RegistryDirectory的subscribe結束后會收到觸發一遍,由于providers節點下有提供者,refreshInvoker中會使用DubboProtocol引用服務
```
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
```
protocol與前面章節的類似,會有協議鏈,ProtocolFilterWrapper用來構建InvokerFilterChain,與發布服務不同的是此時使用的是`@Activate(group = Constants.CONSUMER)`的Filter實現類;ProtocolListenerWrapper用來添加監聽函數。
```
|- ProtocolFilterWrapper
|- ProtocolListenerWrapper
|- DubboProtocol
```
DubboProtocol的refer時,首先要獲取客戶端,如果不存在的話就需要打開客戶端了,`<dubbo:reference/>`標簽中可以設置客戶端連接數量,打開客戶端時也會根據`connections`中設置的值初始化多個客戶端,返回ExchangeClient數組。
之后創建DubboInvoker對象并返回。
客戶端的應用中執行服務的方法時,最終會調用DubboInvoker的doInvoke,內部會使用currentClient.request(inv, timeout).get();想服務器發生請求。
## 3.底層通信
### 3.1 打開服務器流程
在DubboProtocol協議發布服務時,會打開服務器,DubboProtocol中有一個變量serverMap保存了`<ip:dubboPort>`對應的ExchangeServer,如果map中不存在,需要創建服務器createServer()。
底層通信分為兩個層:信息交換層和傳輸層,傳輸層封裝了netty、mina等服務器的實現。
```
|- Exchanger 信息交換層 header
|- Transporters 傳輸層 netty
|- NettyServer
|- HeaderExchangeServer
```
1. 創建服務器時,`dubbo:protocol`標簽中可以設置協議的服務器端實現類型,比如:dubbo協議的mina,netty等,http協議的jetty,servlet等,dubbo協議默認的服務器是netty。
2. Exchangers.bind會根據url啟動服務器,首先根據url獲取Exchanger,默認的exchanger是header,對應的是HeaderExchanger,exchanger是信息交換層
```
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
```
4.Exchanger在執行bind方法時會使用Transporters的bind
```
getTransporter().bind(url, handler);
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
```
getTransporter會獲取url獲取Transporter的實現類,默認的是NettyTransporter,使用的是netty3版本,也可以配置成netty4
```
<dubbo:protocol name="dubbo" port="20880" transporter="netty4"/>
```
下面以netty4為例說明,
```
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
```
此時的listener是
```
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
在NettyServer中會對這個ChannelHandler鏈進行再包裝
```
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
```
Dispatcher是線程池配置,默認的是AllDispatcher,內部會返回AllChannelHandler,此時,ChannelHandler-Chain通道處理鏈如下:
```
|-MultiMessageHandler
|-HeartbeatHandler
|-AllChannelHandler
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
netty的open中會設置netty的子Channel的ChannnelHandler并啟動服務器,NettyChannel鏈為
```
decoder - NettyCodecAdapter.getDecoder()
encoder - NettyCodecAdapter.getEncoder()
handler - NettyServerHandler
```
最后,將NettyServer包裝為HeaderExchangeServer并返回。
### 3.2 打開客戶端流程
創建DubboInvoker時會打開客戶端,會使用Exchangers的connect方法
```
client = Exchangers.connect(url, requestHandler);
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
```
默認的Exchanger是HeaderExchanger,內部的connect使用Transporters的connect方法
```
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
```
使用時,構建了dubboChannel鏈
```
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
在創建NettyClient中,跟打開服務器一樣會構建DubboChannel鏈
```
|-MultiMessageHandler
|-HeartbeatHandler
|-AllChannelHandler
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
Netty Channel的鏈為:
```
decoder - NettyCodecAdapter.getDecoder()
encoder - NettyCodecAdapter.getEncoder()
handler - NettyClientHandler
```
### 3.2 codec
在打開netty服務器和客戶端的過程中,都使用了NettyCodecAdapter獲取編解碼的ChannelHandler,在NettyCodecAdapter中有一個變量Codec2,會根據url中codec的值獲取,默認為dubbo,返回的是DubboCountCodec,
```
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
```
DubboCountCodec內部有一個編解碼的ChannelHandler,內部調用DubboCodec的編解碼方法,這個類實現了dubbo協議傳輸時的byte數組的數據結構,主要會根據使用序列化參數`serialization`的值,默認是`hessian2`獲取到序列化對象,根據序列化對象來序列化Request。
與發送類似,服務器端收到請求后,會首先進入decode方法,根據序列化方式講Request對象序列化出來,然
### 3.3 數據傳輸
客戶端調用SpringContext返回的代碼類的方法時,最終會進入DubboInvoker的doInvoker方法,在內部會使用ExchangeClient發送請求,內部將Invocation包裝為Request對象,發送出去,最終會通過codec編碼然后傳遞給
```
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
```
服務器端收到Request對象后,會根據選擇的序列化方法decode解析出Request對象,然后,在AllChannelHandler中提交給線程池處理,最終調用到DubboProtocol中的requestHandler的reply方法來處理請求,DubboProtocol中的exporterMap保存了服務名與Invoker的映射關系,最后使用Invoker的doInvoker調用Wrapper的invokeMethod方法從而完成調用。