<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ? ? ? ?翻譯:http://www.rabbitmq.com/tutorials/tutorial-six-java.html ? ? ?在第二篇博文中,我們已經了解到了如何使用工作隊列來向多個消費者分散耗時任務。 ? ? ?但是付過我們需要在遠程電腦上運行一個方法然后等待結果,該怎么辦?這是不同的需求。這個模式通常叫做RPC。 ? ? ?本文我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器端。由于我們沒有任何真實的耗時任務需要分配,所以我們將創建一個虛擬的RPC服務,可以返回斐波納契數列。 # Client interface(客戶端接口) ? ? ?為了說明RPC服務可以使用,我們創建一個簡單的客戶端類。暴露一個方法——發送RPC請求,然后阻塞直到獲得結果。 ~~~ FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); ~~~ # Callback queue(回調隊列) ? ? ?一般在RabbitMQ中做RPC是很簡單的。客戶端發送請求消息,服務器回復響應的消息。為了接受響應的消息,我們需要在請求消息中發送一個回調隊列。可以用默認的隊列(僅限java客戶端)。試一試吧: ~~~ BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ... ~~~ # Message properties(消息屬性) ? ? ?AMQP協議為消息預定義了一組14個屬性。大部分的屬性是很少使用的。除了一下幾種: - deliveryMode:標記消息傳遞模式,2-消息持久化,其他值-瞬態。在第二篇文章中還提到過。 - contentType:內容類型,用于描述編碼的mime-type。例如經常為該屬性設置JSON編碼。 - replyTo:應答,通用的回調隊列名稱 - correlationId:關聯ID,方便RPC響應與請求關聯 ? ? ?我們需要添加一個新的導入 ~~~ import com.rabbitmq.client.AMQP.BasicProperties; ~~~ # Correlation Id ? ? ?在上述方法中為每個RPC請求創建一個回調隊列。這是很低效的。幸運的是,一個解決方案:可以為每個客戶端創建一個單一的回調隊列。 ? ? ?新的問題被提出,隊列收到一條回復消息,但是不清楚是那條請求的回復。這是就需要使用correlationId屬性了。我們要為每個請求設置唯一的值。然后,在回調隊列中獲取消息,看看這個屬性,關聯response和request就是基于這個屬性值的。如果我們看到一個未知的correlationId屬性值的消息,可以放心的無視它——它不是我們發送的請求。 ? ? ?你可能問道,為什么要忽略回調隊列中未知的信息,而不是當作一個失敗?這是由于在服務器端競爭條件的導致的。雖然不太可能,但是如果RPC服務器在發送給我們結果后,發送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的消息。如果發生了這種情況,重啟RPC服務器將會重新處理該請求。這就是為什么在客戶端必須很好的處理重復響應,RPC應該是冪等的。 # Summary(總結) ![](https://box.kancloud.cn/2016-02-18_56c53cbe36857.jpg) ? ? ?我們的RPC的處理流程: 1. 當客戶端啟動時,創建一個匿名的回調隊列。 1. 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。 1. 請求被發送到rpc_queue隊列中。 1. RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。 1. 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。 # 完整的實例 RPC服務器端(RPCServer.java) ~~~ /** * RPC服務器端 * * @author arron * @date 2015年9月30日 下午3:49:01 * @version 1.0 */ public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個頻道 Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //限制:每次最多給一個消費者發送1條消息 channel.basicQos(1); //為rpc_queue隊列創建消費者,用于處理請求 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //獲取請求中的correlationId屬性值,并將其設置到結果消息的correlationId屬性中 BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); //獲取回調隊列名字 String callQueueName = props.getReplyTo(); String message = new String(delivery.getBody(),"UTF-8"); System.out.println(" [.] fib(" + message + ")"); //獲取結果 String response = "" + fib(Integer.parseInt(message)); //先發送回調結果 channel.basicPublish("", callQueueName, replyProps,response.getBytes()); //后手動發送消息反饋 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } /** * 計算斐波列其數列的第n項 * * @param n * @return * @throws Exception */ private static int fib(int n) throws Exception { if (n < 0) throw new Exception("參數錯誤,n必須大于等于0"); if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } } ~~~ RPC客戶端(RPCClient.java): ~~~ /** * * @author arron * @date 2015年9月30日 下午3:44:43 * @version 1.0 */ public class RPCClient { private static final String RPC_QUEUE_NAME = "rpc_queue"; private Connection connection; private Channel channel; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 創建一個連接 connection = factory.newConnection(); // 創建一個頻道 channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //為每一個客戶端獲取一個隨機的回調隊列 replyQueueName = channel.queueDeclare().getQueue(); //為每一個客戶端創建一個消費者(用于監聽回調隊列,獲取結果) consumer = new QueueingConsumer(channel); //消費者與隊列關聯 channel.basicConsume(replyQueueName, true, consumer); } /** * 獲取斐波列其數列的值 * * @param message * @return * @throws Exception */ public String call(String message) throws Exception{ String response = null; String corrId = java.util.UUID.randomUUID().toString(); //設置replyTo和correlationId屬性值 BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); //發送消息到rpc_queue隊列 channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public static void main(String[] args) throws Exception { RPCClient fibonacciRpc = new RPCClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); } } ~~~ 輸出結果: ~~~ fib(4) is 3 ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看