[TOC]
## 1. 消息出來
topic:隊列的集合稱為topic
## 1.1 Simple
消息直接發送,無法保證
## 1.2 Order
### 1.2.1 使用場景
> 1. 一個生產者可以發送消息給多給topic
> 2. 一個topic默認有4個隊列
> 3. producer以roundrobin(輪詢)的方式給多個隊列發送消息
> 4. 同一個隊列消息遵守FIFO
> * 順序消費:
> 例如在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。
> RocketMQ可以保證順序消費,他的實現是生產者將這個三個消息放在topic的一個隊列里面,單機支持上萬個持久化隊列,消費端去消費的時候也是只能有一個Consumer去取得這個隊列里面的數據,然后順序消費。
> * rocketmq的順序消息需要滿足2點:
> 1.Producer端保證發送消息有序,且發送到同一個隊列。
> 2.consumer端只能讓一個consumer保證消費同一個隊列。
### 1.2.2 使用場景 如何實現
### 1.2.3 使用場景producer順序發送消息到同一隊列
> 1. 默認的情況下,producer會向topic(隊列的集合,默認四個隊列)中的隊列輪詢式的發生消息,這就不滿足順序消費一系列消息發送到一個隊列的要求,所以要修改向隊列發送消息的方法。
> 2. 重寫MessageQueueSelector,從字面理解就是消息隊列選擇器,非常的貼切!原理就是在隊列數量不變的情況下,通過一系列事務的編號(訂單id)和隊列叔取模
~~~
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 3); # 這里的3用于取模運算,相同編號的數據會路由到同一個隊列當中去
~~~
這里設置編號為1

這里設置編號為3,驗證了topic默認四個隊列,且可以指定消息用于取模的id

