https://zhuanlan.zhihu.com/p/25140744 中剖析過,consumer的每個實例是靠隊列分配來決定如何消費消息的。那么消費進度具體是如何管理的,又是如何保證消息成功消費的(RocketMQ有保證消息肯定消費成功的特性(失敗則重試)?
本文將詳細解析消息具體是如何ack的,又是如何保證消費肯定成功的。
由于以上工作所有的機制都實現在PushConsumer中,所以本文的原理均只適用于RocketMQ中的PushConsumer即Java客戶端中的DefaultPushConsumer。 若使用了PullConsumer模式,類似的工作如何ack,如何保證消費等均需要使用方自己實現。
注:廣播消費和集群消費的處理有部分區別,以下均特指集群消費(CLSUTER),廣播(BROADCASTING)下部分可能不適用。
保證消費成功
PushConsumer為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功——即都會重新投遞。
消費的時候,我們需要注入一個消費回調,具體sample代碼如下:
~~~
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
doMyJob();//執行真正消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
~~~
業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批消息(默認是1條)是消費完成的。(具體如何ACK見后面章節)
如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。
為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。
注:
如果業務的回調沒有處理好而拋出異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。當使用順序消費的回調MessageListenerOrderly時,由于順序消費是要前者消費成功才能繼續消費,所以沒有RECONSUME_LATER的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停隊列的其余消費,直到原消息不斷重試成功為止才能繼續消費。
啟動的時候從哪里消費
當新實例啟動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset),按照這個進度發起自己的第一次Pull請求。
如果這個消費進度在Broker并沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:
CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息
CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
所以,社區中經常有人問:“為什么我設了CONSUME_FROM_LAST_OFFSET,歷史的消息還是被消費了”? 原因就在于只有全新的消費組才會使用到這些策略,老的消費組都是按已經存儲過的消費進度繼續消費。
對于老消費組想跳過歷史消息可以采用以下兩種方法:
代碼按照日期判斷,太老的消息直接return CONSUME_SUCCESS過濾。代碼判斷消息的offset和MAX_OFFSET相差很遠,認為是積壓了很多,直接return CONSUME_SUCCESS過濾。消費者啟動前,先調整該消費組的消費進度,再開始消費。可以人工使用命令resetOffsetByTime,或調用內部的運維接口,祥見ResetOffsetByTimeCommand.java
消息ACK機制
RocketMQ是以consumer group+queue為單位是管理消費進度的,以一個consumer offset標記這個這個消費組在這條queue上的消費進度。
如果某已存在的消費組出現了新消費實例的時候,依靠這個組的消費進度,就可以判斷第一次是從哪里開始拉取的。
每次消息成功后,本地的消費進度會被更新,然后由定時器定時同步到broker,以此持久化消費進度。
但是每次記錄消費進度的時候,只會把一批消息中最小的offset值為消費進度值,如下圖:
這鐘方式和傳統的一條message單獨ack的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重復問題——由于消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。
在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才會一下子更新到2200。
在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。
對于這個場景,3.2.6之前的RocketMQ無能為力,所以業務必須要保證消息消費的冪等性,這也是RocketMQ官方多次強調的態度。
實際上,從源碼的角度上看,RocketMQ可能是考慮過這個問題的,截止到3.2.6的版本的源碼中,可以看到為了緩解這個問題的影響面,DefaultMQPushConsumer中有個配置consumeConcurrentlyMaxSpan
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
這個值默認是2000,當RocketMQ發現本地緩存的消息的最大值-最小值差距大于這個值(2000)的時候,會觸發流控——也就是說如果頭尾都卡住了部分消息,達到了這個閾值就不再拉取消息。
但作用實際很有限,像剛剛這個例子,2101的消費是死循環,其他消費非常正常的話,是無能為力的。一旦退出,在不人工干預的情況下,2101后所有消息全部重復。
Ack卡進度解決方案
對于這個卡消費進度的問題,最顯而易見的解法是設定一個超時時間,達到超時時間的那個消費當作消費失敗處理。
后來RocketMQ顯然也發現了這個問題,而RocketMQ在3.5.8之后也就是采用這樣的方案去解決這個問題。
在pushConsumer中 有一個consumeTimeout字段(默認15分鐘),用于設置最大的消費超時時間。消費前會記錄一個消費的開始時間,后面用于比對。消費者啟動的時候,會定期掃描所有消費的消息,達到這個timeout的那些消息,就會觸發sendBack并ack的操作。這里掃描的間隔也是consumeTimeout(單位分鐘)的間隔。
核心源碼如下:
//ConsumeMessageConcurrentlyService.java
public void start() {
this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
}
}
//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
try {
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
msgTreeMap.remove(msgTreeMap.firstKey());
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}
通過源碼看這個方案,其實可以看出有幾個不太完善的問題:
消費timeout的時間非常不精確。由于掃描的間隔是15分鐘,所以實際上觸發的時候,消息是有可能卡住了接近30分鐘(15*2)才被清理。由于定時器一啟動就開始調度了,中途這個consumeTimeout再更新也不會生效。
- Docker
- 什么是docker
- Docker安裝、組件啟動
- docker網絡
- docker命令
- docker swarm
- dockerfile
- mesos
- 運維
- Linux
- Linux基礎
- Linux常用命令_1
- Linux常用命令_2
- ip命令
- 什么是Linux
- SELinux
- Linux GCC編譯警告:Clock skew detected. 錯誤解決辦法
- 文件描述符
- find
- 資源統計
- LVM
- Linux相關配置
- 服務自啟動
- 服務器安全
- 字符集
- shell腳本
- shell命令
- 實用腳本
- shell 數組
- 循環與判斷
- 系統級別進程開啟和停止
- 函數
- java調用shell腳本
- 發送郵件
- Linux網絡配置
- Ubuntu
- Ubuntu發送郵件
- 更換apt-get源
- centos
- 防火墻
- 虛擬機下配置網絡
- yum重新安裝
- 安裝mysql5.7
- 配置本地yum源
- 安裝telnet
- 忘記root密碼
- rsync+ crontab
- Zabbix
- Zabbix監控
- Zabbix安裝
- 自動報警
- 自動發現主機
- 監控MySQL
- 安裝PHP常見錯誤
- 基于nginx安裝zabbix
- 監控Tomcat
- 監控redis
- web監控
- 監控進程和端口號
- zabbix自定義監控
- 觸發器函數
- zabbix監控mysql主從同步狀態
- Jenkins
- 安裝Jenkins
- jenkins+svn+maven
- jenkins執行shell腳本
- 參數化構建
- maven區分環境打包
- jenkins使用注意事項
- nginx
- nginx認證功能
- ubuntu下編譯安裝Nginx
- 編譯安裝
- Nginx搭建本地yum源
- 文件共享
- Haproxy
- 初識Haproxy
- haproxy安裝
- haproxy配置
- virtualbox
- virtualbox 復制新的虛擬機
- ubuntu下vitrualbox安裝redhat
- centos配置雙網卡
- 配置存儲
- Windows
- Windows安裝curl
- VMware vSphere
- 磁盤管理
- 增加磁盤
- gitlab
- 安裝
- tomcat
- Squid
- bigdata
- FastDFS
- FastFDS基礎
- FastFDS安裝及簡單實用
- api介紹
- 數據存儲
- FastDFS防盜鏈
- python腳本
- ELK
- logstash
- 安裝使用
- kibana
- 安準配置
- elasticsearch
- elasticsearch基礎_1
- elasticsearch基礎_2
- 安裝
- 操作
- java api
- 中文分詞器
- term vector
- 并發控制
- 對text字段排序
- 倒排和正排索引
- 自定義分詞器
- 自定義dynamic策略
- 進階練習
- 共享鎖和排它鎖
- nested object
- 父子關系模型
- 高亮
- 搜索提示
- Redis
- redis部署
- redis基礎
- redis運維
- redis-cluster的使用
- redis哨兵
- redis腳本備份還原
- rabbitMQ
- rabbitMQ安裝使用
- rpc
- RocketMQ
- 架構概念
- 安裝
- 實例
- 好文引用
- 知乎
- ACK
- postgresql
- 存儲過程
- 編程語言
- 計算機網絡
- 基礎_01
- tcp/ip
- http轉https
- Let's Encrypt免費ssl證書(基于haproxy負載)
- what's the http?
- 網關
- 網絡IO
- http
- 無狀態網絡協議
- Python
- python基礎
- 基礎數據類型
- String
- List
- 遍歷
- Python基礎_01
- python基礎_02
- python基礎03
- python基礎_04
- python基礎_05
- 函數
- 網絡編程
- 系統編程
- 類
- Python正則表達式
- pymysql
- java調用python腳本
- python操作fastdfs
- 模塊導入和sys.path
- 編碼
- 安裝pip
- python進階
- python之setup.py構建工具
- 模塊動態導入
- 內置函數
- 內置變量
- path
- python模塊
- 內置模塊_01
- 內置模塊_02
- log模塊
- collections
- Twisted
- Twisted基礎
- 異步編程初探與reactor模式
- yield-inlineCallbacks
- 系統編程
- 爬蟲
- urllib
- xpath
- scrapy
- 爬蟲基礎
- 爬蟲種類
- 入門基礎
- Rules
- 反反爬蟲策略
- 模擬登陸
- problem
- 分布式爬蟲
- 快代理整站爬取
- 與es整合
- 爬取APP數據
- 爬蟲部署
- collection for ban of web
- crawlstyle
- API
- 多次請求
- 向調度器發送請求
- 源碼學習
- LinkExtractor源碼分析
- 構建工具-setup.py
- selenium
- 基礎01
- 與scrapy整合
- Django
- Django開發入門
- Django與MySQL
- java
- 設計模式
- 單例模式
- 工廠模式
- java基礎
- java位移
- java反射
- base64
- java內部類
- java高級
- 多線程
- springmvc-restful
- pfx數字證書
- 生成二維碼
- 項目中使用log4j
- 自定義注解
- java發送post請求
- Date時間操作
- spring
- 基礎
- spring事務控制
- springMVC
- 注解
- 參數綁定
- springmvc+spring+mybatis+dubbo
- MVC模型
- SpringBoot
- java配置入門
- SpringBoot基礎入門
- SpringBoot web
- 整合
- SpringBoot注解
- shiro權限控制
- CommandLineRunner
- mybatis
- 靜態資源
- SSM整合
- Aware
- Spring API使用
- Aware接口
- mybatis
- 入門
- mybatis屬性自動映射、掃描
- 問題
- @Param 注解在Mybatis中的使用 以及傳遞參數的三種方式
- mybatis-SQL
- 逆向生成dao、model層代碼
- 反向工程中Example的使用
- 自增id回顯
- SqlSessionDaoSupport
- invalid bound statement(not found)
- 脈絡
- beetl
- beetl是什么
- 與SpringBoot整合
- shiro
- 什么是shiro
- springboot+shrio+mybatis
- 攔截url
- 枚舉
- 圖片操作
- restful
- java項目中日志處理
- JSON
- 文件工具類
- KeyTool生成證書
- 兼容性問題
- 開發規范
- 工具類開發規范
- 壓縮圖片
- 異常處理
- web
- JavaScript
- 基礎語法
- 創建對象
- BOM
- window對象
- DOM
- 閉包
- form提交-文件上傳
- td中內容過長
- 問題1
- js高級
- js文件操作
- 函數_01
- session
- jQuery
- 函數01
- data()
- siblings
- index()與eq()
- select2
- 動態樣式
- bootstrap
- 表單驗證
- 表格
- MUI
- HTML
- iframe
- label標簽
- 規范編程
- layer
- sss
- 微信小程序
- 基礎知識
- 實踐
- 自定義組件
- 修改自定義組件的樣式
- 基礎概念
- appid
- 跳轉
- 小程序發送ajax
- 微信小程序上下拉刷新
- if
- 工具
- idea
- Git
- maven
- svn
- Netty
- 基礎概念
- Handler
- SimpleChannelInboundHandler 與 ChannelInboundHandler
- 網絡編程
- 網絡I/O
- database
- oracle
- 游標
- PLSQL Developer
- mysql
- MySQL基準測試
- mysql備份
- mysql主從不同步
- mysql安裝
- mysql函數大全
- SQL語句
- 修改配置
- 關鍵字
- 主從搭建
- centos下用rpm包安裝mysql
- 常用sql
- information_scheme數據庫
- 值得學的博客
- mysql學習
- 運維
- mysql權限
- 配置信息
- 好文mark
- jsp
- jsp EL表達式
- C
- test