RabbitMQ入門教程 For Java【5】 - Topic
### **我的開發環境:**
操作系統:**Windows7 64bit**
開發環境:**JDK 1.7 -?1.7.0_55**
開發工具:**Eclipse Kepler SR2**
RabbitMQ版本:?**3.6.0**
Elang版本:**erl7.2.1**
關于Windows7下安裝RabbitMQ的教程請先在網上找一下,有空我再補安裝教程。
### 源碼地址
https://github.com/chwshuang/rabbitmq.git
# Topic模式
??????? 匹配模式,如果按照百度翻譯和百度百科,直接叫主題或者話題就得了,但是如果你真的明白它在RabbitMQ中代表什么,就不能這么直接的翻譯成中文了。如果要用中文理解它的意思,先了解它在RabbitMQ中用來做什么:topic類型的交換器允許在RabbitMQ中使用模糊匹配來綁定自己感興趣的信息。
??????? 所以,我覺得這一章應該叫macth模式更合適,中文 - 匹配模式。
??????? 在上一章,通過直連交換器,生產者發送不同路由關鍵字的日志,消費者端通過綁定自己感興趣的路由關鍵字來接收消息,進行完善日志系統。如果我想只接收生產者com.test.rabbitmq.topic包下的日志,其他包的忽略掉,之前的日志系統處理起來可能就非常麻煩,還好,我們有匹配模式,現在我們將生產者發送過來的消息按照包名來命名,那么消費者端就可以在匹配模式下使用【#.topic.*】這個路由關鍵字來獲得感興趣的消息。
### 匹配交換器
通過匹配交換器,我們可以配置更靈活的消息系統,你可以在匹配交換器模式下發送這樣的路由關鍵字:
“a.b.c”、“c.d”、“quick.orange.rabbit”
不過一定要記住,路由關鍵字【routingKey】不能超過255個字節(bytes)
匹配交換器的匹配符
- *(星號)表示一個單詞
- #(井號)表示零個或者多個單詞
### **示例說明:**
這一章的例子中,我們使用三個段式的路由關鍵字,有三個單詞和兩個點組成。第一個詞是速度,第二個詞是顏色,第三個是動物名稱。
我們用三個關鍵字來綁定,Q1綁定關鍵字是【*.orange.*】,Q2綁定關鍵字是【*.*.rabbit】和【lazy.#】,然后分析會發生什么:

- Q1會收到所有orange這種顏色相關的消息
- Q2會收到所有rabbit這個動物相關的消息和所有速度lazy的動物的消息
### **分析:**
生產者發送“quick.orange.rabbit”的消息,兩個隊列都會收到
生產者發送“lazy.orange.elephant”,兩隊列也都會收到。
生產者發送"quick.orange.fox",那么只有Q1會收到。
生產者發送"lazy.brown.fox",那么只會有Q2能收到。
生產者發送"quick.brown.fox",那么這條消息會被丟棄,誰也收不到。
生產者發送"quick.orange.male.rabbit",這個消息也會被丟棄,誰也收不到。
生產者發送"lazy.orange.male.rabbit",這個消息會被Q2的【lazy.#】規則匹配上,發送到Q2隊列中。
### 注意
交換器在匹配模式下:
如果消費者端的路由關鍵字只使用【#】來匹配消息,在匹配【topic】模式下,它會變成一個分發【fanout】模式,接收所有消息。
如果消費者端的路由關鍵字中沒有【#】或者【*】,它就變成直連【direct】模式來工作。
### 測試代碼
### 包圖

