[TOC]
# 1. routing_key規則
通過 Topic 交換機發送的消息的 routing_key 不能隨意寫,必須滿足規則:<mark>它必須是一個單詞列表,以點號`.`分隔開</mark>,這些單詞可以是任意單詞,比如:
```
stock.usd.nyse
nyse.vmw
quick.orange.rabbit
```
但這個單詞列表最多不能超過 255 個字節。
<br/>
在這個規則列表中,其中有兩個替換符是大家需要注意的:`*`可以代替一個單詞;`#`可以替代零個或多個單詞。

Q1 → 綁定的是中間帶 orange,并且共有3個單詞的key,`*.orange.*`。
Q2 → 綁定的是最后一個單詞是 rabbit,并且共有3個單詞的key,`*.*.rabbit`;還有就是第一個單詞是 lazy 的多個單詞的key,`lazy.#`。
<br/>
我們來看看上圖隊列綁定關系之間數據接收情況是怎么樣的。
```
quick.orange.rabbit 被隊列 Q1Q2 接收到
lazy.orange.elephant 被隊列 Q1Q2 接收到
quick.orange.fox 被隊列 Q1 接收到
lazy.brown.fox 被隊列 Q2 接收到
lazy.pink.rabbit 雖然滿足兩個綁定但只被隊列 Q2 接收一次
quick.brown.fox 不匹配任何綁定不會被任何隊列接收到會被丟棄
quick.orange.male.rabbit 是四個單詞不匹配任何綁定會被丟棄
lazy.orange.male.rabbit 是四個單詞但匹配 Q2
```
<br/>
當隊列綁定關系是下列這種情況時需要引起注意:
(1)當一個隊列綁定鍵是`#`,那么這個隊列將接收所有數據,就有點像 fanout 了。
(2)如果隊列綁定鍵當中沒有出現`#`和`*`,那么該隊列綁定類型就是 direct 了。
<br/>
# 2. Topic交換機演示
**1. 生產者**
```java
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
/*
* 聲明一個 exchange。
* exchangeDeclare(String var1, String var2)
* var1: exchange名稱
* var2: exchange類型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/*
* Q1-->綁定的是中間帶 orange 并且共有3個單詞的key(*.orange.*)
* Q2-->綁定的是最后一個單詞是 rabbit 并且共有3個單詞(*.*.rabbit) 和 第一個單詞是 lazy 的多個單詞(lazy.#) 的key
*/
Map<String, String> bindingKeyMap = new HashMap<>(16);
bindingKeyMap.put("quick.orange.rabbit", "被隊列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被隊列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被隊列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被隊列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "雖然滿足兩個綁定但只被隊列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何綁定不會被任何隊列接收到會被丟棄");
bindingKeyMap.put("quick.orange.male.rabbit", "是四個單詞不匹配任何綁定會被丟棄");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四個單詞但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
System.out.println(EmitLogTopic.class.getSimpleName() + "[生產者發出消息]: " + message);
}
}
}
}
```
**2. 兩個消費者**
(1)*`ReceiveLogsTopic01`*
```java
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//聲明 Q1 隊列與綁定關系
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String newMsg = "{隊列名稱: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey()
+ ", msg: " + message + "}";
System.out.println(ReceiveLogsTopic01.class.getSimpleName() + "[接收到的消息]: " + newMsg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
(2)*`ReceiveLogsTopic02`*
```java
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//聲明 Q2 隊列與綁定關系
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[等待接收消息...]");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String newMsg = "{隊列名稱: " + queueName + ", routingKey: " + delivery.getEnvelope().getRoutingKey()
+ ", msg: " + message + "}";
System.out.println(ReceiveLogsTopic02.class.getSimpleName() + "[接收到的消息]: " + newMsg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
```
**3. 測試**
先啟動兩個消費者,再啟動生產者并生產多條消息,控制臺輸出如下。
```
-----------EmitLogTopic生產者生產了多條消息-----------
EmitLogTopic[生產者發出消息]: 是四個單詞不匹配任何綁定會被丟棄
EmitLogTopic[生產者發出消息]: 不匹配任何綁定不會被任何隊列接收到會被丟棄
EmitLogTopic[生產者發出消息]: 被隊列 Q1Q2 接收到
EmitLogTopic[生產者發出消息]: 被隊列 Q2 接收到
EmitLogTopic[生產者發出消息]: 被隊列 Q1Q2 接收到
EmitLogTopic[生產者發出消息]: 被隊列 Q1 接收到
EmitLogTopic[生產者發出消息]: 雖然滿足兩個綁定但只被隊列 Q2 接收一次
EmitLogTopic[生產者發出消息]: 是四個單詞但匹配 Q2
-----------ReceiveLogsTopic01消費者收到的消息-----------
ReceiveLogsTopic01[接收到的消息]: {隊列名稱: Q1, routingKey: lazy.orange.elephant, msg: 被隊列 Q1Q2 接收到}
ReceiveLogsTopic01[接收到的消息]: {隊列名稱: Q1, routingKey: quick.orange.rabbit, msg: 被隊列 Q1Q2 接收到}
ReceiveLogsTopic01[接收到的消息]: {隊列名稱: Q1, routingKey: quick.orange.fox, msg: 被隊列 Q1 接收到}
-----------ReceiveLogsTopic02消費者收到的消息-----------
ReceiveLogsTopic02[接收到的消息]: {隊列名稱: Q2, routingKey: lazy.orange.elephant, msg: 被隊列 Q1Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {隊列名稱: Q2, routingKey: lazy.brown.fox, msg: 被隊列 Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {隊列名稱: Q2, routingKey: quick.orange.rabbit, msg: 被隊列 Q1Q2 接收到}
ReceiveLogsTopic02[接收到的消息]: {隊列名稱: Q2, routingKey: lazy.pink.rabbit, msg: 雖然滿足兩個綁定但只被隊列 Q2 接收一次}
ReceiveLogsTopic02[接收到的消息]: {隊列名稱: Q2, routingKey: lazy.orange.male.rabbit, msg: 是四個單詞但匹配 Q2}
```
****
案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01
- 消息隊列
- 什么是MQ
- MQ的作用
- MQ的分類
- MQ的選擇
- RabbitMQ
- RabbitMQ是什么
- 四大核心概念
- 工作原理
- 環境搭建
- windows系統下的搭建
- centos7系統下的搭建
- 常用命令
- 服務相關命令
- 管理用戶命令
- 管理隊列命令
- 第一個RabbitMQ程序
- 工作隊列
- 輪詢分發消息
- 消息應答
- 持久化
- 發布確認
- 發布確認原理
- 發布確認策略
- 交換機概念
- 交換機類型
- 無名交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 死信隊列
- 死信概念
- 死信來源
- 死信實戰
- 延遲隊列
- 什么是延遲隊列
- TTL設置方式
- 隊列TTL延遲隊列
- 消息TTL延遲隊列
- 插件打造延遲隊列
- 延遲隊列總結
- 發布確認高級
- 代碼實現
- 回退消息
- 備份交換機
- 冪等性
- 冪等性概念
- 消息重復消費
- 消費端冪等性保障
- 優先級隊列
- 使用場景
- 設置優先級
- 惰性隊列
- 什么是惰性隊列
- 隊列的兩種模式
- 聲明惰性隊列
- RabbitMQ集群
- 為什么要搭建集群
- 集群搭建步驟
- 集群工作方式
- 脫離集群
- 鏡像隊列
- 高可用負載均衡