<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 遠程調用 RPC(Remote procedure Call) > using php-amqplib 在第二節教程中,我們知道了怎樣使用工作隊列將耗時的任務分發給多個消費者。 但是,如果我們需要在遠程計算機上運行并等待結果怎么辦?那樣就是一個新的應用場景。這種模式通常稱為遠程調用或者叫 RPC。 在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由于我們沒有需要分發的耗時任務,我們將創建一個返回斐波那契數列的虛擬RPC服務。 # 客戶端接口 為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。它將暴露一個名為call的方法,該方法發送RPC請求并阻塞,直到收到響應: ~~~ $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, "\n"; ~~~ > 關于RPC的注釋 > > 雖然RPC是一種非常常見的計算模式,但是它經常被人們詬病。當程序員不知道該函數是調用的本地函數還是緩慢的RPC服務時,就會出現問題。這樣的混亂導致了系統的不可預知,并且增加了調試程序時不必要的復雜性。濫用RPC服務可能導致不可維護意大利面條式的代碼,而不是簡化軟件。 > > 銘記這一點,請參考以下建議: > + 確保調用本地和調用遠程的函數非常易于區分 > + 完善系統的文檔,使得系統的組件之間的依賴關系清晰 > + 處理異常情況,當RPC服務器長時間宕機時,客戶端該怎樣反應 > >當對使用RPC有疑問時應避免使用。如果可以的話,你應該使用異步管道 - 而不是類似RPC的阻塞,結果將被> 異步返回給下一個計算階段。 # 回調隊列 一般來說,在RabbitMQ上實現RPC很容易。客戶端發送請求消息,服務器回復響應消息。為了收到響應,我們需要發送一個回調隊列地址與請求。我們可以使用默認隊列。來試試吧: ~~~ list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array('reply_to' => $queue_name)); $channel->basic_publish($msg, '', 'rpc_queue'); # ... then code to read a response message from the callback_queue ... ~~~ > 消息的屬性 > > AMQP 0-9-1 協議為消息預定義了一組默認的14個屬性。大多數屬性很少使用,除了以下幾個: > + delivery_mode: 將消息標記為持久性(值為2)或者費持久(1). 你可能記得這個屬性,我們在 第二節 教程中使用過。 > + content_type: 用于描述MIME類型的編碼。例如,對于經常使用的JSON編碼,將此屬性設置為 “application/json” 是一個很好的做法。 > + reply_to: 主要用于命名一個回調隊列 > + correlation_id: 用于將RPC響應與請求相關聯 # 關聯ID 在上面提出的方法中,我們建議為每個RPC請求創建一個回調隊列。這樣效率非常低,但幸運的是,這有一個更好的方法 - 讓我們為每個客戶端創建一個回調隊列。 這樣有引發了一個新的問題,在該隊列中收到響應后,不能確定響應所屬的請求。這就是 correlation_id 派上用場的時候。我們將為每個請求設置一個唯一值(correlation_id)。之后,當我們在回調隊列中收到一條消息時,我們將查看此屬性,并且基于此,我們能夠將響應與請求向匹配。如果我們收到一個未知的 correlation_id 的值,我們就能安全的丟棄該消息 - 它不屬于我們的請求。 你可能會問,為什么我們應該忽略回調消息中未知的消息,而不是拋出一個異常呢?這是由于在服務端可能發生條件競爭的情況,盡管這不太可能,RPC服務器可能會在發送給我們消息確認后,但在它發送請求消息之前掛掉。如果發生這種情況,RPC服務器重啟之后,將再次處理該請求。這就是為什么在客戶端上,我們必須合適的處理這些重復的響應,而RPC應該理想地是冪等的。 # 總結 ![](https://box.kancloud.cn/5d976634d0571ed87522667195d1cfc5_1110x340.jpg) ## 我們RPC工作流程 當客戶端啟動時,它創建一個匿名獨占回調隊列。 對于一個RPC請求,客戶端發送一個具有兩個屬性的消息: reply_to 它被設置為回調隊列, correlation_id 它被設置為每個請求的唯一值。 請求被發送到rpc_queue隊列。 RPC worker(又名: Server)一直在等待隊列上的請求。當請求出現時,它將執行請求的任務,并使用reply_to字段中的隊列將結果返回給客戶端。 客戶端等待回調隊列中的數據。當信息出現時,它檢查correlation_id屬性。如果它與請求中的值相匹配,則返回對應的應用程序的響應。 ## 合并代碼 求斐波納契任務: ~~~ function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } ~~~ 上面我們聲明了 fib 函數。它只有假定有效的正整數輸入。(不要指望這個函數能夠計算比較大的數據,這可能是最慢的遞歸實現)。 我們 rpc_server.php 代碼如下: ~~~ <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('rpc_queue', false, false, false, false); function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\n"; $callback = function($req) { $n = intval($req->body); echo " [.] fib(", $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ~~~ 服務器的代碼非常簡單: * 如往常一樣,一開始我們建立連接,通道和聲明隊列。 * 我們可能想要運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要在 $channel.basic_qos中設置prefetch_count。 * 我們使用 basic_consume 訪問隊列。然后我們進入等待請求消息的while循環,執行任務并返回響應。 rpc_client.php 代碼如下 ~~~ <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class FibonacciRpcClient { private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } public function on_response($rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } return intval($this->response); } }; $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, "\n"; ~~~ 現在是時候看看我們完整的例子,rpc_client.php 和 rpc_server.php 我們的RPC服務器現在已經準備就緒。我們可以啟動服務器: ~~~ php rpc_server.php # => [x] Awaiting RPC requests ~~~ 要求運行客戶端的fib數字: ~~~ php rpc_client.php # => [x] Requesting fib(30) ~~~ 這里提出的設計不是RPC服務的唯一可能的實現,但它具有一些重要的優點: * 如果RPC服務器處理太慢,你可以運行另一個服務來擴展。 嘗試在新的控制臺運行一個rpc_server.php. * 在客戶端,RPC需要發送和接收一條消息。不需要像queue_declare這樣的同步調用。因此,RPC客戶端只需要一個網絡往返單個RPC請求。 我們的代碼仍然非常簡單,不回去嘗試解決更復雜(但重要)的問題,如: * 如果服務器沒有運行,客戶端該如何反應? * 客戶端是否需要RPC服務調用的過期時間? * 如果服務器發生故障并且引發異常, 是否應該將其發送到客戶端? * 在處理之前防止無效的傳入消息(例如檢查邊界,類型) > 如果要進行實驗,可能會發現管理界面對查看隊列很有用。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看