以上可以保證同一系列事務被發送到了一個隊列當中。
### 1.2.4 使用場景 某一個Consumer順序消費同一個隊列
通過設置Listener實現
1. MessageListenerOrderly(有序的)
實現了MessageListenerOrderly表示一個隊列只會被一個線程取到,第二個線程無法訪問這個隊列
自動實現順序消費
~~~
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 設置自動提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",內容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
;
return ConsumeOrderlyStatus.SUCCESS;
}
});
~~~
2. MessageListenerConcurrently(無序的)
需要把線程池改為單線程模式。
> 1. ConsumeMessageOrderlyService類的start()方法,如果是集群消費,則啟動定時任務,定時向broker發送批量鎖住當前正在消費的隊列集合的消息,具體是consumer端拿到正在消費的隊列集合,發送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合。
> consumer收到后,設置是否鎖住標志位。
> 這里注意2個變量:
> consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否鎖住設置在ProcessQueue里。
> broker端的RebalanceLockManager里的ConcurrentHashMap> mqLockTable,這里維護著全局隊列鎖。
> 2. ConsumeMessageOrderlyService.ConsumeRequest的run方法是消費消息,這里還有個MessageQueueLock messageQueueLock,維護當前consumer端的本地隊列鎖。保證當前只有一個線程能夠進行消費。
> 3. 拉到消息存入ProcessQueue,然后判斷,本地是否獲得鎖,全局隊列是否被鎖住,然后從ProcessQueue里取出消息,用MessageListenerOrderly進行消費。
> 拉到消息后調用ProcessQueue.putMessage(final List msgs) 存入,具體是存入TreeMap msgTreeMap。
> 然后是調用ProcessQueue.takeMessags(final int batchSize)消費,具體是把msgTreeMap里消費過的消息,轉移到TreeMap msgTreeMapTemp。
> 4. 本地消費的事務控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(掛起一會再消費),在此之前還有一個變量ConsumeOrderlyContext context的setAutoCommit()是否自動提交。
> 當SUSPEND_CURRENT_QUEUE_A_MOMENT時,autoCommit設置為true或者false沒有區別,本質跟消費相反,把消息從msgTreeMapTemp轉移回msgTreeMap,等待下次消費。
> 當SUCCESS時,autoCommit設置為true時比設置為false多做了2個動作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
> ProcessQueue.commit() :本質是刪除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消費時從msgTreeMap轉移過來的。
> this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本質是把拉消息的偏移量更新到本地,然后定時更新到broker。
> 那么少了這2個動作會怎么樣呢,隨著消息的消費進行,msgTreeMapTemp里的消息堆積越來越多,消費消息的偏移量一直沒有更新到broker導致consumer每次重新啟動后都要從頭開始重復消費。
> 就算更新了offset到broker,那么msgTreeMapTemp里的消息堆積呢?不知道這算不算bug。
> 所以,還是把autoCommit設置為true比較好。
## 2. 生產中的使用
### 2.1 使用注意事項
> 1. 消費者處理MQ消息時必須冪等性(即無論接收到多少相同的消息,執行后的結果一致),如果不具有冪等性,則轉換成冪等性處理方法;
> 2. 業務方自己保證每條發送到RocketMQ消息都有唯一的ID,這樣消費者根據消息的唯一ID去重,并確保消息處理成功。
### 2.2 java 交互RocketMQ
#### 2.2.1 producer
~~~
package com.aixin.lovetocar.rocketmq.util;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
/**
* Created by dailin on 2018/4/25.
*/
public class RocketMQProducer {
private static DefaultMQProducer defaultMQProducer;
/**
* @param groupName 指定producer組
* @param nameServer namerserver地址
*/
public RocketMQProducer(String groupName, String nameServer) throws MQClientException {
defaultMQProducer = new DefaultMQProducer(groupName);
defaultMQProducer.setNamesrvAddr(nameServer);
defaultMQProducer.start(); //producer開始
}
/**
* 同步發送消息
*
* @param topic
* @param tags
* @param key
* @param data
* @throws Exception
*/
public void sentSynData(String topic, String tags, String key, String data) throws Exception {
Message msg = new Message(topic, tags, key, data.getBytes()); //封裝消息
SendResult sendResult = defaultMQProducer.send(msg); //發送消息
System.out.printf("%s%n", sendResult);
}
/**
* 同步發送消息
*
* @param topic
* @param tags
* @param data
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
* @throws MQBrokerException
*/
public void sentSynData(String topic, String tags, String data) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
Message msg = new Message(topic, tags, data.getBytes()); //封裝消息
SendResult sendResult = defaultMQProducer.send(msg); //發送消息
System.out.printf("%s%n", sendResult);
}
/**
* 發送順序消息
*
* @param topic
* @param tags
* @param data
* @param order
* @throws InterruptedException
* @throws RemotingException
* @throws MQClientException
* @throws MQBrokerException
*/
public void sentOrderDate(String topic, String tags, String key, String data, Integer order) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
Message msg = new Message(topic, tags, key, data.getBytes());
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, order);
System.out.println(sendResult);
}
/**
* 發送順序消息
*
* @param topic
* @param tags
* @param data
* @param order
* @throws InterruptedException
* @throws RemotingException
* @throws MQClientException
* @throws MQBrokerException
*/
public void sentOrderDate(String topic, String tags, String data, Integer order) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException {
Message msg = new Message(topic, tags, data.getBytes());
//隊列選擇
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, order);
System.out.println(sendResult);
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
/**
* 關閉producer與RocketMQ的連接
*/
public void shudownProducer() {
defaultMQProducer.shutdown();
}
}
~~~
- Docker
- 什么是docker
- Docker安裝、組件啟動
- docker網絡
- docker命令
- docker swarm
- dockerfile
- mesos
- 運維
- Linux
- Linux基礎
- Linux常用命令_1
- Linux常用命令_2
- ip命令
- 什么是Linux
- SELinux
- Linux GCC編譯警告:Clock skew detected. 錯誤解決辦法
- 文件描述符
- find
- 資源統計
- LVM
- Linux相關配置
- 服務自啟動
- 服務器安全
- 字符集
- shell腳本
- shell命令
- 實用腳本
- shell 數組
- 循環與判斷
- 系統級別進程開啟和停止
- 函數
- java調用shell腳本
- 發送郵件
- Linux網絡配置
- Ubuntu
- Ubuntu發送郵件
- 更換apt-get源
- centos
- 防火墻
- 虛擬機下配置網絡
- yum重新安裝
- 安裝mysql5.7
- 配置本地yum源
- 安裝telnet
- 忘記root密碼
- rsync+ crontab
- Zabbix
- Zabbix監控
- Zabbix安裝
- 自動報警
- 自動發現主機
- 監控MySQL
- 安裝PHP常見錯誤
- 基于nginx安裝zabbix
- 監控Tomcat
- 監控redis
- web監控
- 監控進程和端口號
- zabbix自定義監控
- 觸發器函數
- zabbix監控mysql主從同步狀態
- Jenkins
- 安裝Jenkins
- jenkins+svn+maven
- jenkins執行shell腳本
- 參數化構建
- maven區分環境打包
- jenkins使用注意事項
- nginx
- nginx認證功能
- ubuntu下編譯安裝Nginx
- 編譯安裝
- Nginx搭建本地yum源
- 文件共享
- Haproxy
- 初識Haproxy
- haproxy安裝
- haproxy配置
- virtualbox
- virtualbox 復制新的虛擬機
- ubuntu下vitrualbox安裝redhat
- centos配置雙網卡
- 配置存儲
- Windows
- Windows安裝curl
- VMware vSphere
- 磁盤管理
- 增加磁盤
- gitlab
- 安裝
- tomcat
- Squid
- bigdata
- FastDFS
- FastFDS基礎
- FastFDS安裝及簡單實用
- api介紹
- 數據存儲
- FastDFS防盜鏈
- python腳本
- ELK
- logstash
- 安裝使用
- kibana
- 安準配置
- elasticsearch
- elasticsearch基礎_1
- elasticsearch基礎_2
- 安裝
- 操作
- java api
- 中文分詞器
- term vector
- 并發控制
- 對text字段排序
- 倒排和正排索引
- 自定義分詞器
- 自定義dynamic策略
- 進階練習
- 共享鎖和排它鎖
- nested object
- 父子關系模型
- 高亮
- 搜索提示
- Redis
- redis部署
- redis基礎
- redis運維
- redis-cluster的使用
- redis哨兵
- redis腳本備份還原
- rabbitMQ
- rabbitMQ安裝使用
- rpc
- RocketMQ
- 架構概念
- 安裝
- 實例
- 好文引用
- 知乎
- ACK
- postgresql
- 存儲過程
- 編程語言
- 計算機網絡
- 基礎_01
- tcp/ip
- http轉https
- Let's Encrypt免費ssl證書(基于haproxy負載)
- what's the http?
- 網關
- 網絡IO
- http
- 無狀態網絡協議
- Python
- python基礎
- 基礎數據類型
- String
- List
- 遍歷
- Python基礎_01
- python基礎_02
- python基礎03
- python基礎_04
- python基礎_05
- 函數
- 網絡編程
- 系統編程
- 類
- Python正則表達式
- pymysql
- java調用python腳本
- python操作fastdfs
- 模塊導入和sys.path
- 編碼
- 安裝pip
- python進階
- python之setup.py構建工具
- 模塊動態導入
- 內置函數
- 內置變量
- path
- python模塊
- 內置模塊_01
- 內置模塊_02
- log模塊
- collections
- Twisted
- Twisted基礎
- 異步編程初探與reactor模式
- yield-inlineCallbacks
- 系統編程
- 爬蟲
- urllib
- xpath
- scrapy
- 爬蟲基礎
- 爬蟲種類
- 入門基礎
- Rules
- 反反爬蟲策略
- 模擬登陸
- problem
- 分布式爬蟲
- 快代理整站爬取
- 與es整合
- 爬取APP數據
- 爬蟲部署
- collection for ban of web
- crawlstyle
- API
- 多次請求
- 向調度器發送請求
- 源碼學習
- LinkExtractor源碼分析
- 構建工具-setup.py
- selenium
- 基礎01
- 與scrapy整合
- Django
- Django開發入門
- Django與MySQL
- java
- 設計模式
- 單例模式
- 工廠模式
- java基礎
- java位移
- java反射
- base64
- java內部類
- java高級
- 多線程
- springmvc-restful
- pfx數字證書
- 生成二維碼
- 項目中使用log4j
- 自定義注解
- java發送post請求
- Date時間操作
- spring
- 基礎
- spring事務控制
- springMVC
- 注解
- 參數綁定
- springmvc+spring+mybatis+dubbo
- MVC模型
- SpringBoot
- java配置入門
- SpringBoot基礎入門
- SpringBoot web
- 整合
- SpringBoot注解
- shiro權限控制
- CommandLineRunner
- mybatis
- 靜態資源
- SSM整合
- Aware
- Spring API使用
- Aware接口
- mybatis
- 入門
- mybatis屬性自動映射、掃描
- 問題
- @Param 注解在Mybatis中的使用 以及傳遞參數的三種方式
- mybatis-SQL
- 逆向生成dao、model層代碼
- 反向工程中Example的使用
- 自增id回顯
- SqlSessionDaoSupport
- invalid bound statement(not found)
- 脈絡
- beetl
- beetl是什么
- 與SpringBoot整合
- shiro
- 什么是shiro
- springboot+shrio+mybatis
- 攔截url
- 枚舉
- 圖片操作
- restful
- java項目中日志處理
- JSON
- 文件工具類
- KeyTool生成證書
- 兼容性問題
- 開發規范
- 工具類開發規范
- 壓縮圖片
- 異常處理
- web
- JavaScript
- 基礎語法
- 創建對象
- BOM
- window對象
- DOM
- 閉包
- form提交-文件上傳
- td中內容過長
- 問題1
- js高級
- js文件操作
- 函數_01
- session
- jQuery
- 函數01
- data()
- siblings
- index()與eq()
- select2
- 動態樣式
- bootstrap
- 表單驗證
- 表格
- MUI
- HTML
- iframe
- label標簽
- 規范編程
- layer
- sss
- 微信小程序
- 基礎知識
- 實踐
- 自定義組件
- 修改自定義組件的樣式
- 基礎概念
- appid
- 跳轉
- 小程序發送ajax
- 微信小程序上下拉刷新
- if
- 工具
- idea
- Git
- maven
- svn
- Netty
- 基礎概念
- Handler
- SimpleChannelInboundHandler 與 ChannelInboundHandler
- 網絡編程
- 網絡I/O
- database
- oracle
- 游標
- PLSQL Developer
- mysql
- MySQL基準測試
- mysql備份
- mysql主從不同步
- mysql安裝
- mysql函數大全
- SQL語句
- 修改配置
- 關鍵字
- 主從搭建
- centos下用rpm包安裝mysql
- 常用sql
- information_scheme數據庫
- 值得學的博客
- mysql學習
- 運維
- mysql權限
- 配置信息
- 好文mark
- jsp
- jsp EL表達式
- C
- test