[TOC]
# 1 單機版消息中心
一個消息中心,最基本的需要支持多生產者、多消費者,例如下:
```java
class Scratch {
public static void main(String[] args) {
// 實際中會有 nameserver 服務來找到 broker 具體位置以及 broker 主從信息
Broker broker = new Broker();
Producer producer1 = new Producer();
producer1.connectBroker(broker);
Producer producer2 = new Producer();
producer2.connectBroker(broker);
Consumer consumer1 = new Consumer();
consumer1.connectBroker(broker);
Consumer consumer2 = new Consumer();
consumer2.connectBroker(broker);
for (int i = 0; i < 2; i++) {
producer1.asyncSendMsg("producer1 send msg" + i);
producer2.asyncSendMsg("producer2 send msg" + i);
}
System.out.println("broker has msg:" + broker.getAllMagByDisk());
for (int i = 0; i < 1; i++) {
System.out.println("consumer1 consume msg:" + consumer1.syncPullMsg());
}
for (int i = 0; i < 3; i++) {
System.out.println("consumer2 consume msg:" + consumer2.syncPullMsg());
}
}
}
class Producer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public void asyncSendMsg(String msg) {
if (broker == null) {
throw new RuntimeException("please connect broker first");
}
new Thread(() -> {
broker.sendMsg(msg);
}).start();
}
}
class Consumer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public String syncPullMsg() {
return broker.getMsg();
}
}
class Broker {
// 對應 RocketMQ 中 MessageQueue,默認情況下 1 個 Topic 包含 4 個 MessageQueue
private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue(Integer.MAX_VALUE);
// 實際發送消息到 broker 服務器使用 Netty 發送
public void sendMsg(String msg) {
try {
messageQueue.put(msg);
// 實際會同步或異步落盤,異步落盤使用的定時任務定時掃描落盤
} catch (InterruptedException e) {
}
}
public String getMsg() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
}
return null;
}
public String getAllMagByDisk() {
StringBuilder sb = new StringBuilder("\n");
messageQueue.iterator().forEachRemaining((msg) -> {
sb.append(msg + "\n");
});
return sb.toString();
}
}
```
問題:
1. 沒有實現真正執行消息存儲落盤
2. 沒有實現 NameServer 去作為注冊中心,定位服務
3. 使用 LinkedBlockingQueue 作為消息隊列,注意,參數是無限大,在真正 RocketMQ 也是如此是無限大,理論上不會出現對進來的數據進行拋棄,但是會有內存泄漏問題(阿里巴巴開發手冊也因為這個問題,建議我們使用自制線程池)
4. 沒有使用多個隊列(即多個 LinkedBlockingQueue),RocketMQ 的順序消息是通過生產者和消費者同時使用同一個 MessageQueue 來實現,但是如果我們只有一個 MessageQueue,那我們天然就支持順序消息
5. 沒有使用 MappedByteBuffer 來實現文件映射從而使消息數據落盤非常的快(實際 RocketMQ 使用的是 FileChannel+DirectBuffer)
# 2 分布式消息中心
## 2.1 問題與解決
### 2.1.1 消息丟失的問題
1. 當你系統需要保證百分百消息不丟失,你可以使用生產者每發送一個消息,Broker 同步返回一個消息發送成功的反饋消息
2. 即每發送一個消息,同步落盤后才返回生產者消息發送成功,這樣只要生產者得到了消息發送生成的返回,事后除了硬盤損壞,都可以保證不會消息丟失
3. 但是這同時引入了一個問題,同步落盤怎么才能快?
### 2.1.2 同步落盤怎么才能快
1. 使用 FileChannel + DirectBuffer 池,使用堆外內存,加快內存拷貝
2. 使用數據和索引分離,當消息需要寫入時,使用 commitlog 文件順序寫,當需要定位某個消息時,查詢index 文件來定位,從而減少文件IO隨機讀寫的性能損耗
### 2.1.3 消息堆積的問題
1. 后臺定時任務每隔72小時,刪除舊的沒有使用過的消息信息
2. 根據不同的業務實現不同的丟棄任務,具體參考線程池的 AbortPolicy,例如FIFO/LRU等(RocketMQ沒有此策略)
3. 消息定時轉移,或者對某些重要的 TAG 型(支付型)消息真正落庫
### 2.1.4 定時消息的實現
1. 實際 RocketMQ 沒有實現任意精度的定時消息,它只支持某些特定的時間精度的定時消息
2. 實現定時消息的原理是:創建特定時間精度的 MessageQueue,例如生產者需要定時1s之后被消費者消費,你只需要將此消息發送到特定的 Topic,例如:MessageQueue-1 表示這個 MessageQueue 里面的消息都會延遲一秒被消費,然后 Broker 會在 1s 后發送到消費者消費此消息,使用 newSingleThreadScheduledExecutor 實現
### 2.1.5 順序消息的實現
1. 與定時消息同原理,生產者生產消息時指定特定的 MessageQueue ,消費者消費消息時,消費特定的 MessageQueue,其實單機版的消息中心在一個 MessageQueue 就天然支持了順序消息
2. 注意:同一個 MessageQueue 保證里面的消息是順序消費的前提是:消費者是串行的消費該 MessageQueue,因為就算 MessageQueue 是順序的,但是當并行消費時,還是會有順序問題,但是串行消費也同時引入了兩個問題:
>1. 引入鎖來實現串行
>2. 前一個消費阻塞時后面都會被阻塞
### 2.1.6 分布式消息的實現
1. 需要前置知識:2PC
2. RocketMQ4.3 起支持,原理為2PC,即兩階段提交,prepared->commit/rollback
3. 生產者發送事務消息,假設該事務消息 Topic 為 Topic1-Trans,Broker 得到后首先更改該消息的 Topic 為 Topic1-Prepared,該 Topic1-Prepared 對消費者不可見。然后定時回調生產者的本地事務A執行狀態,根據本地事務A執行狀態,來是否將該消息修改為 Topic1-Commit 或 Topic1-Rollback,消費者就可以正常找到該事務消息或者不執行等
>注意,就算是事務消息最后回滾了也不會物理刪除,只會邏輯刪除該消息
### 2.1.7 消息的 push 實現
1. 注意,RocketMQ 已經說了自己會有低延遲問題,其中就包括這個消息的 push 延遲問題
2. 因為這并不是真正的將消息主動的推送到消費者,而是 Broker 定時任務每5s將消息推送到消費者
### 2.1.8 消息重復發送的避免
1. RocketMQ 會出現消息重復發送的問題,因為在網絡延遲的情況下,這種問題不可避免的發生,如果非要實現消息不可重復發送,那基本太難,因為網絡環境無法預知,還會使程序復雜度加大,因此默認允許消息重復發送
2. RocketMQ 讓使用者在消費者端去解決該問題,即需要消費者端在消費消息時支持冪等性的去消費消息
3. 最簡單的解決方案是每條消費記錄有個消費狀態字段,根據這個消費狀態字段來是否消費或者使用一個集中式的表,來存儲所有消息的消費狀態,從而避免重復消費
4. 具體實現可以查詢關于消息冪等消費的解決方案
### 2.1.9 廣播消費與集群消費
1. 消息消費區別:廣播消費,訂閱該 Topic 的消息者們都會消費**每個**消息。集群消費,訂閱該 Topic 的消息者們只會有一個去消費**某個**消息
2. 消息落盤區別:具體表現在消息消費進度的保存上。廣播消費,由于每個消費者都獨立的去消費每個消息,因此每個消費者各自保存自己的消息消費進度。而集群消費下,訂閱了某個 Topic,而旗下又有多個 MessageQueue,每個消費者都可能會去消費不同的 MessageQueue,因此總體的消費進度保存在 Broker 上集中的管理
### 2.1.10 RocketMQ 不使用 ZooKeeper 作為注冊中心的原因,以及自制的 NameServer 優缺點?
1. ZooKeeper 作為支持順序一致性的中間件,在某些情況下,它為了滿足一致性,會丟失一定時間內的可用性,RocketMQ 需要注冊中心只是為了發現組件地址,在某些情況下,RocketMQ 的注冊中心可以出現數據不一致性,這同時也是 NameServer 的缺點,因為 NameServer 集群間互不通信,它們之間的注冊信息可能會不一致
2. 另外,當有新的服務器加入時,NameServer 并不會立馬通知到 Produer,而是由 Produer 定時去請求 NameServer 獲取最新的 Broker/Consumer 信息(這種情況是通過 Producer 發送消息時,負載均衡解決)
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