RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC)
? ? ? ??
### **我的開發環境:**
操作系統:**Windows7 64bit**
開發環境:**JDK 1.7 -?1.7.0_55**
開發工具:**Eclipse Kepler SR2**
RabbitMQ版本:?**3.6.0**
Elang版本:**erl7.2.1**
關于Windows7下安裝RabbitMQ的教程請先在網上找一下,有空我再補安裝教程。
### 源碼地址
https://github.com/chwshuang/rabbitmq.git
? ? ? ? 本教程中,我們將學習使用工作隊列讓多個消費者端來執行耗時的任務。比如我們需要通過遠程服務器幫我們計算某個結果。這種模式通常被稱之為遠程方法調用或RPC.
? ? ? ??我們通過RabbitMQ搭建一個RPC系統,一個客戶端和一個RPC服務器,客戶端有一個斐波那契數列方面的問題需要解決(Fibonacci numbers),RPC服務器負責技術收到這個消息,然后計算結果,并且返回這個斐波那契數列。
### 客戶端接口
? ? ? ??我們需要創建一個簡單的客戶端類,通過調用客戶端的call方法,來計算結果。
~~~
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
~~~
遠程方法調用的注意事項:
? ? ? ??RPC在軟件開發中非常常見,也經常被批評。當一個程序員對代碼不熟悉的時候,跟蹤RPC的性能問題是出在本地還是遠程服務器就非常麻煩,對于RPC的使用,有幾點需要特別說明:
- 使用遠程調用時的本地函數最好獨立出來
- 保證代碼組件之間的依賴關系清晰明了,并用日志記錄不同的執行過程和時間
- 發生客戶端運行緩慢或者假死時,先確認RPC服務器是否還活著!
- 盡量使用異步隊列來處理RPC請求,盡量不要用同步阻塞的方式運行RPC請求
### 回調隊列
? ? ? ??在RabbitMQ的RPC中,客戶端發送請求后,還需要得到一個響應結果,我們需要像下面這樣,在發送請求時,帶上一個回調隊列:
~~~
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
~~~
? ? ? ??上面代碼中,我們需要引入一個新的類
~~~
import com.rabbitmq.client.AMQP.BasicProperties;
~~~
### 消息屬性
傳輸一條消息,AMQP協議預定義了14個屬性,下面幾個是使用比較頻繁的幾個屬性:
- deliveryMode:配置一個消息是否持久化。(2表示持久化)這個在第二章中有說明。
- contentType :用來描述編碼的MIME類型。與html的MIME類型類似,例如,經常使用JSON編碼是將此屬性設置為一個很好的做法:application/json。
- replyTo : 回調隊列的名稱。
- correlationId:RPC響應請求的相關編號。這個在下一節講。
### 關聯編號? Correlation Id
??????? 如果一個客戶端有很多的計算任務,按照上面的代碼,我們會為每個任務創建一個請求,然后等待返回的結果,這種方法貌似很耗時,如果把所有的任務都放到同一個連接中,那么我們又沒法分辨出返回的結果是那個任務的?為了解決這個問題,RabbitMQ提供了一個correlationid屬性來解決這個問題。RabbitMQ為每個請求提供唯一的編號,然后在返回隊列里如果看到了這個編號,就知道我們的任務處理完成了,如果收到的編號不認識,就可以安全的忽略。
? ? ? ??你可能會疑問,如果忽略了,那么想知道這個返回結果的客戶端是不是就收不到這個結果了?這個基本上不會出現,但是,理論上也可能發生,例如一個RPC服務器,在發送確認消息前掛了,你收到的消息可能就是不完整的。這種情況,RabbitMQ會重新發送任務處理請求。這也是為什么客戶端必須處理這些重復請求以及RPC啟用冪次模式。
### 總結:

RPC工作方式:
1. 當客戶端啟動時,會創建一個匿名的回調隊列
1. 在RPC請求中,定義了兩個屬性:replyTo,表示回調隊列的名稱; correlationId,表示請求任務的唯一編號,用來區分不同請求的返回結果。
1. 將請求發送到rpc_queue隊列中
1. RPC服務器等待rpc_queue隊列的請求,如果有消息,就處理,它將計算結果發送到請求中的回調隊列里。
1. 客戶端監聽回調隊列中的消息,如果有返回消息,它根據回調消息中的correlationid進行匹配計算結果。
### 工程代碼
**計算斐波那契數列的方法**
~~~
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
~~~
這個方法只是用來講解我們的教程,你可別拿它在生產環境跑大數據!下面是客戶端的代碼
### 服務器端代碼:
RPCServer.java
第一步仍然是建立連接、頻道和聲明隊列。
如果我們運行多個RPC服務器,為了達到負載均衡,需要通過channel.basicQos來設置從隊列中預取消息的個數。
我們通過basicConsume 訪問隊列,如果后消息任務來了,我們就開始工作,并將結果發送到回調隊列中。
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println("RPCServer [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("RPCServer [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
### 客戶端代碼
RPCClient.java
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println("RPCClient [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println("RPCClient [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
在客戶端,我們也建立一個連接和通道,并聲明一個專用的“回調”隊列
我們設置調隊列中的唯一編號和回調隊列名稱
然后我們發送任務消息到RPC服務器
接下來循環監聽回調隊列中的每一個消息,找到與我們剛才發送任務消息編號相同的消息
### 總結:
這里的例子只是RabbitMQ中RPC服務的一個實現,你也可以根據業務需要實現更多。
rpc有一個優點,如果一個RPC服務器處理不來,可以再增加一個、兩個、三個。
我們的例子中的代碼還比較簡單,還有很多問題沒有解決:
如果沒有發現服務器,客戶端如何處理?
如果客戶端的RPC請求超時了怎么辦?
如果服務器出現了故障,發生了異常,是否將異常發送到客戶端?
在處理消息前,怎樣防止無效的消息?檢查范圍、類型?
如果你想還想繼續了解RabbitMQ,你可以在RabbitMQ中安裝管理插件,然后查看消息隊列。
本教程所有文章:
[RabbitMQ入門教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708)? -?你好世界!?
[RabbitMQ入門教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284)??- 工作隊列
[RabbitMQ入門教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057)?- 發布/訂閱
[RabbitMQ入門教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060)?- ?消息路由
[RabbitMQ入門教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904)? - ?模糊匹配
[RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570)?- 遠程調用