第一次聽到“消息隊列”這個詞時,不知你是不是和我反應一樣,感覺很高階很厲害的樣子,其實當我們了解了消息隊列之后,發現它與普通的技術類似,當我們熟悉之后,也能很快地上手并使用。
我們本課時的面試題是,消息隊列的使用場景有哪些?如何手動實現一個消息隊列和延遲消息隊列?
#### 典型回答
消息隊列的使用場景有很多,最常見的使用場景有以下幾個。
* [ ] 1.商品秒殺
比如,我們在做秒殺活動時,會發生短時間內出現爆發式的用戶請求,如果不采取相關的措施,會導致服務器忙不過來,響應超時的問題,輕則會導致服務假死,重則會讓服務器直接宕機,給用戶帶來的體驗也非常不好。如果這個時候加上了消息隊列,服務器接收到用戶的所有請求后,先把這些請求全部寫入到消息隊列中再排隊處理,這樣就不會導致同時處理多個請求的情況;如果消息隊列長度超過可以承載的最大數量,那么我們可以拋棄當前用戶的請求,通知前臺用戶“頁面出錯啦,請重新刷新”等提示,這樣就會有更好的交互體驗。
* [ ] 2.系統解耦
使用了消息隊列之后,我們可以把系統的業務功能模塊化,實現系統的解耦。例如,在沒有使用消息隊列之前,當前臺用戶完善了個人信息之后,首先我們需要更新用戶的資料,再添加一條用戶信息修改日志。但突然有一天產品經理提了一個需求,在前臺用戶信息更新之后,需要給此用戶的增加一定的積分獎勵,然后沒過幾天產品經理又提了一個需求,在前臺用戶信息更新之后,不但要增加積分獎勵,還要增加用戶的經驗值,但沒過幾天產品經理的需求又變了,他要求完善資料無需增加用戶的積分了,這樣反反復復、來來回回的折騰,我想研發的同學一定受不了,但這是互聯網公司的常態,那我們有沒有一勞永逸的辦法呢?
沒錯,這個時候我們想到了使用消息隊列來實現系統的解耦,每個功能的實現獨立開,只需要一個訂閱或者取消訂閱的開關就可以了,當需要增加功能時,只需要打開訂閱“用戶信息完善”的隊列就行,如果過兩天不用了,再把訂閱的開關關掉就行了,這樣我們就不用來來回回的改業務代碼了,也就輕松的實現了系統模塊間的解耦。
* [ ] 3.日志記錄
我們大部分的日志記錄行為其實是和前臺用戶操作的主業務沒有直接關系的,只是我們的運營人和經營人員需要拿到這部分用戶操作的日志信息,來進行用戶行為分析或行為監控。在我們沒有使用消息隊列之前,籠統的做法是當有用戶請求時,先處理用戶的請求再記錄日志,這兩個操作是放在一起的,而前臺用戶也需要等待日志添加完成之后才能拿到后臺的響應信息,這樣其實浪費了前臺用戶的部分時間。此時我們可以使用消息隊列,當響應完用戶請求之后,只需要把這個操作信息放入消息隊列之后,就可以直接返回結果給前臺用戶了,無序等待日志處理和日志添加完成,從而縮短了前臺用戶的等待時間。
我們可以通過 JDK 提供的 Queue 來實現自定義消息隊列,使用 DelayQueue 實現延遲消息隊列。
#### 考點分析
對于消息隊列的考察更側重于消息隊列的核心思想,因為只有理解了什么是消息隊列?以及什么情況下要用消息隊列?才能解決我們日常工作中遇到的問題,而消息隊列的具體實現,只需要掌握一個消息中間件的使用即可,因為消息隊列中間件的核心實現思路是一致的,不但如此,消息隊列中間件的使用也大致類似,只要掌握了一個就能觸類旁通的用好其他消息中間件。
和本課時相關的面試題,還有以下這兩個:
* 介紹一個你熟悉的消息中間件?
* 如何手動實現消息隊列?
#### 知識擴展
* [ ] 1.常用消息中間件 RabbitMQ
目前市面上比較常用的 MQ(Message Queue,消息隊列)中間件有 RabbitMQ、Kafka、RocketMQ,如果是輕量級的消息隊列可以使用 Redis 提供的消息隊列,本課時我們先來介紹一下 RabbitMQ,其他消息中間件將會在第 15 課時中單獨介紹。
RabbitMQ 是一個老牌開源的消息中間件,它實現了標準的 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)消息中間件,使用 Erlang 語言開發,支持集群部署,和多種客戶端語言混合調用,它支持的主流開發語言有以下這些:
* Java and Spring
* .NET
* Ruby
* Python
* PHP
* JavaScript and Node
* Objective-C and Swift
* Rust
* Scala
* Go
更多支持語言,請點擊這里訪問官網查看。
RabbitMQ 中有 3 個重要的概念:生產者、消費者和代理。
* 生產者:消息的創建者,負責創建和推送數據到消息服務器。
* 消費者:消息的接收方,用于處理數據和確認消息。
* 代理:也就是 RabbitMQ 服務本身,它用于扮演“快遞”的角色,因為它本身并不生產消息,只是扮演了“快遞”的角色,把消息進行暫存和傳遞。
它們的運行流程,如下圖所示:

