[TOC]
# 遠程調用 RPC(Remote procedure Call)
> using php-amqplib
在第二節教程中,我們知道了怎樣使用工作隊列將耗時的任務分發給多個消費者。
但是,如果我們需要在遠程計算機上運行并等待結果怎么辦?那樣就是一個新的應用場景。這種模式通常稱為遠程調用或者叫 RPC。
在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由于我們沒有需要分發的耗時任務,我們將創建一個返回斐波那契數列的虛擬RPC服務。
# 客戶端接口
為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。它將暴露一個名為call的方法,該方法發送RPC請求并阻塞,直到收到響應:
~~~
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
~~~
> 關于RPC的注釋
>
> 雖然RPC是一種非常常見的計算模式,但是它經常被人們詬病。當程序員不知道該函數是調用的本地函數還是緩慢的RPC服務時,就會出現問題。這樣的混亂導致了系統的不可預知,并且增加了調試程序時不必要的復雜性。濫用RPC服務可能導致不可維護意大利面條式的代碼,而不是簡化軟件。
>
> 銘記這一點,請參考以下建議:
> + 確保調用本地和調用遠程的函數非常易于區分
> + 完善系統的文檔,使得系統的組件之間的依賴關系清晰
> + 處理異常情況,當RPC服務器長時間宕機時,客戶端該怎樣反應
>
>當對使用RPC有疑問時應避免使用。如果可以的話,你應該使用異步管道 - 而不是類似RPC的阻塞,結果將被> 異步返回給下一個計算階段。
# 回調隊列
一般來說,在RabbitMQ上實現RPC很容易。客戶端發送請求消息,服務器回復響應消息。為了收到響應,我們需要發送一個回調隊列地址與請求。我們可以使用默認隊列。來試試吧:
~~~
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array('reply_to' => $queue_name));
$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
~~~
> 消息的屬性
>
> AMQP 0-9-1 協議為消息預定義了一組默認的14個屬性。大多數屬性很少使用,除了以下幾個:
> + delivery_mode: 將消息標記為持久性(值為2)或者費持久(1). 你可能記得這個屬性,我們在 第二節 教程中使用過。
> + content_type: 用于描述MIME類型的編碼。例如,對于經常使用的JSON編碼,將此屬性設置為 “application/json” 是一個很好的做法。
> + reply_to: 主要用于命名一個回調隊列
> + correlation_id: 用于將RPC響應與請求相關聯
# 關聯ID
在上面提出的方法中,我們建議為每個RPC請求創建一個回調隊列。這樣效率非常低,但幸運的是,這有一個更好的方法 - 讓我們為每個客戶端創建一個回調隊列。
這樣有引發了一個新的問題,在該隊列中收到響應后,不能確定響應所屬的請求。這就是 correlation_id 派上用場的時候。我們將為每個請求設置一個唯一值(correlation_id)。之后,當我們在回調隊列中收到一條消息時,我們將查看此屬性,并且基于此,我們能夠將響應與請求向匹配。如果我們收到一個未知的 correlation_id 的值,我們就能安全的丟棄該消息 - 它不屬于我們的請求。
你可能會問,為什么我們應該忽略回調消息中未知的消息,而不是拋出一個異常呢?這是由于在服務端可能發生條件競爭的情況,盡管這不太可能,RPC服務器可能會在發送給我們消息確認后,但在它發送請求消息之前掛掉。如果發生這種情況,RPC服務器重啟之后,將再次處理該請求。這就是為什么在客戶端上,我們必須合適的處理這些重復的響應,而RPC應該理想地是冪等的。
# 總結

