偏移量 ?
golang 將kafka的offset置為最新 : https://blog.csdn.net/u011677067/article/details/81026314
```
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"strconv"
"sync"
)
//全局注釋 kafka 雖然性能比 rabbitmq要快 但是他丟失數據庫的可能性更大,而且還會存在重復接受消息的情況
var Topic = "266"
var partition = int32(0)
func main() {
sarama.Logger = log{}
cfg := sarama.NewConfig()
cfg.Version = sarama.V2_2_0_0
cfg.Producer.Return.Errors = true
cfg.Net.SASL.Enable = false
cfg.Producer.Return.Successes = true //這個是關鍵,否則讀取不到消息
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewManualPartitioner //允許指定分組
cfg.Consumer.Return.Errors = true
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
//cfg.Group.Return.Notifications = true
cfg.ClientID = "service-exchange-api"
var kafka = KafkaConfig{
Addrs: []string{"127.0.0.1:9092"},
Config: cfg,
}
_, _, err := NewKafkaClient(kafka)
fmt.Println("err:", err)
}
//發送消息 此為異步發送消息
func NewAsyncProducer(client sarama.Client, i int) error {
c, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return err
}
defer c.Close()
p, o, err := c.SendMessage(&sarama.ProducerMessage{Topic: Topic, Value: sarama.StringEncoder("消息發送成功拉ssssssss!!!!" + strconv.Itoa(i))})
if err != nil {
fmt.Printf("err:", err)
return err
}
fmt.Println(p, o)
/*c, err := sarama.NewAsyncProducerFromClient(client)
//sarama.NewSyncProducerFromClient() 此為同步
if err != nil {
return err
}
defer c.Close()
//Topic 為主題,Partition為區域 Partition如果不給默認為0 記得設置cfg.Producer.Partitioner = sarama.NewManualPartitioner 這里為允許設置指定的分區
//分區是從0開始,記得在啟動配置文件時修改Partition的分區
//不同的主題包括不同的分區都是有著不同的offset
c.Input() <- &sarama.ProducerMessage{Topic: Topic,Key:sarama.StringEncoder(fmt.Sprintf("/topic/market/order-trade")), Value: sarama.StringEncoder("消息發送成功拉ssssssss!!!!"+strconv.Itoa(i))}
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-c.Errors():
fmt.Println("Produced message failure: ", err)
default:
//fmt.Println("Produced message success",err)
}*/
return nil
}
//客戶端接收消息
func NewKafkaClient(cfg KafkaConfig) (sarama.Client, func(), error) {
//創建鏈接 創建客戶機
c, err := sarama.NewClient(cfg.Addrs, cfg.Config)
if err != nil {
return nil, nil, err
}
go func() {
//目前默認是肯定能使用的
consumer, err := sarama.NewConsumerGroupFromClient("default-group", c)
//client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "group-1", cfg.Config)
if err != nil {
fmt.Println(err)
}
loopConsumer(consumer, Topic, partition, "b")
consumer.Close()
}()
go func() {
for i := 0; i < 10; i++ {
NewAsyncProducer(c, i)
}
}()
wg := &sync.WaitGroup{}
wg.Add(1)
wg.Wait()
return c, func() {
err := c.Close()
if err != nil {
fmt.Print(err)
}
}, nil
}
func loopConsumer(consumer sarama.ConsumerGroup, topic string, partition int32, item string) {
go func() {
for err := range consumer.Errors() {
fmt.Println(err)
}
}()
ctx, _ := context.WithCancel(context.Background())
hand := MainHandler{}
for {
err := consumer.Consume(ctx, []string{topic}, &hand)
if err != nil {
fmt.Println(err)
break
}
if ctx.Err() != nil {
break
}
}
/*for {
msg := <-partitionConsumer.Messages()
pom.MarkOffset(msg.Offset + 1, "備注")
fmt.Printf("[%s] : Consumed message: [%s], offset: [%d]\n",item, string(msg.Value), msg.Offset)
}*/
}
type KafkaConfig struct {
Addrs []string
Config *sarama.Config
}
type MainHandler struct {
}
func (m *MainHandler) Setup(sess sarama.ConsumerGroupSession) error {
// 如果極端情況下markOffset失敗,需要手動同步offset
return nil
}
func (m *MainHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
// 如果極端情況下markOffset失敗,需要手動同步offset
return nil
}
//此方法會自動控制偏移值,當分組里的主題消息被接收到時,則偏移值會進行加1 他是跟著主題走的
func (m *MainHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
fmt.Println(fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s Offset = %s", string(message.Value), message.Timestamp, message.Topic, message.Offset))
sess.MarkMessage(message, "")
}
return nil
}
type log struct{}
func (log) Print(v ...interface{}) {
fmt.Println(v...)
}
func (log) Printf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
func (log) Println(v ...interface{}) {
fmt.Println(v...)
}
```
- 111
- 日記
- 工具11
- 20200723
- 20200724
- 20201019
- 更多閱讀
- 回收站
- kafka 消費失敗和重復消費問題
- ABC
- 20200127
- MySQL從刪庫到跑路
- PHP從放棄到入門
- help
- 我的日志
- 博客驗證碼
- 項目版本管理
- C++ Json序列化
- 20190425
- 圖片
- 關鍵字
- 鏈接
- 分布式, 分庫, 分表
- 游戲開發
- goLand 編輯器
- 區塊鏈
- A-計劃
- B-計劃
- gin框架
- 鎖
- 力扣-答題
- 數據庫
- mysql 索引優化
- 挖礦
- 分布式鎖
- 跨域問題
- kafka
- 長連接
- 面向對象 面向過程 函數式編程
- websocket
- 其它問題
- zeroMq
- 工具
- linux - systemctl
- gitbook 部署
- Ubantu 基礎配置
- 備注服務
- 更換身份證(身份證到期了)
- 資源05
- 備注服務2
- 分布式
- TODO
- 資料準備
- 文章閱讀
- mysql 高可用
- 日志1
- 日記2 - 區塊鏈
- centos7 系統服務腳本
- copy_service 服務替換
- go kafka 孤人自嘲 - 偏移量 - kafka
- go vendor
- golang 顯示git工具欄
- 圖片資源
- 資訊01
- 資源01
- 資源02
- 資源03-數據庫
- 資源04
- php歷史數據
- golang 數據
- 文件1
- 文件2
- 文件3
- 文件4
- 文件5
- 文件6
- 文件7
- 文件8
- 文件9
- 文件10
- Flutter
- 管理后臺系統
- 重裝系統