[TOC]
服務端啟動時,配置生成一個隊列quene和Exchange,通過routing_key綁定,所以沒啟動一個服務,本地都會有一個隊列和交換機對應到rabbitmq上。
如果rabbitmq沒有對應的隊列,就會生成,對應到服務上去


同時啟動兩個服務,會均攤消費信息,并且都可以生產消息
# 1. 消息推送流程
不同的交換機通過routing_key,利用不同的策略將消息發送到隊列

* 隊列與交換機綁定
1. 生命隊列(name)和交換機(name)
2. 通過routingkey進行綁定
* 產生消息
1. 生產者向交換機發送消息(指定交換機名稱和routing_key)
2. 信息到達交換機,交換機根據routing_key找到綁定的隊列,并把消息發送到隊列
* 消費消息
1. 消費者根據隊列名稱綁定監聽隊列
2. 當隊列有消息時,就可以接收到
## 1.1 Direct Exchange?
1. 要求通過`routing_key`將一個或多個隊列綁定到交換機上
2. 消息通過發送到交換機,通過對應的routing_key發送到對應的所有隊列上
3. 一個隊列可以有多個消費者,但是消息只能被一個消費者消費(輪訓)
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶著路由值為X,這個消息通過生產者發送給交換機時,**交換機就會根據這個路由值X去尋找綁定值也是X的隊列。**
實例:
#### **1. 配置隊列與交換機的綁定**
~~~
@Configuration
public class MQConfig {
//創建三個隊列1,2,3
//Queue的第一個參數為隊列名稱,第二個參數為是否持久存在
@Bean
public Queue directQueue1() {
return new Queue("queue1", true);
}
@Bean
public Queue directQueue2() {
return new Queue("queue2", false);
}
@Bean
public Queue directQueue3() {
return new Queue("queue3", true);
}
//創建直連交換機,參數為交換機的名稱
@Bean
public DirectExchange directExchange() {
return new DirectExchange("DIRECT_EXCHANGE"); //交換機名稱
}
//將三個隊列都與該直連交換機綁定起來,并賦予上面說的binding_key(也可以說是routing_key)
@Bean
public Binding bindingDirectExchange1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1");
}
@Bean
public Binding bindingDirectExchange2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.1");
}
@Bean
public Binding bindingDirectExchange3() {
return BindingBuilder.bind(directQueue3()).to(directExchange()).with("key.2");
}
}
~~~
#### 2. 發送者
通過routing_key發送到對應的所有隊列
~~~
@Autowired
MQSender mqSender;
@GetMapping("/directExchange/{message}")
public String directExchange(@PathVariable("message") String message) {
mqSender.send(message);
return "success";
}
~~~
~~~
@Service
public class MQSender {
//注入AmqpTemplate接口,該接口定義了發送和接收消息的基本操作
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String message) {
//第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息
amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", message);
}
}
~~~
#### 4. 接收者
監聽隊列`@RabbitListener(queues = "queue1")`
~~~
@Service
public class MQReceiver {
private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);
//此注解表示監聽某個隊列,參數為隊列名
@RabbitListener(queues = "queue1")
public void receive1(String message) {
logger.info("queue1 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue1")
public void receive1_1(String message) {
logger.info("queue1_1 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue2")
public void receive2(String message) {
logger.info("queue2 receive : fanout message {}", message);
}
@RabbitListener(queues = "queue3")
public void receive3(String message) {
logger.info("receive : fanout message {}", message);
}
}
~~~

quene1綁定了兩個消費者,輪詢著消費

## 1.2 Fanout Exchange
廣播消息

1. 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。
2. **這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。**
1. config
~~~
@Configuration
public class FanoutMQConfig {
//創建三個隊列1,2,3
//Queue的第一個參數為隊列名稱,第二個參數為是否持久存在
@Bean
public Queue fanoutQueue1() {
return new Queue("queue1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("queue2", false);
}
@Bean
public Queue fanoutQueue3() {
return new Queue("queue3", true);
}
//創建扇形交換機,參數為交換機的名稱
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("FANOUT_EXCHANGE");
}
//將三個隊列都與該交換機綁定起來,無需binding_key
@Bean
public Binding bindingFanoutExchange1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
}
~~~
2. controller
~~~
@GetMapping("/fanoutExchange/{message}")
public String fanoutExchange(@PathVariable("message") String message) {
fanoutMQSender.send(message);
return "success";
}
~~~
3. sender
~~~
//扇形交換機
public void fanoutSend(String message) {
//第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息
amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message);
}
~~~
4. received不變,還是監聽隊列
所以說隊列不變,只是不同的交換機將信息發送給隊列,以下三個隊列都接收到了消息。
```
2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#1-1] c.t.s.service.FanoutMQReceiver : queue1_1 receive : fanout message hello-directexchange
2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#0-1] c.t.s.service.FanoutMQReceiver : receive : fanout message hello-directexchange
2021-05-11 19:14:34.479 INFO 17996 --- [ntContainer#2-1] c.t.s.service.FanoutMQReceiver : queue2 receive : fanout message hello-directexchange
```
## 1.3 Topic Exchange
直連交換機的`routing_key`方法非常簡單,如果希望將一條消息發送給多個隊列,那么這個交換機需要綁定非常多的`routing_key`,這樣的話消息的管理就會非常的困難。
所以根據一定的規則綁定隊列方便很多
簡單地介紹下規則:
> *??(星號) 用來表示一個單詞 (必須出現的)
> #??(井號) 用來表示任意數量(零個或多個)單詞
>
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 `*.TT.*`? ? ? ? ? 隊列Q2綁定鍵為? TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。
另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述
1. config
~~~
@Configuration
public class TopicMQConfig {
@Bean
public Queue topicQueue1() {
return new Queue("TOPIC_QUEUE1", true);
}
@Bean
public Queue topicQueue2() {
return new Queue("TOPIC_QUEUE2", true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("TOPIC_EXCHANGE");
}
//將topicQueue1與topicExchange交換機綁定
@Bean
public Binding bindQueue1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
}
//將topicQueue2與topicExchange交換機綁定,可以接收topic. 開頭的routing_key
@Bean
public Binding bindQueue2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}
}
~~~
隊列2可以接收`topic.`開頭的routing_key消息
2. send
~~~
public void send(String message) {
//第一個參數指將消息發送到該名稱的交換機,第二個參數為對應的routing_key,第三個參數為發送的具體消息
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message);
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key2", message);
}
~~~
3. receive
~~~
@Service
public class TopicMQReceiver {
private static final Logger logger = LoggerFactory.getLogger(TopicMQReceiver.class);
@RabbitListener(queues = "TOPIC_QUEUE1")
public void receiveQueue1(String message) {
logger.info("receive : TOPIC_QUEUE1 {}", message);
}
@RabbitListener(queues ="TOPIC_QUEUE2")
public void receiveQueue2(String message) {
logger.info("receive : TOPIC_QUEUE2 {}", message);
}
}
~~~

