RPC通信 [編輯本頁]
Swoole框架提供的RPC服務器支持了單連接并發、PHP-FPM下長連接維持等特性。在車輪互聯大規模應用,構建了4層架構的服務化架構。
服務器端代碼:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Protocol/RPCServer.php
客戶端代碼:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Client/RPC.php
與Http協議對比
很多企業使用Http Rest實現RPC通信,實現簡單可以利用到很多現成的工具和方案。但是Http通信協議存在2個嚴重的缺陷。
Http不支持單連接并發,如果要同時并發很多請求,必須創建大量TCP連接。如果php-fpm開啟500個進程,每此需要128個并發,那么就需要創建64000個TCP連接。
Http對長連接支持不夠好,很多Http程序都是設計為短連接的,在請求時創建TCP連接、請求結束時close,這會帶來額外的網絡通信消耗
Swoole框架的RPC客戶端使用16字節固定包頭+包體的通信方式,支持單連接并發、支持在php-fpm開啟長連接。
php-fpm長連接
在php-fpm中維持TCP長連接主要借助swoole擴展提供的SWOOLE_KEEP選項,客戶端設置此選項后,在請求結束時不會關閉連接,新的請求到來后可以復用TCP連接。另外底層內置了長連接檢測的能力。
在執行$client->connect()自動檢測連接是否可用,如果復用的連接已經失效,底層會重新創建一個新的TCP長連接。
在執行$client->connect()自動清理垃圾數據,避免上一次客戶端超時殘留的數據導致服務異常
$socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, WOOLE_SOCK_SYNC);
$socket->set(array(
'open_length_check' => true,
'package_max_length' => $this->packet_maxlen,
'package_length_type' => 'N',
'package_body_offset' => RPCServer::HEADER_SIZE,
'package_length_offset' => 0,
));
TCP包頭
struct
{
uint32_t length;
uint16_t reserved_1; //保留字段1
uint8_t reserved_2; //保留字段2
uint8_t type;
uint32_t uid;
uint32_t serid;
char body[0];
}
length:包體的長度
reserved_1 保留的16位字段
reserved_2 保留的8位字段
type:包體的打包格式,低4位用于表示包體打包的格式 =1使用PHP串化格式,=2使用JSON格式,其他格式暫未支持,高4位用于保存壓縮格式,如gzip
uid:用戶自定義的ID,保留字段
serid:Request/Response 串號
包體格式
request
{"call": "Service接口名稱", "params": “接口參數列表”, "env": "相關環境信息"}
call: 是指Service接口的名稱,如車輪Service提供的接口 User\Info::get
params:函數的參數列表,vector類型,params在底層會自動作為 User\Info::get 函數的參數傳入調用,在PHP代碼中相當于 call_user_func_array($call, $params)
env:相關環境信息,map類型,客戶端與服務器端可自由使用
response
{"errno": "整數,服務器端返回碼", "data": “接口返回值"}
errno:錯誤碼,正常調用為0
data:無固定格式,由 Service接口 返回值 決定
錯誤碼列表
8001; //未就緒
8002; //連接服務器失敗
8003; //服務器端超時
8004; //發送失敗
8005; //server返回了錯誤碼
8006; //解包失敗了
8007; //錯誤的協議頭
8008; //超過最大允許的長度
8009; //連接被關閉
9001; //錯誤的包頭
9002; //請求包體長度超過允許的范圍
9003; //服務器繁忙,超過處理能力
9204; //解包失敗
9205; //參數錯誤
9206; //函數不存在
9207; //執行錯誤
9208 //不允許該服務器登錄
9209 //認證不通過
9300 //被服務器踢掉了
Server端的實現中實現了打包格式的自適應,當發現調用端使用JSON格式時,Response包體也會打包為JSON。另外Swoole框架的RPC支持了gzip壓縮,啟用壓縮后可以節約內網通信的流量。
單連接并發
客戶端
請求串號就是單連接并發的秘訣了,客戶端即使是同一個連接,也可以同時發出多個Request,這與Http協議是不同的,Http協議即使啟用了Keep-Alive單個連接只能發出一次Request,必須等到服務器端發送Response才能發送下一個Request。RPC客戶端收到Response會根據其中的串號,將不同的Response和Request對應起來。
有些Request可能會超時,RPC客戶端通過對比請求ID可以判斷出哪些Response可能是上次請求超時殘留的數據,并進行丟棄處理。
服務器端
在車輪互聯的RPC服務器中,大部分使用了同步阻塞模式,小部分使用了異步模式。
同步服務器的實現依賴swoole擴展提供的dispatch_mode=3選項,并設置worker_num為128。swoole底層實現了連接與請求分離,同一個連接不同的Request包會被分配到不同的Worker進程并發地進行處理。Response再由swoole底層逐個發送給客戶端。服務器端也可以很好低支持單連接并發,即使只有一個TCP連接也可以利用到所有128個Worker進程的處理能力。
$server = new Swoole\Server('0.0.0.0', 8888);
$server->set(array(
'worker_num' => 128,
'max_request' => 5000,
'dispatch_mode' => 3,
'open_length_check' => 1,
'package_max_length' => $AppSvr->packet_maxlen,
'package_length_type' => 'N',
'package_body_offset' => \Swoole\Protocol\RPCServer::HEADER_SIZE,
'package_length_offset' => 0,
));
串行調用
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult(); //如果返回NULL,表示網絡調用失敗了,請檢查$res->code
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();
并行調用
$res1 = $service->call('User\Info::unlock', '18958653669', 1);
$res2 = $service->call('User\Info::unlock', '18958653669', 1);
$res3 = $service->call('User\Info::unlock', '18958653669', 1);
//0.5表示500毫秒超時,$n表示成功返回的請求個數。如果少于發起的請求數,證明有個別請求超時了
$n = $service->wait(0.5);
$result1 = $res1->getResult();
$result2 = $res2->getResult();
$result3 = $res3->getResult();
實際上底層對于串行并行的處理方式是相同的,串行調用在執行getResult()時會自動wait一次,等待服務器端發送Response,RPC客戶端的wait操作基于swoole_client_select實現。
function wait($timeout = 0.5)
{
$st = microtime(true);
$success_num = 0;
while (count($this->waitList) > 0)
{
$write = $error = $read = array();
foreach ($this->waitList as $obj)
{
/**
* @var $obj RPC_Result
*/
if ($obj->socket !== null)
{
$read[] = $obj->socket;
}
}
if (empty($read))
{
break;
}
//去掉重復的socket
Tool::arrayUnique($read);
//等待可讀事件
$n = $this->select($read, $write, $error, $timeout);
if ($n > 0)
{
//可讀
foreach($read as $connection)
{
$data = $this->recvPacket($connection);
//socket被關閉了
if ($data === "")
{
foreach($this->waitList as $retObj)
{
if ($retObj->socket == $connection)
{
$retObj->code = RPC_Result::ERR_CLOSED;
unset($this->waitList[$retObj->requestId]);
$this->closeConnection($retObj->server_host, $retObj->server_port);
}
}
continue;
}
elseif ($data === false)
{
continue;
}
$header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE));
//不在請求列表中,錯誤的請求串號
if (!isset($this->waitList[$header['serid']]))
{
trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING);
continue;
}
$retObj = $this->waitList[$header['serid']];
//成功處理
$this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj);
$success_num++;
}
}
//發生超時
if ((microtime(true) - $st) > $timeout)
{
foreach ($this->waitList as $obj)
{
$obj->code = ($obj->socket->isConnected()) ? RPC_Result::ERR_TIMEOUT : RPC_Result::ERR_CONNECT;
//執行after鉤子函數
$this->afterRequest($obj);
}
//清空當前列表
$this->waitList = array();
return $success_num;
}
}
//未發生任何超時
$this->waitList = array();
$this->requestIndex = 0;
return $success_num;
}
$waitList 是所有Request的集合
多個Request使用的TCP連接可能是相同的幾個,這里使用了Tool::arrayUnique進行去重
使用swoole_client_select等待Socket可讀事件
在可讀事件中調用recvPacket收包,并解析包頭,收到Response時讀取請求ID自動從waitList中移除Request
循環的默認會進行時間檢測,發生超時或全部成功時退出,返回Response的數量