RabbitMQ 具備以下幾個優點:
* 支持持久化,RabbitMQ 支持磁盤持久化功能,保證了消息不會丟失;
* 高并發,RabbitMQ 使用了 Erlang 開發語言,Erlang 是為電話交換機開發的語言,天生自帶高并發光環和高可用特性;
* 支持分布式集群,正是因為 Erlang 語言實現的,因此 RabbitMQ 集群部署也非常簡單,只需要啟動每個節點并使用 --link 把節點加入到集群中即可,并且 RabbitMQ 支持自動選主和自動容災;
* 支持多種語言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;
* 支持消息確認,支持消息消費確認(ack)保證了每條消息可以被正常消費;
* 它支持很多插件,比如網頁控制臺消息管理插件、消息延遲插件等,RabbitMQ 的插件很多并且使用都很方便。
RabbitMQ 的消息類型,分為以下四種:
* direct(默認類型)模式,此模式為一對一的發送方式,也就是一條消息只會發送給一個消費者;
* headers 模式,允許你匹配消息的 header 而非路由鍵(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因為 headers 匹配的性能很差,幾乎不會被用到;
* fanout 模式,為多播的方式,會把一個消息分發給所有的訂閱者;
* topic 模式,為主題訂閱模式,允許使用通配符(#、*)匹配一個或者多個消息,我可以使用“cn.mq.#”匹配到多個前綴是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。
* [ ] 2.自定義消息隊列
我們可使用 Queue 來實現消息隊列,Queue 大體可分為以下三類:
* **雙端隊列(Deque)**是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支持元素插入和獲取;
* 阻塞隊列指的是在元素操作時(添加或刪除),如果沒有成功,會阻塞等待執行,比如當添加元素時,如果隊列元素已滿,隊列則會阻塞等待直到有空位時再插入;
* 非阻塞隊列,和阻塞隊列相反,它會直接返回操作的結果,而非阻塞等待操作,雙端隊列也屬于非阻塞隊列。
自定義消息隊列的實現代碼如下:
```
import java.util.LinkedList;
import java.util.Queue;
public class CustomQueue {
// 定義消息隊列
private static Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
producer(); // 調用生產者
consumer(); // 調用消費者
}
// 生產者
public static void producer() {
// 添加消息
queue.add("first message.");
queue.add("second message.");
queue.add("third message.");
}
// 消費者
public static void consumer() {
while (!queue.isEmpty()) {
// 消費消息
System.out.println(queue.poll());
}
}
}
```
以上程序的執行結果是:
```
first message.
second message.
third message.
```
可以看出消息是以先進先出順序進行消費的。
實現自定義延遲隊列需要實現 Delayed 接口,重寫 getDelay() 方法,延遲隊列完整實現代碼如下:
```
import lombok.Getter;
import lombok.Setter;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 自定義延遲隊列
*/
public class CustomDelayQueue {
// 延遲消息隊列
private static DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
producer(); // 調用生產者
consumer(); // 調用消費者
}
// 生產者
public static void producer() {
// 添加消息
delayQueue.put(new MyDelay(1000, "消息1"));
delayQueue.put(new MyDelay(3000, "消息2"));
}
// 消費者
public static void consumer() throws InterruptedException {
System.out.println("開始執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()) {
System.out.println(delayQueue.take());
}
System.out.println("結束執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
}
/**
* 自定義延遲隊列
*/
static class MyDelay implements Delayed {
// 延遲截止時間(單位:毫秒)
long delayTime = System.currentTimeMillis();
// 借助 lombok 實現
@Getter
@Setter
private String msg;
/**
* 初始化
* @param delayTime 設置延遲執行時間
* @param msg 執行的消息
*/
public MyDelay(long delayTime, String msg) {
this.delayTime = (this.delayTime + delayTime);
this.msg = msg;
}
// 獲取剩余時間
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 隊列里元素的排序依據
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return this.msg;
}
}
}
```
以上程序的執行結果是:
```
開始執行時間:2020-4-2 16:17:28
消息1
消息2
結束執行時間:2020-4-2 16:17:31
```
可以看出,消息 1 和消息 2 都實現了延遲執行的功能。
#### 小結
本課時講了消息隊列的使用場景:商品秒殺、系統解耦和日志記錄,我們還介紹了 RabbitMQ 以及它的消息類型和它的特點等內容,同時還使用 Queue 的子類 LinkedList 實現了自定義消息隊列,使用 DelayQueue 實現了自定義延遲消息隊列。
#### 課后問答
* 1、
- 前言
- 開篇詞
- 開篇詞:大廠技術面試“潛規則”
- 模塊一:Java 基礎
- 第01講:String 的特點是什么?它有哪些重要的方法?
- 第02講:HashMap 底層實現原理是什么?JDK8 做了哪些優化?
- 第03講:線程的狀態有哪些?它是如何工作的?
- 第04講:詳解 ThreadPoolExecutor 的參數含義及源碼執行流程?
- 第05講:synchronized 和 ReentrantLock 的實現原理是什么?它們有什么區別?
- 第06講:談談你對鎖的理解?如何手動模擬一個死鎖?
- 第07講:深克隆和淺克隆有什么區別?它的實現方式有哪些?
- 第08講:動態代理是如何實現的?JDK Proxy 和 CGLib 有什么區別?
- 第09講:如何實現本地緩存和分布式緩存?
- 第10講:如何手寫一個消息隊列和延遲消息隊列?
- 模塊二:熱門框架
- 第11講:底層源碼分析 Spring 的核心功能和執行流程?(上)
- 第12講:底層源碼分析 Spring 的核心功能和執行流程?(下)
- 第13講:MyBatis 使用了哪些設計模式?在源碼中是如何體現的?
- 第14講:SpringBoot 有哪些優點?它和 Spring 有什么區別?
- 第15講:MQ 有什么作用?你都用過哪些 MQ 中間件?
- 模塊三:數據庫相關
- 第16講:MySQL 的運行機制是什么?它有哪些引擎?
- 第17講:MySQL 的優化方案有哪些?
- 第18講:關系型數據和文檔型數據庫有什么區別?
- 第19講:Redis 的過期策略和內存淘汰機制有什么區別?
- 第20講:Redis 怎樣實現的分布式鎖?
- 第21講:Redis 中如何實現的消息隊列?實現的方式有幾種?
- 第22講:Redis 是如何實現高可用的?
- 模塊四:Java 進階
- 第23講:說一下 JVM 的內存布局和運行原理?
- 第24講:垃圾回收算法有哪些?
- 第25講:你用過哪些垃圾回收器?它們有什么區別?
- 第26講:生產環境如何排除和優化 JVM?
- 第27講:單例的實現方式有幾種?它們有什么優缺點?
- 第28講:你知道哪些設計模式?分別對應的應用場景有哪些?
- 第29講:紅黑樹和平衡二叉樹有什么區別?
- 第30講:你知道哪些算法?講一下它的內部實現過程?
- 模塊五:加分項
- 第31講:如何保證接口的冪等性?常見的實現方案有哪些?
- 第32講:TCP 為什么需要三次握手?
- 第33講:Nginx 的負載均衡模式有哪些?它的實現原理是什么?
- 第34講:Docker 有什么優點?使用時需要注意什么問題?
- 彩蛋
- 彩蛋:如何提高面試成功率?