## 我們RPC工作流程
當客戶端啟動時,它創建一個匿名獨占回調隊列。
對于一個RPC請求,客戶端發送一個具有兩個屬性的消息: reply_to 它被設置為回調隊列, correlation_id 它被設置為每個請求的唯一值。
請求被發送到rpc_queue隊列。
RPC worker(又名: Server)一直在等待隊列上的請求。當請求出現時,它將執行請求的任務,并使用reply_to字段中的隊列將結果返回給客戶端。
客戶端等待回調隊列中的數據。當信息出現時,它檢查correlation_id屬性。如果它與請求中的值相匹配,則返回對應的應用程序的響應。
## 合并代碼
求斐波納契任務:
~~~
function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}
~~~
上面我們聲明了 fib 函數。它只有假定有效的正整數輸入。(不要指望這個函數能夠計算比較大的數據,這可能是最慢的遞歸實現)。
我們 rpc_server.php 代碼如下:
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
$req->delivery_info['channel']->basic_publish(
$msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
~~~
服務器的代碼非常簡單:
* 如往常一樣,一開始我們建立連接,通道和聲明隊列。
* 我們可能想要運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要在 $channel.basic_qos中設置prefetch_count。
* 我們使用 basic_consume 訪問隊列。然后我們進入等待請求消息的while循環,執行任務并返回響應。
rpc_client.php 代碼如下
~~~
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
array($this, 'on_response'));
}
public function on_response($rep) {
if($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
~~~
現在是時候看看我們完整的例子,rpc_client.php 和 rpc_server.php
我們的RPC服務器現在已經準備就緒。我們可以啟動服務器:
~~~
php rpc_server.php
# => [x] Awaiting RPC requests
~~~
要求運行客戶端的fib數字:
~~~
php rpc_client.php
# => [x] Requesting fib(30)
~~~
這里提出的設計不是RPC服務的唯一可能的實現,但它具有一些重要的優點:
* 如果RPC服務器處理太慢,你可以運行另一個服務來擴展。 嘗試在新的控制臺運行一個rpc_server.php.
* 在客戶端,RPC需要發送和接收一條消息。不需要像queue_declare這樣的同步調用。因此,RPC客戶端只需要一個網絡往返單個RPC請求。
我們的代碼仍然非常簡單,不回去嘗試解決更復雜(但重要)的問題,如:
* 如果服務器沒有運行,客戶端該如何反應?
* 客戶端是否需要RPC服務調用的過期時間?
* 如果服務器發生故障并且引發異常, 是否應該將其發送到客戶端?
* 在處理之前防止無效的傳入消息(例如檢查邊界,類型)
> 如果要進行實驗,可能會發現管理界面對查看隊列很有用。
- OAuth
- 簡介
- 步驟
- 單點登錄
- .user.ini
- 時間轉換為今天昨天前天幾天前
- 獲取ip接口
- 協程
- 概念
- yield-from && return-values
- 協程與阻塞的思考
- 中間件
- mysqli異步與php的協程
- 代碼片段
- pdo 執行的sql語句
- 二進制安全
- 捕捉異常中斷
- global
- 利用cookie模擬登陸
- 解析非正常json
- 簡單的對稱加密算法
- RSA 加密
- 過濾掉emoji表情
- 判斷遠程圖片是否存在
- 一分鐘限制請求100次
- 文件處理
- 多文件上傳
- 顯示所有文件
- 文件下載和上面顯示所有文件配合
- 文件的刪除,統計,存數組等
- 圖片處理
- 簡介
- 驗證碼
- 圖片等比縮放
- 批量添加水印
- beanstalkd
- 安裝
- 使用
- RabbitMQ
- 簡介
- debain安裝
- centos安裝
- 常用方法
- 入門
- 工作隊列
- 訂閱,發布
- 路由
- 主題
- 遠程調用RPC
- 消息中間件的選型
- .htaccess
- isset、empty、if區別以及0、‘’、null
- php各版本
- php7.2 不向后兼容的改動
- php中的各種坑
- php7改變
- php慢日志
- 郵件
- PHPMailer實現發郵件
- 驗證郵件地址真實性
- 文件下載
- FastCgi 與 PHP-fpm 之間的關系
- openssl 加解密
- 反射
- 鉤子方法
- 查找插件
- opcode
- opcache使用
- opcache優化
- 分布式一致性hash算法
- 概念
- 哈希算法好壞的四個定義
- php實現
- java實現
- 數組
- jwt
- jwt簡介
- 單點登錄
- phpize
- GeoIP擴展
- php無法獲得https網頁內容的解決方案
- homestead運行的腳本
- Unicode和Utf-8轉換
- php優化
- kafka
- fpm配置
- configure配置詳解