細心的你可能發現了,本系列課程中竟然出現了三個課時都是在說消息隊列,第 10 課時講了程序級別的消息隊列以及延遲消息隊列的實現,而第 15 課時講了常見的消息隊列中間件 RabbitMQ、Kafka 等,由此可見消息隊列在整個 Java 技術體系中的重要程度。本課時我們將重點來看一下 Redis 是如何實現消息隊列的。
我們本課時的面試題是,在 Redis 中實現消息隊列的方式有幾種?
#### 典型回答
早在 Redis 2.0 版本之前使用 Redis 實現消息隊列的方式有兩種:
* 使用 List 類型實現
* 使用 ZSet 類型實現
其中使用**List 類型實現的方式最為簡單和直接**,它主要是通過 lpush、rpop 存入和讀取實現消息隊列的,如下圖所示:

lpush 可以把最新的消息存儲到消息隊列(List 集合)的首部,而 rpop 可以讀取消息隊列的尾部,這樣就實現了先進先出,如下圖所示:

命令行的實現命令如下:
```
127.0.0.1:6379> lpush mq "java" #推送消息 java
(integer) 1
127.0.0.1:6379> lpush mq "msg" #推送消息 msg
(integer) 2
127.0.0.1:6379> rpop mq #接收到消息 java
"java"
127.0.0.1:6379> rpop mq #接收到消息 msg
"mq"
```
其中,mq 相當于消息隊列的名稱,而 lpush 用于生產并添加消息,而 rpop 用于拉取并消費消息。
使用 List 實現消息隊列的優點是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數據保存至磁盤,這樣當 Redis 重啟之后,消息不會丟失。
但使用 List 同樣存在一定的問題,比如消息不支持重復消費、沒有按照主題訂閱的功能、不支持消費消息確認等。
ZSet 實現消息隊列的方式和 List 類似,它是利用 zadd 和 zrangebyscore 來實現存入和讀取消息的,這里就不重復敘述了。但 ZSet 的實現方式更為復雜一些,因為 ZSet 多了一個分值(score)屬性,我們可以使用它來實現更多的功能,比如用它來存儲時間戳,以此來實現延遲消息隊列等。
ZSet 同樣具備持久化的功能,List 存在的問題它也同樣存在,不但如此,使用 ZSet 還不能存儲相同元素的值。因為它是有序集合,有序集合的存儲元素值是不能重復的,但分值可以重復,也就是說當消息值重復時,只能存儲一條信息在 ZSet 中。
在 Redis 2.0 之后 Redis 就新增了專門的發布和訂閱的類型,Publisher(發布者)和 Subscriber(訂閱者)來實現消息隊列了,它們對應的執行命令如下:
* 發布消息,publish channel "message"
* 訂閱消息,subscribe channel
使用發布和訂閱的類型,我們可以實現主題訂閱的功能,也就是 Pattern Subscribe 的功能。因此我們可以使用一個消費者“queue_*”來訂閱所有以“queue_”開頭的消息隊列,如下圖所示:

發布訂閱模式的優點很明顯,但同樣存在以下 3 個問題:
*
無法持久化保存消息,如果 Redis 服務器宕機或重啟,那么所有的消息將會丟失;
* 發布訂閱模式是“發后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費之前的歷史消息;
* 不支持消費者確認機制,穩定性不能得到保證,例如當消費者獲取到消息之后,還沒來得及執行就宕機了。因為沒有消費者確認機制,Redis 就會誤以為消費者已經執行了,因此就不會重復發送未被正常消費的消息了,這樣整體的 Redis 穩定性就被沒有辦法得到保障了。
然而在 Redis 5.0 之后新增了 Stream 類型,我們就可以使用 Stream 的 xadd 和 xrange 來實現消息的存入和讀取了,并且 Stream 提供了 xack 手動確認消息消費的命令,用它我們就可以實現消費者確認的功能了,使用命令如下:
```
127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1
```
相關語法如下:
```
xack key group-key ID [ID ...]
```
消費確認增加了消息的可靠性,一般在業務處理完成之后,需要執行 ack 確認消息已經被消費完成,整個流程的執行如下圖所示:

