[TOC]
基于 protobuf 的 RPC 可以說是五花八門,其中不乏非常優秀的代碼例如 brpc, muduo-rpc 等。
protobuf 實現了序列化部分,并且預留了 RPC 接口,但是沒有實現網絡交互的部分。
本文想介紹下,如何實現基于 protobuf 實現一個極簡版的 RPC ,這樣有助于我們閱讀 RPC 源碼。
一次完整的 RPC 通信實際上是有三部分代碼共同完成:
1. protobuf 自動生成的代碼
2. RPC 框架
3. 用戶填充代碼
本文假設用戶熟悉 protobuf 并且有 RPC 框架的使用經驗。首先介紹下 protobuf 自動生成的代碼,接著介紹下用戶填充代碼,然后逐步介紹下極簡的 RPC 框架的實現思路,相關代碼可以直接跳到文章最后。
## 1\. proto
我們定義了`EchoService`, method 為`Echo`.
~~~
package echo;
option cc_generic_services = true;
message EchoRequest {
required string msg = 1;
}
message EchoResponse {
required string msg = 2;
}
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
~~~
protoc 自動生成`echo.pb.h echo.pb.cc`兩部分代碼. 其中`service EchoService`這一句會生成`EchoService EchoService_Stub`兩個類,分別是 server 端和 client 端需要關心的。
對 server 端,通過`EchoService::Echo`來處理請求,代碼未實現,需要子類來 override.
~~~
class EchoService : public ::google::protobuf::Service {
...
virtual void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done);
};
void EchoService::Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest*,
::echo::EchoResponse*,
::google::protobuf::Closure* done) {
//代碼未實現
controller->SetFailed("Method Echo() not implemented.");
done->Run();
}
~~~
對 client 端,通過`EchoService_Stub`來發送數據,`EchoService_Stub::Echo`調用了`::google::protobuf::Channel::CallMethod`,但是`Channel`是一個純虛類,需要 RPC 框架在子類里實現需要的功能。
~~~
class EchoService_Stub : public EchoService {
...
void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done);
private:
::google::protobuf::RpcChannel* channel_;
};
void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
~~~
## 2\. server && client
有過 RPC 使用經驗的話,都了解 server 端代碼類似于這樣(參考[brpc echo\_c++ server.cpp](https://github.com/brpc/brpc/blob/master/example/echo_c%2B%2B/server.cpp))
~~~
//override Echo method
class MyEchoService : public echo::EchoService {
public:
virtual void Echo(::google::protobuf::RpcController* /* controller */,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
std::cout << request->msg() << std::endl;
response->set_msg(
std::string("I have received '") + request->msg() + std::string("'"));
done->Run();
}
};//MyEchoService
int main() {
MyServer my_server;
MyEchoService echo_service;
my_server.add(&echo_service);
my_server.start("127.0.0.1", 6688);
return 0;
}
~~~
只要定義子類 service 實現 method 方法,再把 service 加到 server 里就可以了。
而 client 基本這么實現(參考[brpc echo\_c++ client.cpp](https://github.com/brpc/brpc/blob/master/example/echo_c%2B%2B/client.cpp))
~~~
int main() {
MyChannel channel;
channel.init("127.0.0.1", 6688);
echo::EchoRequest request;
echo::EchoResponse response;
request.set_msg("hello, myrpc.");
echo::EchoService_Stub stub(&channel);
MyController cntl;
stub.Echo(&cntl, &request, &response, NULL);
std::cout << "resp:" << response.msg() << std::endl;
return 0;
}
~~~
這樣的用法看起來很自然,但是仔細想想背后的實現,肯定會有很多疑問:
1. 為什么 server 端只需要實現`MyEchoService::Echo`函數,client端只需要調用`EchoService_Stub::Echo`就能發送和接收對應格式的數據?中間的調用流程是怎么樣子的?
2. 如果 server 端接收多種 pb 數據(例如還有一個 method`rpc Post(DeepLinkReq) returns (DeepLinkResp);`),那么怎么區分接收到的是哪個格式?
3. 區分之后,又如何構造出對應的對象來?例如`MyEchoService::Echo`參數里的`EchoRequest EchoResponse`,因為 rpc 框架并不清楚這些具體類和函數的存在,框架并不清楚具體類的名字,也不清楚 method 名字,卻要能夠構造對象并調用這個函數?
可以推測答案在`MyServer MyChannel MyController`里,接下來我們逐步分析下。
## 3\. 處理流程
考慮下 server 端的處理流程
1. 從對端接收數據
2. 通過標識機制判斷如何反序列化到 request 數據類型
3. 生成對應的 response 數據類型
4. 調用對應的 service-method ,填充 response 數據
5. 序列化 response
6. 發送數據回對端
具體講下上一節提到的接口設計的問題,體現在2 3 4步驟里,還是上面 Echo 的例子,因為 RPC 框架并不能提前知道`EchoService::Echo`這個函數,怎么調用這個函數呢?
`google/protobuf/service.h`里`::google::protobuf::Service`的源碼如下:
~~~
class LIBPROTOBUF_EXPORT Service {
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller,
const Message* request,
Message* response,
Closure* done) = 0;
};//Service
~~~
Service 是一個純虛類,`CallMethod = 0`,`EchoService`實現如下
~~~
void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
GOOGLE_DCHECK_EQ(method->service(), EchoService_descriptor_);
switch(method->index()) {
case 0:
Echo(controller,
::google::protobuf::down_cast<const ::echo::EchoRequest*>(request),
::google::protobuf::down_cast< ::echo::EchoResponse*>(response),
done);
break;
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}
~~~
可以看到這里會有一次數據轉化`down_cast`,因此框架可以通過調用`::google::protobuf::ServiceCallMethod`函數來調用`Echo`,數據統一為`Message*`格式,這樣就可以解決框架的接口問題了。
再考慮下 client 端處理流程。
`EchoService_Stub::Echo`的實現里:
~~~
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
~~~
因此先看下`::google::protobuf::RpcChannel`的實現:
~~~
// Abstract interface for an RPC channel. An RpcChannel represents a
// communication line to a Service which can be used to call that Service's
// methods. The Service may be running on another machine. Normally, you
// should not call an RpcChannel directly, but instead construct a stub Service
// wrapping it. Example:
// RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234");
// MyService* service = new MyService::Stub(channel);
// service->MyMethod(request, &response, callback);
class LIBPROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();
// Call the given method of the remote service. The signature of this
// procedure looks the same as Service::CallMethod(), but the requirements
// are less strict in one important way: the request and response objects
// need not be of any specific class as long as their descriptors are
// method->input_type() and method->output_type().
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller,
const Message* request,
Message* response,
Closure* done) = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};
~~~
pb 的注釋非常清晰,channel 可以理解為一個通道,連接了 rpc 服務的兩端,本質上也是通過 socket 通信的。
但是`RpcChannel`也是一個純虛類,`CallMethod = 0`。
因此我們需要實現一個子類,基類為`RpcChannel`,并且實現`CallMethod`方法,應該實現兩個功能:
1. 序列化 request ,發送到對端,同時需要標識機制使得對端知道如何解析(schema)和處理(method)這類數據。
2. 接收對端數據,反序列化到 response
此外還有`RpcController`,也是一個純虛類,是一個輔助類,用于獲取RPC結果,對端IP等。
## 4\. 標識機制
上一節提到的所謂標識機制,就是當 client 發送一段數據流到 server ,server 能夠知道這段 buffer 對應的數據格式,應該如何處理,對應的返回數據格式是什么樣的。
最簡單暴力的方式就是在每組數據里都標識下是什么格式的,返回值希望是什么格式的,這樣一定能解決問題。
但是 pb 里明顯不用這樣,因為 server/client 使用相同(或者兼容)的 proto,只要標識下數據類型名就可以了。不過遇到相同類型的 method 也會有問題,例如
~~~
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc AnotherEcho(EchoRequest) returns (EchoResponse)
}
~~~
因此可以使用 service 和 method 名字,通過 proto 就可以知道 request/response 類型了。
因此,結論是:**我們在每次數據傳遞里加上`service method`名字就可以了。**
pb 里有很多 xxxDescriptor 的類,`service method`也不例外。例如`GetDescriptor`可以獲取`ServiceDescriptor`.
~~~
class LIBPROTOBUF_EXPORT Service {
...
// Get the ServiceDescriptor describing this service and its methods.
virtual const ServiceDescriptor* GetDescriptor() = 0;
};//Service
~~~
通過`ServiceDescriptor`就可以獲取對應的`name`及`MethodDescriptor`.
~~~
class LIBPROTOBUF_EXPORT ServiceDescriptor {
public:
// The name of the service, not including its containing scope.
const string& name() const;
...
// The number of methods this service defines.
int method_count() const;
// Gets a MethodDescriptor by index, where 0 <= index < method_count().
// These are returned in the order they were defined in the .proto file.
const MethodDescriptor* method(int index) const;
};//ServiceDescriptor
~~~
而`MethodDecriptor`可以獲取對應的`name`及從屬的`ServiceDescriptor`
~~~
class LIBPROTOBUF_EXPORT MethodDescriptor {
public:
// Name of this method, not including containing scope.
const string& name() const;
...
// Gets the service to which this method belongs. Never NULL.
const ServiceDescriptor* service() const;
};//MethodDescriptor
~~~
因此:
1. server 端傳入一個`::google::protobuf::Service`時,我們可以記錄 service name 及所有的 method name.
2. client 端調用`virtual void CallMethod(const MethodDescriptor* method...`時,也可以獲取到 method name 及對應的 service name.
這樣,就可以知道發送的數據類型了。
## 5\. 構造參數
前面還提到的一個問題,是如何構造具體參數的問題。實現 RPC 框架時,肯定是不知道`EchoRequest EchoResponse`類名的,但是通過`::google::protobuf::Service`的接口可以構造出對應的對象來
~~~
// const MethodDescriptor* method =
// service->GetDescriptor()->FindMethodByName("Foo");
// Message* request = stub->GetRequestPrototype (method)->New();
// Message* response = stub->GetResponsePrototype(method)->New();
// request->ParseFromString(input);
// service->CallMethod(method, *request, response, callback);
virtual const Message& GetRequestPrototype(
const MethodDescriptor* method) const = 0;
virtual const Message& GetResponsePrototype(
const MethodDescriptor* method) const = 0;
~~~
而`Message`通過`New`可以構造出對應的對象
~~~
class LIBPROTOBUF_EXPORT Message : public MessageLite {
public:
inline Message() {}
virtual ~Message();
// Basic Operations ------------------------------------------------
// Construct a new instance of the same type. Ownership is passed to the
// caller. (This is also defined in MessageLite, but is defined again here
// for return-type covariance.)
virtual Message* New() const = 0;
...
~~~
這樣,我們就可以得到`Service::Method`需要的對象了。
## 6\. Server/Channel/Controller子類實現
前面已經介紹了基本思路,本節介紹下具體的實現部分。
### 6.1. RpcMeta
`RpcMeta`用于解決傳遞 service-name method-name 的問題,定義如下
~~~
package myrpc;
message RpcMeta {
optional string service_name = 1;
optional string method_name = 2;
optional int32 data_size = 3;
}
~~~
其中`data_size`表示接下來要傳輸的數據大小,例如`EchoRequest`對象的大小。
同時我們還需要一個`int`來表示`RpcMeta`的大小,因此我們來看下`Channel`的實現
### 6.2. Channel
~~~
//繼承自RpcChannel,實現數據發送和接收
class MyChannel : public ::google::protobuf::RpcChannel {
public:
//init傳入ip:port,網絡交互使用boost.asio
void init(const std::string& ip, const int port) {
_io = boost::make_shared<boost::asio::io_service>();
_sock = boost::make_shared<boost::asio::ip::tcp::socket>(*_io);
boost::asio::ip::tcp::endpoint ep(
boost::asio::ip::address::from_string(ip), port);
_sock->connect(ep);
}
//EchoService_Stub::Echo會調用Channel::CallMethod
//其中第一個參數MethodDescriptor* method,可以獲取service-name method-name
virtual void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* /* controller */,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure*) {
//request數據序列化
std::string serialzied_data = request->SerializeAsString();
//獲取service-name method-name,填充到rpc_meta
myrpc::RpcMeta rpc_meta;
rpc_meta.set_service_name(method->service()->name());
rpc_meta.set_method_name(method->name());
rpc_meta.set_data_size(serialzied_data.size());
//rpc_meta序列化
std::string serialzied_str = rpc_meta.SerializeAsString();
//獲取rpc_meta序列化數據大小,填充到數據頭部,占用4個字節
int serialzied_size = serialzied_str.size();
serialzied_str.insert(0, std::string((const char*)&serialzied_size, sizeof(int)));
//尾部追加request序列化后的數據
serialzied_str += serialzied_data;
//發送全部數據:
//|rpc_meta大小(定長4字節)|rpc_meta序列化數據(不定長)|request序列化數據(不定長)|
_sock->send(boost::asio::buffer(serialzied_str));
//接收4個字節:序列化的resp數據大小
char resp_data_size[sizeof(int)];
_sock->receive(boost::asio::buffer(resp_data_size));
//接收N個字節:N=序列化的resp數據大小
int resp_data_len = *(int*)resp_data_size;
std::vector<char> resp_data(resp_data_len, 0);
_sock->receive(boost::asio::buffer(resp_data));
//反序列化到resp
response->ParseFromString(std::string(&resp_data[0], resp_data.size()));
}
private:
boost::shared_ptr<boost::asio::io_service> _io;
boost::shared_ptr<boost::asio::ip::tcp::socket> _sock;
};//MyChannel
~~~
通過實現`Channel::CallMethod`方法,我們就可以在調用子類方法,例如`EchoService_Stub::Echo`時自動實現數據的發送/接收、序列化/反序列化了。
### 6.3 Server
`Server`的實現會復雜一點,因為可能注冊多個`Service::Method`,當接收到 client 端的數據,解析`RpcMeta`得到`service-name method-name`后,需要找到對應的`Service::Method`,注冊時就需要記錄這部分信息。
因此,我們先看下`add`方法的實現:
~~~
class MyServer {
public:
void add(::google::protobuf::Service* service) {
ServiceInfo service_info;
service_info.service = service;
service_info.sd = service->GetDescriptor();
for (int i = 0; i < service_info.sd->method_count(); ++i) {
service_info.mds[service_info.sd->method(i)->name()] = service_info.sd->method(i);
}
_services[service_info.sd->name()] = service_info;
}
...
private:
struct ServiceInfo{
::google::protobuf::Service* service;
const ::google::protobuf::ServiceDescriptor* sd;
std::map<std::string, const ::google::protobuf::MethodDescriptor*> mds;
};//ServiceInfo
//service_name -> {Service*, ServiceDescriptor*, MethodDescriptor* []}
std::map<std::string, ServiceInfo> _services;
~~~
我在實現里,`_services`記錄了 service 及對應的`ServiceDescriptor MethodDescriptor`。而`ServiceDescritpr::FindMethodByName`方法可以查找 method ,因此不記錄`method_name`也可以。不過出于性能考慮,我覺得還可以記錄更多,例如 req/resp 數據類型等。
注冊 service 后,就可以啟動 server 監聽端口和接收數據了
~~~
//監聽ip:port,接收數據
void MyServer::start(const std::string& ip, const int port) {
boost::asio::io_service io;
boost::asio::ip::tcp::acceptor acceptor(
io,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string(ip),
port));
while (true) {
auto sock = boost::make_shared<boost::asio::ip::tcp::socket>(io);
acceptor.accept(*sock);
std::cout << "recv from client:"
<< sock->remote_endpoint().address()
<< std::endl;
//接收4個字節:rpc_meta長度
char meta_size[sizeof(int)];
sock->receive(boost::asio::buffer(meta_size));
int meta_len = *(int*)(meta_size);
//接收rpc_meta數據
std::vector<char> meta_data(meta_len, 0);
sock->receive(boost::asio::buffer(meta_data));
myrpc::RpcMeta meta;
meta.ParseFromString(std::string(&meta_data[0], meta_data.size()));
//接收req數據
std::vector<char> data(meta.data_size(), 0);
sock->receive(boost::asio::buffer(data));
//數據處理
dispatch_msg(
meta.service_name(),
meta.method_name(),
std::string(&data[0], data.size()),
sock);
}
}
~~~
`start`啟動一個循環,解析`RpcMeta`數據并接收 request 數據,之后交給 dispatch\_msg 處理。
~~~
void MyServer::dispatch_msg(
const std::string& service_name,
const std::string& method_name,
const std::string& serialzied_data,
const boost::shared_ptr<boost::asio::ip::tcp::socket>& sock) {
//根據service_name method_name查找對應的注冊的Service*
auto service = _services[service_name].service;
auto md = _services[service_name].mds[method_name];
std::cout << "recv service_name:" << service_name << std::endl;
std::cout << "recv method_name:" << method_name << std::endl;
std::cout << "recv type:" << md->input_type()->name() << std::endl;
std::cout << "resp type:" << md->output_type()->name() << std::endl;
//根據Service*生成req resp對象
auto recv_msg = service->GetRequestPrototype(md).New();
recv_msg->ParseFromString(serialzied_data);
auto resp_msg = service->GetResponsePrototype(md).New();
MyController controller;
auto done = ::google::protobuf::NewCallback(
this,
&MyServer::on_resp_msg_filled,
recv_msg,
resp_msg,
sock);
//調用Service::Method(即用戶實現的子類方法)
service->CallMethod(md, &controller, recv_msg, resp_msg, done);
~~~
用戶填充`resp_msg`后,會調用`done`指定的回調函數(也就是我們在`MyEchoService::Echo`代碼里對應的`done->Run()`這一句)。
在用戶填充數據后,`on_resp_msg_filled`用于完成序列化及發送的工作。
~~~
void MyServer::on_resp_msg_filled(
::google::protobuf::Message* recv_msg,
::google::protobuf::Message* resp_msg,
const boost::shared_ptr<boost::asio::ip::tcp::socket> sock) {
//avoid mem leak
boost::scoped_ptr<::google::protobuf::Message> recv_msg_guard(recv_msg);
boost::scoped_ptr<::google::protobuf::Message> resp_msg_guard(resp_msg);
std::string resp_str;
pack_message(resp_msg, &resp_str);
sock->send(boost::asio::buffer(resp_str));
}
~~~
`pack_message`用于打包數據,其實就是在序列化數據前插入4字節長度數據
~~~
void pack_message(
const ::google::protobuf::Message* msg,
std::string* serialized_data) {
int serialized_size = msg->ByteSize();
serialized_data->assign(
(const char*)&serialized_size,
sizeof(serialized_size));
msg->AppendToString(serialized_data);
}
~~~
程序輸出如下
~~~
$ ./client
resp:I have received 'hello, myrpc.'
~~~
~~~
$ ./server
recv from client:127.0.0.1
recv service_name:EchoService
recv method_name:Echo
recv type:EchoRequest
resp type:EchoResponse
hello, myrpc.
~~~
完整代碼,打包放在了[Tiny-Tools](https://github.com/yingshin/Tiny-Tools/tree/master/protobuf-rpc-demo),使用 cmake 編譯,注意指定 protobuf boost 庫的路徑。
- 前言
- 服務器開發設計
- Reactor模式
- 一種心跳,兩種設計
- 聊聊 TCP 長連接和心跳那些事
- 學習TCP三次握手和四次揮手
- Linux基礎
- Linux的inode的理解
- 異步IO模型介紹
- 20個最常用的GCC編譯器參數
- epoll
- epoll精髓
- epoll原理詳解及epoll反應堆模型
- epoll的坑
- epoll的本質
- socket的SO_REUSEADDR參數全面分析
- 服務器網絡
- Protobuf
- Protobuf2 語法指南
- 一種自動反射消息類型的 Protobuf 網絡傳輸方案
- 微服務
- RPC框架
- 什么是RPC
- 如何科學的解釋RPC
- RPC 消息協議
- 實現一個極簡版的RPC
- 一個基于protobuf的極簡RPC
- 如何基于protobuf實現一個極簡版的RPC
- 開源RPC框架
- thrift
- grpc
- brpc
- Dubbo
- 服務注冊,發現,治理
- Redis
- Redis發布訂閱
- Redis分布式鎖
- 一致性哈希算法
- Redis常見問題
- Redis數據類型
- 緩存一致性
- LevelDB
- 高可用
- keepalived基本理解
- keepalived操做
- LVS 學習
- 性能優化
- Linux服務器程序性能優化方法
- SRS性能(CPU)、內存優化工具用法
- centos6的性能分析工具集合
- CentOS系統性能工具 sar 示例!
- Linux性能監控工具集sysstat
- gdb相關
- Linux 下如何產生core文件(core dump設置)