# RabbitMQ
基于AMQP協議,由Erlang語言開發,穩定性高,不丟失數據,與spring框架集成度高,使用非常廣泛。
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件同產品,不同的開發語言等條件的限制。
【參考:https://www.jianshu.com/p/284b1b928ee1】
基于AMQP協議的消息隊列的架構:
:-: 
## 一、Windows系統安裝RabbitMQ
1. 安裝Erlang環境,并將bin目錄添加進環境變量;下載地址:[https://www.erlang.org/downloads](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.erlang.org%2Fdownloads)
2. 安裝RabbitMQ,并將sbin目錄添加進環境變量;下載地址:[https://www.rabbitmq.com/install-windows.html](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.rabbitmq.com%2Finstall-windows.html)。
3. 激活可視化管理插件:
sbin目錄下運行
~~~
rabbitmq-plugins.bat enable rabbitmq_management
~~~
4. 點擊rabbitmq-server.bat啟動服務,并在瀏覽器中訪問http://localhost:15672,使用默認賬號guest,密碼guest登錄。
## 二、消息隊列通信模式
### 2.1 直連模式
:-: 
生成者生產的消息直接放在隊列中,而不用通過交換機,消費者直接從隊列中拿取數據。
### 2.2 任務模型
Work Queue,或者稱為Task Queue, 任務模型。在某些場景下可能會導致生產的消息比消費的消息要多,這個時候就可以讓多個消費者共享一個消息隊列,每當一條消息被消費的時候就會消失,避免任務的重復執行。
:-: 
在默認的情況下,會采用輪詢的負載均衡方式給每個消費者平均分配消息。
### 2.3 廣播模型
fanout 扇出或稱廣播,在這種模式中需要我們設置交換機,并且會為每個消費生成臨時隊列,這些臨時隊列為消費者所獨有。這些臨時隊列中共享交換機中的數據。`即同個消息可以有多個消費者使用。`
:-: 
與任務模型不同的是每個消息被多個消費者消費。
### 2.4 訂閱模型
routing 路由訂閱模型(direct)
廣播模型會將所有的消息都共享給所有的臨時隊列,有時候可能需要區分消費者能夠消費的消息,這個時候就要用到訂閱模型。例如在日志系統中,error級別的要持久化到磁盤中,而info,error,warning等級別的只要打印到控制臺就行了,這個使用就可以使用訂閱模型對不同的消費者能夠消費的信息進行區分。**push推送模型**
:-: 
### 2.5 動態路由模型
Topic模型,與訂閱模型相比,動態路由模型可以通過通配符的方式類配置routing key的名稱。
這種模式下建議routing key的命名方式為:`name1.name2`
通配符的規則如下:
~~~
?* 匹配一個單詞
?# 匹配零個或多個單詞
~~~
## 三、與Spring Boot整合
1. 導入依賴
~~~
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.1</version>
</dependency>
~~~
2. 配置
~~~
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: demo2
username: demo2User
password: 123
~~~
3. 在生產者注入RabbitTemplate,調用API生產消息。
4. 在消費者中使用注解@RabbitListener表明是一個消費者。
### 3.1直連模式使用
1. 生產者
~~~
@SpringBootTest(classes = RabbitmqDemoApplication.class)
public class RabbitmqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void hello() {
rabbitTemplate.convertAndSend("hello", "hello for rabbitmq!");
}
}
~~~
convertAndSend參數中第一個參數是隊列名稱,第二個參數是要傳遞的消息。
2. 消費者
~~~
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
@RabbitHandler
public void receive(String message) {
System.out.println("message = " + message);
}
}
~~~
在這種模式中等到有消費者的時候才會創建消息隊列。
@RabbitListener中監聽隊列“hello”的內容,@RabbitHandler注解處理數據的方法。@Queue表示具體的隊列,其主要的參數有:
```
name:隊列名稱、默認
durable:是否進行持久化
exclusive:是否獨占
autoDelete:是否自動刪除
```
### 3.2 任務模式
1. 生產者
~~~
@Test
public void workTest() {
for (int i = 0; i < 1000; i++) {
rabbitTemplate.convertAndSend("work", "test work queue for rabbitmq!");
}
}
~~~
2. 消費者
~~~
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("消費者1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("消費者2 = " + message);
}
}
~~~
在一個類中配置多個消費者。
### 3.3 廣播模型
需要用到交換機。
1. 生產者
~~~
@Test
public void publicTest() {
for (int i = 0; i < 100; i++) {
// 交換機名稱、路由、消息對象
rabbitTemplate.convertAndSend("logs", "", "message:" + i);
}
}
~~~
2. 消費者
~~~
@Component
public class PublishConsumer {
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))})
public void receive1(String message) {
System.out.println("consumer1 = " + message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))})
public void receive2(String message) {
System.out.println("consumer2 = " + message);
}
}
~~~
`bindings屬性`用來將該消費和交換機進行綁定
`@QueueBinding`隊列綁定;
`value屬性`:要綁定的隊列,直接@Queue為臨時生成;
`exchange屬性`:要綁定的交換機;
`@Exchange()`: value屬性為交換機的名稱,type為類型,這種模式中設置為“fanout”。
### 3.4 路由模型
在廣播模式的基礎上添加路由。
1. 生產者
~~~
@Test
public void routingTest() {
String routeErrorKey = "error";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("routing", routeErrorKey,
"routing ["+ routeErrorKey +"] message for rabbitmq");
}
String routeInfoKey = "info";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("routing", routeInfoKey,
"routing ["+ routeInfoKey +"] message for rabbitmq");
}
}
~~~
2. 消費者
~~~
@Component
public class RoutingConsumer {
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "routing", type = "direct"), key = {"info"})})
public void receive1(String message) {
System.out.println("info routing = " + message);
}
@RabbitListener( bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "routing", type = "direct"), key = {"error"})})
public void receive2(String message) {
System.out.println("error routing = " + message);
}
}
~~~
設置交換機關注的key即可。
### 3.5 動態路由
key由通配符來設定:
1. 生產者
~~~
@Test
public void topicTest() {
String routKeyName = "user.save.delete";
rabbitTemplate.convertAndSend("topic", routKeyName, "topic [" + routKeyName + "] message for rabbitmq");
}
~~~
2. 消費者
~~~
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic", type = "topic"),
key = {"user.*"}
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic", type = "topic"),
key = {"user.#"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
~~~
## 四、應用場景
### 4.1 異步處理
`場景:用戶注冊后,可能需要發送注冊郵件和短信,傳統的方式有串行方式和并行方式`
* 串行方式: 將注冊信息寫入數據庫后,發送郵件后再發送短信,三個任務完成后才返回給客戶端。
:-: 
* 并行的方式:將注冊信息寫入數據庫后,同時發送郵件和短信,以上三個任務完成后才返回給客戶端。
:-: 
~~~
?發送郵件和短信并不是注冊必須的,只是一種通知和確認,客戶端不應該等待著兩者的完成。
~~~
* 消息隊列方式:將發郵件和短信的業務交給消息隊列進行異步處理。
:-: 
額外的時間為寫入消息隊列的時間。主程序并不進行等待。
`使用廣播模型`
### 4.2 應用解耦
`場景:用戶下訂單后,訂單系統需要通知庫存系統,傳統的做法就是訂單系統調用庫存系統的接口。`
:-: 
這樣做的缺點:
當庫存系統出現故障時,訂單就會失敗。訂單系統和庫存系統是高度耦合的存在。
引入消息隊列后:
:-: 
* 訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功
* 庫存系統:訂閱下單信息,獲取下單消息,進行庫存操作。就是庫存系統出現故障,訂單也會保存在消息隊列中,確保消息不丟失。
### 流量削峰
`場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。`
作用:
1. 可以控制活動人數,超過一定閾值時直接拋棄用戶請求
2. 可以緩解短時間高流量壓垮應用
:-: 
- 第一章 Java基礎
- ThreadLocal
- Java異常體系
- Java集合框架
- List接口及其實現類
- Queue接口及其實現類
- Set接口及其實現類
- Map接口及其實現類
- JDK1.8新特性
- Lambda表達式
- 常用函數式接口
- stream流
- 面試
- 第二章 Java虛擬機
- 第一節、運行時數據區
- 第二節、垃圾回收
- 第三節、類加載機制
- 第四節、類文件與字節碼指令
- 第五節、語法糖
- 第六節、運行期優化
- 面試常見問題
- 第三章 并發編程
- 第一節、Java中的線程
- 第二節、Java中的鎖
- 第三節、線程池
- 第四節、并發工具類
- AQS
- 第四章 網絡編程
- WebSocket協議
- Netty
- Netty入門
- Netty-自定義協議
- 面試題
- IO
- 網絡IO模型
- 第五章 操作系統
- IO
- 文件系統的相關概念
- Java幾種文件讀寫方式性能對比
- Socket
- 內存管理
- 進程、線程、協程
- IO模型的演化過程
- 第六章 計算機網絡
- 第七章 消息隊列
- RabbitMQ
- 第八章 開發框架
- Spring
- Spring事務
- Spring MVC
- Spring Boot
- Mybatis
- Mybatis-Plus
- Shiro
- 第九章 數據庫
- Mysql
- Mysql中的索引
- Mysql中的鎖
- 面試常見問題
- Mysql中的日志
- InnoDB存儲引擎
- 事務
- Redis
- redis的數據類型
- redis數據結構
- Redis主從復制
- 哨兵模式
- 面試題
- Spring Boot整合Lettuce+Redisson實現布隆過濾器
- 集群
- Redis網絡IO模型
- 第十章 設計模式
- 設計模式-七大原則
- 設計模式-單例模式
- 設計模式-備忘錄模式
- 設計模式-原型模式
- 設計模式-責任鏈模式
- 設計模式-過濾模式
- 設計模式-觀察者模式
- 設計模式-工廠方法模式
- 設計模式-抽象工廠模式
- 設計模式-代理模式
- 第十一章 后端開發常用工具、庫
- Docker
- Docker安裝Mysql
- 第十二章 中間件
- ZooKeeper