<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC) ? ? ? ?? ### **我的開發環境:** 操作系統:**Windows7 64bit** 開發環境:**JDK 1.7 -?1.7.0_55** 開發工具:**Eclipse Kepler SR2** RabbitMQ版本:?**3.6.0** Elang版本:**erl7.2.1** 關于Windows7下安裝RabbitMQ的教程請先在網上找一下,有空我再補安裝教程。 ### 源碼地址 https://github.com/chwshuang/rabbitmq.git ? ? ? ? 本教程中,我們將學習使用工作隊列讓多個消費者端來執行耗時的任務。比如我們需要通過遠程服務器幫我們計算某個結果。這種模式通常被稱之為遠程方法調用或RPC. ? ? ? ??我們通過RabbitMQ搭建一個RPC系統,一個客戶端和一個RPC服務器,客戶端有一個斐波那契數列方面的問題需要解決(Fibonacci numbers),RPC服務器負責技術收到這個消息,然后計算結果,并且返回這個斐波那契數列。 ### 客戶端接口 ? ? ? ??我們需要創建一個簡單的客戶端類,通過調用客戶端的call方法,來計算結果。 ~~~ FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); ~~~ 遠程方法調用的注意事項: ? ? ? ??RPC在軟件開發中非常常見,也經常被批評。當一個程序員對代碼不熟悉的時候,跟蹤RPC的性能問題是出在本地還是遠程服務器就非常麻煩,對于RPC的使用,有幾點需要特別說明: - 使用遠程調用時的本地函數最好獨立出來 - 保證代碼組件之間的依賴關系清晰明了,并用日志記錄不同的執行過程和時間 - 發生客戶端運行緩慢或者假死時,先確認RPC服務器是否還活著! - 盡量使用異步隊列來處理RPC請求,盡量不要用同步阻塞的方式運行RPC請求 ### 回調隊列 ? ? ? ??在RabbitMQ的RPC中,客戶端發送請求后,還需要得到一個響應結果,我們需要像下面這樣,在發送請求時,帶上一個回調隊列: ~~~ callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ... ~~~ ? ? ? ??上面代碼中,我們需要引入一個新的類 ~~~ import com.rabbitmq.client.AMQP.BasicProperties; ~~~ ### 消息屬性 傳輸一條消息,AMQP協議預定義了14個屬性,下面幾個是使用比較頻繁的幾個屬性: - deliveryMode:配置一個消息是否持久化。(2表示持久化)這個在第二章中有說明。 - contentType :用來描述編碼的MIME類型。與html的MIME類型類似,例如,經常使用JSON編碼是將此屬性設置為一個很好的做法:application/json。 - replyTo : 回調隊列的名稱。 - correlationId:RPC響應請求的相關編號。這個在下一節講。 ### 關聯編號? Correlation Id ??????? 如果一個客戶端有很多的計算任務,按照上面的代碼,我們會為每個任務創建一個請求,然后等待返回的結果,這種方法貌似很耗時,如果把所有的任務都放到同一個連接中,那么我們又沒法分辨出返回的結果是那個任務的?為了解決這個問題,RabbitMQ提供了一個correlationid屬性來解決這個問題。RabbitMQ為每個請求提供唯一的編號,然后在返回隊列里如果看到了這個編號,就知道我們的任務處理完成了,如果收到的編號不認識,就可以安全的忽略。 ? ? ? ??你可能會疑問,如果忽略了,那么想知道這個返回結果的客戶端是不是就收不到這個結果了?這個基本上不會出現,但是,理論上也可能發生,例如一個RPC服務器,在發送確認消息前掛了,你收到的消息可能就是不完整的。這種情況,RabbitMQ會重新發送任務處理請求。這也是為什么客戶端必須處理這些重復請求以及RPC啟用冪次模式。 ### 總結: ![](https://box.kancloud.cn/2016-03-01_56d507d847aeb.jpg) RPC工作方式: 1. 當客戶端啟動時,會創建一個匿名的回調隊列 1. 在RPC請求中,定義了兩個屬性:replyTo,表示回調隊列的名稱; correlationId,表示請求任務的唯一編號,用來區分不同請求的返回結果。 1. 將請求發送到rpc_queue隊列中 1. RPC服務器等待rpc_queue隊列的請求,如果有消息,就處理,它將計算結果發送到請求中的回調隊列里。 1. 客戶端監聽回調隊列中的消息,如果有返回消息,它根據回調消息中的correlationid進行匹配計算結果。 ### 工程代碼 **計算斐波那契數列的方法** ~~~ private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } ~~~ 這個方法只是用來講解我們的教程,你可別拿它在生產環境跑大數據!下面是客戶端的代碼 ### 服務器端代碼: RPCServer.java 第一步仍然是建立連接、頻道和聲明隊列。 如果我們運行多個RPC服務器,為了達到負載均衡,需要通過channel.basicQos來設置從隊列中預取消息的個數。 我們通過basicConsume 訪問隊列,如果后消息任務來了,我們就開始工作,并將結果發送到回調隊列中。 ~~~ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("RPCServer [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println("RPCServer [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } } ~~~ ### 客戶端代碼 RPCClient.java ~~~ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println("RPCClient [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println("RPCClient [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception ignore) { } } } } } ~~~ 在客戶端,我們也建立一個連接和通道,并聲明一個專用的“回調”隊列 我們設置調隊列中的唯一編號和回調隊列名稱 然后我們發送任務消息到RPC服務器 接下來循環監聽回調隊列中的每一個消息,找到與我們剛才發送任務消息編號相同的消息 ### 總結: 這里的例子只是RabbitMQ中RPC服務的一個實現,你也可以根據業務需要實現更多。 rpc有一個優點,如果一個RPC服務器處理不來,可以再增加一個、兩個、三個。 我們的例子中的代碼還比較簡單,還有很多問題沒有解決: 如果沒有發現服務器,客戶端如何處理? 如果客戶端的RPC請求超時了怎么辦? 如果服務器出現了故障,發生了異常,是否將異常發送到客戶端? 在處理消息前,怎樣防止無效的消息?檢查范圍、類型? 如果你想還想繼續了解RabbitMQ,你可以在RabbitMQ中安裝管理插件,然后查看消息隊列。 本教程所有文章: [RabbitMQ入門教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708)? -?你好世界!? [RabbitMQ入門教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284)??- 工作隊列 [RabbitMQ入門教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057)?- 發布/訂閱 [RabbitMQ入門教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060)?- ?消息路由 [RabbitMQ入門教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904)? - ?模糊匹配 [RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570)?- 遠程調用
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看