> anoyi中的GrpcClient
1. 查看屬性
~~~
private static final Map<String, ServerContext> serverMap = new HashMap<>();
private final GrpcProperties grpcProperties;
private final SerializeService serializeService;
private ClientInterceptor clientInterceptor;
~~~
這里四個屬性,serverMap 的key值是遠程服務端的名字列表,ServerContext是http2.0的請求context,grpcProperties是客戶端的配置屬性,有端口號,啟動時候綁定的,serializeService是序列化的類,clientInterceptor是攔截器,因為grpc本質是http,所以有必要在發送的時候加token或者啥的令牌
2. 構造方法(自己去看)
~~~
public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService) {
this.grpcProperties = grpcProperties;
this.serializeService = serializeService;
}
public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService, ClientInterceptor clientInterceptor) {
this.grpcProperties = grpcProperties;
this.serializeService = serializeService;
this.clientInterceptor = clientInterceptor;
}
~~~
3. 初始化方法
~~~
/**
* 初始化
*/
public void init(){
List<RemoteServer> remoteServers = grpcProperties.getRemoteServers();
if (!CollectionUtils.isEmpty(remoteServers)) {
for (RemoteServer server : remoteServers) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())
.defaultLoadBalancingPolicy("round_robin")
.nameResolverFactory(new DnsNameResolverProvider())
.idleTimeout(30, TimeUnit.SECONDS)
.usePlaintext().build();
if (clientInterceptor != null){
Channel newChannel = ClientInterceptors.intercept(channel, clientInterceptor);
serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));
} else {
Class clazz = grpcProperties.getClientInterceptor();
if (clazz == null) {
serverMap.put(server.getServer(), new ServerContext(channel, serializeService));
}else {
try {
ClientInterceptor interceptor = (ClientInterceptor) clazz.newInstance();
Channel newChannel = ClientInterceptors.intercept(channel, interceptor);
serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));
} catch (InstantiationException | IllegalAccessException e) {
log.warn("ClientInterceptor cannot use, ignoring...");
serverMap.put(server.getServer(), new ServerContext(channel, serializeService));
}
}
}
}
}
}
~~~
4. 解析初始化方法
我們看到該方法里面有這么一段代碼
~~~
ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())
.defaultLoadBalancingPolicy("round_robin")
.nameResolverFactory(new DnsNameResolverProvider())
.idleTimeout(30, TimeUnit.SECONDS)
.usePlaintext().build();
~~~
這個方法只是實例化客戶端連接器,而且這個方法是在循環中,說明我們客戶端可以配置多個服務端連接。所以客戶端是一對多服務端。才會有上面的serverMap ,其實這個方法很就是生成ServerContext存放到內存中,要訪問哪個服務端的時候就拿出來,接下來看看下面的方法
~~~
/**
* 連接遠程服務
*/
public static ServerContext connect(String serverName) {
return serverMap.get(serverName);
}
~~~
看到了吧,就是一對多服務端,要的時候取出相應名字的context,接下來看看ServerContext這個類,這個類是我們封裝的
5. 查看ServerContext
* [ ] 屬性
~~~
private Channel channel;
private final SerializeService defaultSerializeService;
private CommonServiceGrpc.CommonServiceBlockingStub blockingStub;
~~~
這三個屬性,有用的是Channel ,這個是通道,SerializeService 是序列化的方法,自己寫,記得要跟服務端約定好,
blockingStub是grpc的關鍵,是google生成的方法,實現通信。
* [ ] 構造方法
~~~
ServerContext(Channel channel, SerializeService serializeService) {
this.channel = channel;
this.defaultSerializeService = serializeService;
blockingStub = CommonServiceGrpc.newBlockingStub(channel);
}
~~~
很簡單的構造方法,傳參,實例化
* [ ] 請求方法
~~~
/**
* 處理 gRPC 請求
*/
public GrpcResponse handle(SerializeType serializeType, GrpcRequest grpcRequest) {
SerializeService serializeService = SerializeUtils.getSerializeService(serializeType, this.defaultSerializeService);
ByteString bytes = serializeService.serialize(grpcRequest);
int value = (serializeType == null ? -1 : serializeType.getValue());
GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();
GrpcService.Response response = null;
try{
response = blockingStub.handle(request);
}catch (Exception exception){
log.warn("rpc exception: {}", exception.getMessage());
if ("UNAVAILABLE: io exception".equals(exception.getMessage().trim())){
response = blockingStub.handle(request);
}
}
return serializeService.deserialize(response);
}
~~~
這個方法很簡單就是序列化并發送,由于java的http2.0采用ByteString通訊,所以必須序列化成ByteString,這里我們提供三種方法,一個是FastJSONSerializeService,這個是阿里的fastjson序列化,一個是ProtoStuffSerializeService,
這個是protobuff的包,是谷歌提供的,一個是SofaHessianSerializeService,這個是probuf-java,也是谷歌的,接下來看看往下執行的方法
~~~
GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();
~~~
這個很關鍵了,GrpcService是用proto生成的,也就是protobuf對http2.0的支持,接下來看看我們寫的service.proto
~~~
syntax = "proto3";
option java_package = "com.anoyi.rpc";
option java_outer_classname = "GrpcService";
option java_multiple_files = false;
// 定義通用的 Grpc 服務
service CommonService {
// 處理請求
rpc handle ( Request ) returns ( Response ) {}
}
// 定義通用的 Grpc 請求體
message Request {
int32 serialize = 1;
bytes request = 2;
}
// 定義通用的 Grpc 響應體
message Response {
bytes response = 1;
}
~~~
service.proto采用proto3協議,java_package 的意思是我們生成的類放的包,我們放在com.anoyi.rpc,java_outer_classname 是我們生成的外部類名字,我們叫GrpcService,java_multiple_files 是我們是否支持分成多個java類,我們集成到一個類,叫做GrpcService,所以false了,然后CommonService是個service,我們com.anoyi.rpc包會生成CommonServiceGrpc這個類,自己去繼承重寫吧,就一個handle方法,參數是Request ,返回值是Response ,都是我們底下定義的結構體,這是個關鍵的服務端處理方法,Request 是個message ,也就是結構體,有兩個字段, int32 serialize = 1;bytes request = 2;一個是serialize ,是序列化方法,request 是我們序列化后的二進制,也及時byteString,Response 也就是一個byteString而已,因為就是返回。當然service.proto,我們會找個方法生成java文件,下一章節會介紹的,然后客戶端調用init就會初始化客戶端連接grpc的通道,這一章節就結束了