[TOC]
我們平時習慣于使用 Rabbitmq 和 Kafka 作為消息隊列中間件,來給應用程序之間增加異步消息傳遞功能。這兩個中間件都是專業的消息隊列中間件,特性之多超出了大多數人的理解能力。
使用過 Rabbitmq 的同學知道它使用起來有多復雜,發消息之前要創建 Exchange,再創建 Queue,還要將 Queue 和 Exchange 通過某種規則綁定起來,發消息的時候要指定 routing-key,還要控制頭部信息。消費者在消費消息之前也要進行上面一系列的繁瑣過程。但是絕大多數情況下,雖然我們的消息隊列只有一組消費者,但還是需要經歷上面這些繁瑣的過程。
有了 Redis,它就可以讓我們解脫出來,對于那些只有一組消費者的消息隊列,使用 Redis 就可以非常輕松的搞定。Redis 的消息隊列不是專業的消息隊列,它沒有非常多的高級特性,沒有 ack 保證,如果對消息的可靠性有著極致的追求,那么它就不適合使用。
## 異步消息隊列
Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用`rpush/lpush`操作入隊列,使用`lpop 和 rpop`來出隊列。

~~~
> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)
~~~
上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一樣的。這里不再贅述。
## 隊列空了怎么辦?
客戶端是通過隊列的 pop 操作來獲取消息,然后進行處理。處理完了再接著獲取消息,再進行處理。如此循環往復,這便是作為隊列消費者的客戶端的生命周期。
可是如果隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接著再 pop,又沒有數據。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也會被拉高,如果這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。
通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鐘就可以了。不但客戶端的 CPU 能降下來,Redis 的 QPS 也降下來了。
~~~
time.sleep(1) # python 睡 1s
Thread.sleep(1000) # java 睡 1s
~~~

## 隊列延遲
用上面睡眠的辦法可以解決問題。但是有個小問題,那就是睡眠會導致消息的延遲增大。如果只有 1 個消費者,那么這個延遲就是 1s。如果有多個消費者,這個延遲會有所下降,因為每個消費者的睡覺時間是岔開來的。
有沒有什么辦法能顯著降低延遲呢?你當然可以很快想到:那就把睡覺的時間縮短點。這種方式當然可以,不過有沒有更好的解決方案呢?當然也有,那就是 blpop/brpop。
這兩個指令的前綴字符`b`代表的是`blocking`,也就是阻塞讀。
阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用`blpop/brpop`替代前面的`lpop/rpop`,就完美解決了上面的問題。
## 空閑連接自動斷開
你以為上面的方案真的很完美么?先別急著開心,其實他還有個問題需要解決。
什么問題?——**空閑連接**的問題。
如果線程一直阻塞在哪里,Redis 的客戶端連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用。這個時候`blpop/brpop`會拋出異常來。
所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。
## 鎖沖突處理
上節課我們講了分布式鎖的問題,但是沒有提到客戶端在處理請求時加鎖沒加成功怎么辦。一般有 3 種策略來處理加鎖失敗:
1. 直接拋出異常,通知用戶稍后重試;
2. sleep 一會再重試;
3. 將請求轉移至延時隊列,過一會再試;
**直接拋出特定類型的異常**
這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框后,會先閱讀對話框的內容,再點擊重試,這樣就可以起到人工延時的效果。如果考慮到用戶體驗,可以由前端的代碼替代用戶自己來進行延時重試控制。它本質上是對當前請求的放棄,由用戶決定是否重新發起新的請求。
**sleep**
sleep 會阻塞當前的消息處理線程,會導致隊列的后續消息處理出現延遲。如果碰撞的比較頻繁或者隊列里消息比較多,sleep 可能并不合適。如果因為個別死鎖的 key 導致加鎖不成功,線程會徹底堵死,導致后續消息永遠得不到及時處理。
**延時隊列**
這種方式比較適合異步消息處理,將當前沖突的請求扔到另一個隊列延后處理以避開沖突。
## 延時隊列的實現
延時隊列可以通過 Redis 的 zset(有序列表) 來實現。我們將消息序列化成一個字符串作為 zset 的`value`,這個消息的到期處理時間作為`score`,然后用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是為了保障可用性,萬一掛了一個線程還有其它線程可以繼續處理。因為有多個線程,所以需要考慮并發爭搶任務,確保任務不能被多次執行。
~~~
def delay(msg):
msg.id = str(uuid.uuid4()) # 保證 value 值唯一
value = json.dumps(msg)
retry_ts = time.time() + 5 # 5 秒后重試
redis.zadd("delay-queue", retry_ts, value)
def loop():
while True:
# 最多取 1 條
values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
if not values:
time.sleep(1) # 延時隊列空的,休息 1s
continue
value = values[0] # 拿第一條,也只有一條
success = redis.zrem("delay-queue", value) # 從消息隊列中移除該消息
if success: # 因為有多進程并發的可能,最終只會有一個進程可以搶到消息
msg = json.loads(value)
handle_msg(msg)
~~~
Redis 的 zrem 方法是多線程多進程爭搶任務的關鍵,它的返回值決定了當前實例有沒有搶到任務,因為 loop 方法可能會被多個線程、多個進程調用,同一個任務可能會被多個進程線程搶到,通過 zrem 來決定唯一的屬主。
同時,我們要注意一定要對 handle\_msg 進行異常捕獲,避免因為個別任務處理問題導致循環異常退出。以下是 Java 版本的延時隊列實現,因為要使用到 Json 序列化,所以還需要 fastjson 庫的支持。
~~~
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 后再試
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一條
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇會繼續
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 搶到了
TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}
~~~
## 進一步優化
上面的算法中同一個任務可能會被多個進程取到之后再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。可以考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到服務器端進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。
## 思考
1. Redis 作為消息隊列為什么不能保證 100% 的可靠性?
因為沒有ask機制,當消費端崩潰后消息丟失。pop出消息后,list 中就沒這個消息了,如果處理消息的程序拿到消息還未處理就掛掉了,那消息就丟失了,所以是不可靠隊列。[https://redis.io/commands/rpoplpush](https://link.juejin.im/?target=https%3A%2F%2Fredis.io%2Fcommands%2Frpoplpush) 這個可以實現可靠隊列
> Rpoplpush 命令用于移除列表的最后一個元素,并將該元素添加到另一個列表并返回。
2. 使用 Lua Scripting 來優化延時隊列的邏輯。
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