### 代碼
ReceiveLogsTopic1.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個匹配模式的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 路由關鍵字
String[] routingKeys = new String[]{"*.orange.*"};
// 綁定路由關鍵字
for (String bindingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
}
System.out.println("ReceiveLogsTopic1 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic1 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
ReceiveLogsTopic2.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個匹配模式的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 路由關鍵字
String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
// 綁定路由關鍵字
for (String bindingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
}
System.out.println("ReceiveLogsTopic2 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic2 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
TopicSend.java
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class TopicSend {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// 聲明一個匹配模式的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 待發送的消息
String[] routingKeys = new String[]{"quick.orange.rabbit",
"lazy.orange.elephant",
"quick.orange.fox",
"lazy.brown.fox",
"quick.brown.fox",
"quick.orange.male.rabbit",
"lazy.orange.male.rabbit"};
// 發送消息
for(String severity :routingKeys){
String message = "From "+severity+" routingKey' s message!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
先運行ReceiveLogsTopic1.java
~~~
ReceiveLogsTopic1 [*] Waiting for messages. To exit press CTRL+C
~~~
再運行ReceiveLogsTopic2
~~~
ReceiveLogsTopic2 exchange:topic_logs, queue:amq.gen-JwqUJNUGpdFGkeY5B6TsLw, BindRoutingKey:*.*.rabbit
ReceiveLogsTopic2 exchange:topic_logs, queue:amq.gen-JwqUJNUGpdFGkeY5B6TsLw, BindRoutingKey:lazy.#
ReceiveLogsTopic2 [*] Waiting for messages. To exit press CTRL+C
~~~
然后運行TopicSend.java發送7條消息
~~~
TopicSend [x] Sent 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
TopicSend [x] Sent 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
TopicSend [x] Sent 'quick.orange.fox':'From quick.orange.fox routingKey' s message!'
TopicSend [x] Sent 'lazy.brown.fox':'From lazy.brown.fox routingKey' s message!'
TopicSend [x] Sent 'quick.brown.fox':'From quick.brown.fox routingKey' s message!'
TopicSend [x] Sent 'quick.orange.male.rabbit':'From quick.orange.male.rabbit routingKey' s message!'
TopicSend [x] Sent 'lazy.orange.male.rabbit':'From lazy.orange.male.rabbit routingKey' s message!'
~~~
再看ReceiveLogsTopic1.java,收到3條匹配的消息。
~~~
ReceiveLogsTopic1 [x] Received 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
ReceiveLogsTopic1 [x] Received 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
ReceiveLogsTopic1 [x] Received 'quick.orange.fox':'From quick.orange.fox routingKey' s message!'
~~~
再看ReceiveLogsTopic2.java,收到4條匹配的消息。
~~~
ReceiveLogsTopic2 [x] Received 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.brown.fox':'From lazy.brown.fox routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.orange.male.rabbit':'From lazy.orange.male.rabbit routingKey' s message!'
~~~
最后,咱們來開動腦筋看看下面的題,當是我留的課外作業:
1. 在匹配交互器模式下,消費者端路由關鍵字 “*” 能接收到生產者端發來路由關鍵字為空的消息嗎?
1. 在匹配交換器模式下,消費者端路由關鍵字“#.*”能接收到生產者端以“..”為關鍵字的消息嗎?如果發送來的消息只有一個單詞,能匹配上嗎?
1. “a.*.#” 與 “a.#” 有什么不同嗎?
如果你的課外作業做完,并且自己動手驗證過,OK,進入第六章-遠程方法調用吧!
本教程所有文章:
[RabbitMQ入門教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708)? -?你好世界!?
[RabbitMQ入門教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284)??- 工作隊列
[RabbitMQ入門教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057)?- 發布/訂閱
[RabbitMQ入門教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060)?- ?消息路由
[RabbitMQ入門教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904)? - ?模糊匹配
[RabbitMQ入門教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570)?- 遠程調用
### 提示
由于本教程中rabbitmq是在本機安裝,使用的是默認端口(5672)。
如果你的例子運行中的主機、端口不同,請進行必要設置,否則可能無法運行。
### 獲得幫助
如果你閱讀這個教程有障礙,可以通過GitHub項目成員找到開發者的郵件地址聯系他們。
~~~
https://github.com/orgs/rabbitmq/people
~~~