其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到消息。
以上就 Redis 實現消息隊列的四種方式,他們分別是:
* 使用 List 實現消息隊列;
* 使用 ZSet 實現消息隊列;
* 使用發布訂閱者模式實現消息隊列;
* 使用 Stream 實現消息隊列。
#### 考點分析
本課時的題目比較全面的考察了面試者對于 Redis 整體知識框架和新版本特性的理解和領悟。早期版本中比較常用的實現消息隊列的方式是 List、ZSet 和發布訂閱者模式,使用 Stream 來實現消息隊列是近兩年才流行起來的方案,并且很多企業也沒有使用到 Redis 5.0 這么新的版本。因此只需回答出前三種就算及格了,而 Stream 方式實現消息隊列屬于附加題,如果面試中能回答上來的話就更好了,它體現了你對新技術的敏感度與對技術的熱愛程度,屬于面試中的加分項。
和此知識點相關的面試題還有以下幾個:
* 在 Java 代碼中使用 List 實現消息隊列會有什么問題?應該如何解決?
* 在程序中如何使用 Stream 來實現消息隊列?
#### 知識擴展
* [ ] 使用 List 實現消息隊列
在 Java 程序中我們需要使用 Redis 客戶端框架來輔助程序操作 Redis,比如 Jedis 框架。
使用 Jedis 框架首先需要在 pom.xml 文件中添加 Jedis 依賴,配置如下:
```
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version}</version>
</dependency>
```
List 實現消息隊列的完整代碼如下:
```
import redis.clients.jedis.Jedis;
publicclass ListMQTest {
public static void main(String[] args){
// 啟動一個線程作為消費者
new Thread(() -> consumer()).start();
// 生產者
producer();
}
/**
* 生產者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
}
/**
* 消費者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消費消息
while (true) {
// 獲取消息
String msg = jedis.rpop("mq");
if (msg != null) {
// 接收到了消息
System.out.println("接收到消息:" + msg);
}
}
}
}
```
以上程序的運行結果是:
```
接收到消息:Hello, Java.
```
但是以上的代碼存在一個問題,可以看出以上消費者的實現是通過 while 無限循環來獲取消息,但如果消息的空閑時間比較長,一直沒有新任務,而 while 循環不會因此停止,它會一直執行循環的動作,這樣就會白白浪費了系統的資源。
此時我們可以借助 Redis 中的阻塞讀來替代 rpop 的方法就可以解決此問題,具體實現代碼如下:
```
import redis.clients.jedis.Jedis;
public class ListMQExample {
public static void main(String[] args) throws InterruptedException {
// 消費者
new Thread(() -> bConsumer()).start();
// 生產者
producer();
}
/**
* 生產者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, Java.");
Thread.sleep(1000);
jedis.lpush("mq", "message 2.");
Thread.sleep(2000);
jedis.lpush("mq", "message 3.");
}
/**
* 消費者(阻塞版)
*/
public static void bConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
while (true) {
// 阻塞讀
for (String item : jedis.brpop(0,"mq")) {
// 讀取到相關數據,進行業務處理
System.out.println(item);
}
}
}
}
```
以上程序的運行結果是:
```
接收到消息:Hello, Java.
```
以上代碼是經過改良的,我們使用 brpop 替代 rpop 來讀取最后一條消息,就可以解決 while 循環在沒有數據的情況下,一直循環消耗系統資源的情況了。brpop 中的 b 是 blocking 的意思,表示阻塞讀,也就是當隊列沒有數據時,它會進入休眠狀態,當有數據進入隊列之后,它才會“蘇醒”過來執行讀取任務,這樣就可以解決 while 循環一直執行消耗系統資源的問題了。
使用 Stream 實現消息隊列
在開始實現消息隊列之前,我們必須先創建分組才行,因為消費者需要關聯分組信息才能正常運行,具體實現代碼如下:
```
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StreamGroupExample {
private static final String _STREAM_KEY = "mq"; // 流 key
private static final String _GROUP_NAME = "g1"; // 分組名稱
private static final String _CONSUMER_NAME = "c1"; // 消費者 1 的名稱
private static final String _CONSUMER2_NAME = "c2"; // 消費者 2 的名稱
public static void main(String[] args) {
// 生產者
producer();
// 創建消費組
createGroup(_STREAM_KEY, _GROUP_NAME);
// 消費者 1
new Thread(() -> consumer()).start();
// 消費者 2
new Thread(() -> consumer2()).start();
}
/**
* 創建消費分組
* @param stream 流 key
* @param groupName 分組名稱
*/
public static void createGroup(String stream, String groupName) {
Jedis jedis = JedisUtils.getJedis();
jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
}
/**
* 生產者
*/
public static void producer() {
Jedis jedis = JedisUtils.getJedis();
// 添加消息 1
Map<String, String> map = new HashMap<>();
map.put("data", "redis");
StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
System.out.println("消息添加成功 ID:" + id);
// 添加消息 2
Map<String, String> map2 = new HashMap<>();
map2.put("data", "java");
StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
System.out.println("消息添加成功 ID:" + id2);
}
/**
* 消費者 1
*/
public static void consumer() {
Jedis jedis = JedisUtils.getJedis();
// 消費消息
while (true) {
// 讀取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞讀取一條消息(最大阻塞時間120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 讀取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內容
System.out.println("Consumer 1 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 內容:" + new Gson().toJson(content));
}
}
}
/**
* 消費者 2
*/
public static void consumer2() {
Jedis jedis = JedisUtils.getJedis();
// 消費消息
while (true) {
// 讀取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞讀取一條消息(最大阻塞時間120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 讀取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內容
System.out.println("Consumer 2 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 內容:" + new Gson().toJson(content));
}
}
}
}
```
以上代碼運行結果如下:
```
消息添加成功 ID:1580971482344-0
消息添加成功 ID:1580971482415-0
Consumer 1 讀取到消息 ID:1580971482344-0 內容:{"data":"redis"}
Consumer 2 讀取到消息 ID:1580971482415-0 內容:{"data":"java"}
```
其中,jedis.xreadGroup() 方法的第五個參數 noAck 表示是否自動確認消息,如果設置 true 收到消息會自動確認 (ack) 消息,否則需要手動確認。
可以看出,同一個分組內的多個 consumer 會讀取到不同消息,不同的 consumer 不會讀取到分組內的同一條消息。
>
小貼士:Jedis 框架要使用最新版,低版本 block 設置大于 0 時,會出現 bug,拋連接超時異常。
#### 小結
本課時我們講了 Redis 中消息隊列的四種實現方式:List 方式、ZSet 方式、發布訂閱者模式、Stream 方式,其中發布訂閱者模式不支持消息持久化、而其他三種方式支持持久化,并且 Stream 方式支持消費者確認。我們還使用 Jedis 框架完成了 List 和 Stream 的消息隊列功能,需要注意的是在 List 中需要使用 brpop 來讀取消息,而不是 rpop,這樣可以解決沒有任務時 ,while 一直循環浪費系統資源的問題。
#### 課后問答
* 1、Stream 如何手動確認消息呢
講師回復: Streams 有 xack key group-key ID 可以用來確認消息。
- 前言
- 開篇詞
- 開篇詞:大廠技術面試“潛規則”
- 模塊一:Java 基礎
- 第01講:String 的特點是什么?它有哪些重要的方法?
- 第02講:HashMap 底層實現原理是什么?JDK8 做了哪些優化?
- 第03講:線程的狀態有哪些?它是如何工作的?
- 第04講:詳解 ThreadPoolExecutor 的參數含義及源碼執行流程?
- 第05講:synchronized 和 ReentrantLock 的實現原理是什么?它們有什么區別?
- 第06講:談談你對鎖的理解?如何手動模擬一個死鎖?
- 第07講:深克隆和淺克隆有什么區別?它的實現方式有哪些?
- 第08講:動態代理是如何實現的?JDK Proxy 和 CGLib 有什么區別?
- 第09講:如何實現本地緩存和分布式緩存?
- 第10講:如何手寫一個消息隊列和延遲消息隊列?
- 模塊二:熱門框架
- 第11講:底層源碼分析 Spring 的核心功能和執行流程?(上)
- 第12講:底層源碼分析 Spring 的核心功能和執行流程?(下)
- 第13講:MyBatis 使用了哪些設計模式?在源碼中是如何體現的?
- 第14講:SpringBoot 有哪些優點?它和 Spring 有什么區別?
- 第15講:MQ 有什么作用?你都用過哪些 MQ 中間件?
- 模塊三:數據庫相關
- 第16講:MySQL 的運行機制是什么?它有哪些引擎?
- 第17講:MySQL 的優化方案有哪些?
- 第18講:關系型數據和文檔型數據庫有什么區別?
- 第19講:Redis 的過期策略和內存淘汰機制有什么區別?
- 第20講:Redis 怎樣實現的分布式鎖?
- 第21講:Redis 中如何實現的消息隊列?實現的方式有幾種?
- 第22講:Redis 是如何實現高可用的?
- 模塊四:Java 進階
- 第23講:說一下 JVM 的內存布局和運行原理?
- 第24講:垃圾回收算法有哪些?
- 第25講:你用過哪些垃圾回收器?它們有什么區別?
- 第26講:生產環境如何排除和優化 JVM?
- 第27講:單例的實現方式有幾種?它們有什么優缺點?
- 第28講:你知道哪些設計模式?分別對應的應用場景有哪些?
- 第29講:紅黑樹和平衡二叉樹有什么區別?
- 第30講:你知道哪些算法?講一下它的內部實現過程?
- 模塊五:加分項
- 第31講:如何保證接口的冪等性?常見的實現方案有哪些?
- 第32講:TCP 為什么需要三次握手?
- 第33講:Nginx 的負載均衡模式有哪些?它的實現原理是什么?
- 第34講:Docker 有什么優點?使用時需要注意什么問題?
- 彩蛋
- 彩蛋:如何提高面試成功率?