## 1.4 Headers exchange
* 首部交換機:
通過構建消息時,加入“header”來區分隊列
1. config
~~~
@Configuration
public class HeaderMQConfig {
@Bean
public Queue headersQueue() {
return new Queue("HEADERS_QUEUE");
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("HEADERS_EXCHANGE");
}
//將headersQueue與HeadersExchange交換機綁定
@Bean
public Binding bingHeadersQueue() {
//map為綁定的規則
Map<String, Object> map = new HashMap<>();
map.put("headers1", "value1");
map.put("headers2", "value2");
//whereAll表示需要滿足所有條件
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
}
}
~~~
2. send
~~~
public void send(String message) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("headers1", "value1");
messageProperties.setHeader("headers2", "value2");
//要發送的消息,第一個參數為具體的消息字節數組,第二個參數為消息規則
Message msg = new Message(message.getBytes(), messageProperties); //加入類似header的信息
amqpTemplate.convertAndSend("HEADERS_EXCHANGE", "", msg);
}
~~~
3. receive
~~~
@RabbitListener(queues = "HEADERS_QUEUE")
public void receiveHeadersQueue(byte[] message) {
logger.info("receive : HeadersQueue {}", new String(message));
}
~~~
http://localhost:8080/mq/headerExchange/hello-directexchange

# 2. 隊列屬性
## 2.1 quene name
> 1. 隊列名稱,隊列在聲明(declare)后才能被使用。如果一個隊列尚不存在,聲明一個隊列會創建它。
> 2. 如果聲明的隊列已經存在,并且屬性完全相同,那么此次聲明不會對原有隊列產生任何影響。如果聲明中的屬性與已存在隊列的屬性有差異,那么將會拋出一個406通道級異常。
>
## 2.2 durable
Boolean 默認false
隊列的聲明默認是存放到內存中的,稱為暫存隊列,消息代理重啟會丟失。如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫。但是隊列持久化并不意味著消息持久化當消息代理重啟后消息依舊會丟失。
exclusive :是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的。
autoDelete :當最后一個消費者斷開連接之后隊列是否自動被刪除。