> ### 十二例 Kafka操作
* 生產者
~~~
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
number := 1
for {
msg := &sarama.ProducerMessage{}
msg.Topic = "my-topic"
msg.Value = sarama.StringEncoder(fmt.Sprint("this is a good test - ", number))
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed ", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(time.Second)
number++
}
}
~~~
* 消費者
~~~
package main
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
func main() {
//創建消費者
consumer, err := sarama.NewConsumer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
//設置分區
partitionList, err := consumer.Partitions("my-topic")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)
//循環分區
for partition := range partitionList {
pc, err := consumer.ConsumePartition("my-topic", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
~~~
- 第一例 留言板
- 第二例 gRPC使用例子
- 第三例 基于go-micro做服務注冊和服務發現
- 第四例 聊天室
- 第五例 工具庫 第五例 并發安全字典
- dao
- common
- common.go
- config
- config.go
- gorm
- grom.go
- sqlx
- sqlx.go
- kafka
- kafka.go
- log
- log.go
- log2.go
- redis
- redis.go
- zookeeper
- zookeeper.go
- init
- main.go
- 第六例 原生sql操作
- 第七例 sqlx操作
- 第八例 Redis數據庫(gomodule/redigo)
- 第九例 Redis消息隊列
- 第十例 Redis集群連接
- 第十一例 Zookeeper操作
- 第十二例 Kafka操作
- 第十三例 NSQ操作
- 第十四例 二分查找
- 第十五例 交換排序 - 冒泡排序
- 第十六例 插入排序 - 直接插入排序
- 第十七例 插入排序 - 希爾排序
- 第十八例 交換排序 - 快速排序
- 第十九例 算法求解應用
- 第二十例 pprof性能分析
- 第二一例 CPU信息采集
- 第二二例 Heap信息采集
- 第二三例 Http信息采集
- 第二四例 單元測試(功能測試)
- 第二五例 基準測試(壓力測試/性能測試)
- 第二六例 gdb調試
- 第二七例 json序列化和反序列化
- 第二八例 protobuf序列化和反序列化
- 第二九例 包管理工具 go vendor
- 第三十例 包管理工具 go mod
- 第三一例 zip壓縮
- 第三二例 交叉編譯
- 第三三例 線上環境部署
- 第三四例 業務:實現固定周期維護
- 第三五例 聊天室(精簡版)
- 第三六例 并發安全字典
- 第三七例 導出Excel表格
- 第三八例 導出CSV表格
- 第三九例 聊天室(高并發)
- 第四十例 JWT (Json Web Token)
- 第四一例 雪花算法生成 Id
- 第四二例 對稱加密 AES
- 第四三例 非對稱加密 RSA
- 第四四例 簽名算法 SHA1
- 第四五例 數據庫操作 gorm
- 第四六例 數據庫操作 gorm 集合
- 數據庫連接和創建表
- 查詢 - 分頁
- 查詢所有數據
- 查詢單條數據
- 插入一條或多條數據
- 更新一條或多條數據
- 更新一條或多條數據(有零值)
- 第四七例 RSA(MD5WithRSA 算法)簽名和驗簽方式
- 第四八例 線上部署腳本
- 第四九例 Elasticsearch
- 第五十例 對象池
- 第五十一例 相關閱讀