https://blog.csdn.net/qq_45076180/article/details/111561984
---
- Broker: Kafka 集群中的每個服務器稱為一個 Broker,負責存儲和處理消息
- 分區副本(Replication):* Kafka 通過副本(Replica)機制提供高可用性,每個 Partition 會有多個副本(默認 3 個)。
- 分為 **Leader 副本** 和 **Follower 副本**:
* **Leader 副本**:負責讀寫操作。
* **Follower 副本**:被動同步 Leader 的數據,在 Leader 掛掉后,Kafka 自動選舉新的 Leader
#### **ISR(In-Sync Replica)**
* **ISR**(同步副本集合):所有與 Leader 副本保持同步的 Follower 副本。
* 只有 ISR 副本才有資格成為新的 Leader
> # 消息丟失
- 重試機制: retries 避免因臨時故障丟失數據, enable.idempotence=true + 開啟生產者冪等性 (`enable.idempotence=true`)
- Kafka **默認不會每次寫入都立即刷盤(落盤)**,而是**先將數據寫入操作系統的頁面緩存(Page Cache)**,然后**再批量刷入磁盤**, 可以通過修改配置調整入磁盤速度
- 生產者發送消息的時候有個確認機制:
- acks=0 : 生產者**不等待** Kafka 確認,直接發送下一條消息
- acks=1 : 只要**Leader 副本**收到消息就返回 ACK,不管 Follower 是否同步
- acks=-1/acks=all : 生產者等待 **Leader 和ISR(同步副本)** 確認消息寫入后才繼續
- 配合 `min.insync.replicas` 指定了**最少需要多少個 ISR 副本**確認消息寫入
- 關閉Unclean 選舉:
```
Broker 1(Leader) ---> 最新數據(消息 1、2、3、4、5)
Broker 2(Follower, ISR) ---> 最新數據(消息 1、2、3、4、5)
Broker 3(Follower, 非ISR) ---> 舊數據(消息 1、2、3)
**正常情況**:
* Broker 1 作為 Leader,Broker 2 作為 ISR 復制數據。
* Broker 3 由于同步較慢,被剔除出 ISR(非 ISR)
**如果 Broker 1 和 Broker 2 宕機:**
* **Unclean 選舉 = `true`**
* Kafka 允許 Broker 3 成為新的 Leader,但它**缺少消息 4 和 5**,因此數據丟失。
* **Unclean 選舉 = `false`**
* Kafka 不會選出新的 Leader,該分區**不可用**,直到 Broker 1 或 Broker 2 重新上線
```
- 數據回滾
| Broker | 舊數據(宕機前) | 重新上線后數據 |
| --- | --- | --- |
| Broker 1 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(刪除 4、5,改為同步新 Leader) |
| Broker 2 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(刪除 4、5,改為同步新 Leader) |
| Broker 3(新 Leader) | **1, 2, 3** | **1, 2, 3, 6, 7** |
- 消費者設置 自動提交偏移量: 如果消費失敗, 數據就丟失了(解決方案: 設置手動提交)
> # Rebalance 頻繁
- 在 Kafka 中,**Rebalance(再平衡)** 是指 **Kafka Consumer Group(消費者組)重新分配分區(Partition)的過程**。
- 消費者加入或離開 Consumer Group
```
session.timeout.ms=45000 # 默認 10s,適當加大避免誤判
heartbeat.interval.ms=15000 # 默認 3s,適當加大減少心跳頻率
```
> # Kafka 如何在多個分區中保證消息順序和消息處理效率?
- 需要順序消費的, 通過設置key放入同一個分區里
### **分區分配機制**
Kafka 通過**Partitioner**來決定消息發送到哪個分區。默認情況下,Kafka 使用以下規則:
1. **如果有 Key**:
* 對 Key 進行哈希(默認使用`murmur2`算法),然后對分區數取模,得到目標分區。
* **公式**:`partition = hash(key) % numPartitions`
* **結果**:相同 Key 的消息會被分配到同一個分區。
2. **如果沒有 Key**:
* 使用**輪詢策略**(Round Robin)將消息均勻分配到各個分區
---
消息丟失acks=all + min.insync.replicas ≥ 2
消息重復冪等性 (enable.idempotence=true) + 去重(Redis/數據庫唯一索引)
數據積壓增加消費者 + 批量消費 + 多線程處理
分區不均衡自定義分區策略 + Key 分區
消息順序單個 Partition 保持順序 + StickyAssignor
Rebalance 頻繁增加 session.timeout.ms + StickyAssignor
事務問題Kafka 事務 API + 冪等性
性能優化批量發送/消費 + Zero Copy + 壓縮
- 目錄
- 第一例 gRPC使用例子
- 第二例 基于go-micro做服務注冊和服務發現
- 第三例 留言板項目源碼
- 第四例 聊天室
- 第五例 工具庫
- dao
- common
- common.go
- config
- config.go
- gorm
- grom.go
- sqlx
- sqlx.go
- kafka
- kafka.go
- log
- log.go
- log2.go
- redis
- redis.go
- zookeeper
- zookeeper.go
- init
- main.go
- 工具庫
- cache
- cfg.go
- redis
- 示例
- database
- cfg.go
- gorm.go
- sql.go
- 示例
- mq
- cfg.go
- kafka_consumer.go
- kafka_producter.go
- 示例
- time
- time.go
- 第六例 原生sql操作
- 第七例 sqlx操作
- 第八例 Redis數據庫(gomodule/redigo)
- 第九例 Redis消息隊列
- 第十例 Redis集群連接
- 十一例 Zookeeper操作
- 十二例 Kafka操作
- 十三例 NSQ操作
- 十四例 二分查找
- 十五例 交換排序 - 冒泡排序
- 十六例 插入排序 - 直接插入排序
- 十七例 插入排序 - 希爾排序
- 十八例 交換排序 - 快速排序
- 十九例 算法求解應用
- 二十例 pprof性能分析
- 二一例 CPU信息采集
- 二二例 Heap信息采集
- 二三例 Http信息采集
- 二四例 單元測試(功能測試)
- 二五例 基準測試(壓力測試/性能測試)
- 二六例 gdb調試
- 二七例 json序列化和反序列化
- 二八例 protobuf序列化和反序列化
- 二九例 包管理工具 go vendor
- 三十例 包管理工具 go mod
- 三一例 zip壓縮
- 三二例 交叉編譯
- 三三例 線上環境部署
- 三四例 業務:實現固定周期維護
- 三五例 聊天室(精簡版)
- 三六例 并發安全字典
- 三七例 導出Excel表格
- 三八例 導出CSV表格
- 三九例 聊天室(高并發)
- 四十例 JWT (Json Web Token)
- 四一例 雪花算法生成 Id
- 四二例 對稱加密 AES
- 四三例 非對稱加密 RSA
- 四四例 簽名算法 SHA1
- 四五例 數據庫操作 gorm
- gorm V2
- 四六例 數據庫操作 gorm 集合
- 數據庫連接和創建表
- 查詢 - 分頁
- 查詢所有數據
- 查詢單條數據
- 插入一條或多條數據
- 更新一條或多條數據
- 更新一條或多條數據(有零值)
- 四七例 RSA(MD5WithRSA 算法)簽名和驗簽方式
- 四八例 線上部署腳本
- 四九例 Elasticsearch
- 五十例 對象池
- 五一例 中間庫(github.com/wong-winnie/library)
- 五二例 二維碼(生成和解析)
- 五三例 回調用例
- 五四例 文件服務器(MINIO)
- 五五例 chm文檔轉json
- 提取內容頁Json
- 將目錄索引和內容頁混合生成Json
- 目錄層級小案例
- 五六例 部署 gogs 代碼管理工具
- 五七例 通過命令行操作SVN
- 五八例 根據數據庫表生產模型
- 五九例 Trie樹
- 六十例 二進制排序
- 六一例 遞歸+迭代實現無限級分類
- 六二例 Arrow 數據結構
- 簡單介紹
- Go 用Arrow數據格式與其它語言交互
- 六三例 LMDB 內存映射型數據庫
- 獲取指定Key位置
- 六四例 切片數據按字段分類
- 六五例 Xorm 批量插入數據
- 六六例 FlatBuffers 序列化和反序列化
- FlatBuffers 步驟1
- FlatBuffers 步驟2
- 六七例 數據同步
- 增量同步v1
- 全量同步v1
- 定時器
- 六八例 Http請求
- 六九例 Gin + 數據庫操作
- 七十例 ClickHouse 列式數據庫
- 七一例 用圖表展示數據庫數據
- 七二例 go:linkname
- 七三例 四舍五入、保留3小數位
- 七四例 判斷兩個時間戳是否同一天
- 七五例 Gin Http請求
- 七六例 過濾器
- 七七例 Excel 導入導出
- 七八例 小程序向公眾號推消息
- 七九列 解析二進制數據
- 例子一
- 例子二
- 八十例 路由轉發
- 八一例 協程池(安全執行任務,捕獲異常)
- 八二例 切片 slice
- 八三例 集合 map
- 八四例 Redis 六種數據類型
- 八五例 Zstd壓縮
- 八六例 提高接口并發量
- 八七例 協程 goroutine 和 通道 channel
- 八七例 Mysql 事務和索引等
- 編寫中
- 數據交互
- mysql 索引和事務
- 發請求
- defer
- 其它
- linux
- OAuth2.0 和 JWT
- 其它2
- 其他
- Web3.0 智能合約
- 多人貪吃蛇
- V1
- 客戶端
- 服務端
- V2
- 同步方式
- 游戲框架
- deepseek
- k8s
- TRPC
- Kafka
- 加密
- mm
- 技術擴展閱讀