> ### 生產者
~~~
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
number := 1
for {
msg := &sarama.ProducerMessage{}
msg.Topic = "my-topic"
msg.Value = sarama.StringEncoder(fmt.Sprint("this is a good test - ", number))
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed ", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(time.Second)
number++
}
}
~~~
> ### 消費者
~~~
package main
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
func main() {
//創建消費者
consumer, err := sarama.NewConsumer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
//設置分區
partitionList, err := consumer.Partitions("my-topic")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)
//循環分區
for partition := range partitionList {
//sarama.OffsetNewest 獲取最新的
//sarama.OffsetOldest 從頭讀到尾
pc, err := consumer.ConsumePartition("my-topic", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
~~~
> ### 相關閱讀
* [kafka學習筆記:知識點整理](https://www.cnblogs.com/cyfonly/p/5954614.html)
* [Kafka分區與消費者的關系](https://www.cnblogs.com/cjsblog/p/9664536.html)
* [golang kafka小試消息隊列](https://www.cnblogs.com/cjsblog/p/9664536.html)
* * *
* windows gcc not fount 安裝 mingw-w64
* [https://sourceforge.net/projects/mingw-w64/](https://sourceforge.net/projects/mingw-w64/)
- 第一序 入門教程(一)
- 1.1環境配置
- 1.1 環境配置(補充:Linux下安裝)
- 1.1 環境配置(補充:線上部署)
- 1.2 開發工具GoLand
- 1.3 準備工作
- 1.4 第一個應用程序 Hello World
- 1.4 補充 go get github 超時
- 第二序 入門教程(二)
- 2.1 語法結構
- 2.2 常量, 變量
- 2.2.1 命名規則
- 2.2.2 變量
- 2.2.2 變量(補充:類型推斷的好處)
- 2.2.2 變量(補充:泛型)
- 2.2.3 常量
- 2.2.4 iota
- 2.2.5 Unicode字符編碼
- 2.2.6 GBK 轉 UTF8
- 2.3 條件語句
- 2.3.1 判斷語句 if
- 2.3.2 選擇語句 switch
- 2.3.3 循環語句 for
- 2.3.4 遍歷 range
- 2.3.5 跳轉語句 goto, break, continue
- 2.3.6 for 和 for range區別
- 2.4 數組, 切片, 集合, 通道
- 2.4.1 make, len, cap, new, nil
- 2.4.1 make, len, cap, new, nil (補充:nil)
- 2.4.2 數組 array
- 2.4.3.1 切片 slice - 1
- 2.4.3.2 切片 slice - 2
- 2.4.3.3 slice list ring
- 2.4.4 集合 map
- 2.4.5 goroutine
- 2.4.6 channel
- 2.5 函數, 結構, 方法, 接口
- 2.5.1 函數 function
- 2.5.2 結構 struct
- 2.5.3 方法 method
- 2.5.4 接口 interface
- 2.5.5 Go是面向對象的語言嗎?
- 2.5.6 json序列化和反序列化
- 2.5.7 T和指針T
- 2.6 defer, panic, recover
- 2.6.1 defer
- 2.6.2 painc, recover
- 2.7 指針
- 2.7 指針(補充: 可尋址和不可尋址)
- 2.8 反射
- 第三序 相關閱讀
- 3.1 相關閱讀1
- 3.2 相關閱讀2
- 3.3 相關閱讀3
- 第四序 性能分析和調試工具
- 4.1 pprof工具介紹
- 4.2 CPU信息采集
- 4.3 Heap信息采集
- 4.4 Http信息采集
- 4.5 單元測試(功能測試)
- 4.6 基準測試(壓力測試/性能測試)
- 4.7 示例測試(example)
- 4.8 gdb調試
- 第五序 網絡編程
- 5.1 http請求和響應
- 5.2 socket
- 5.2.1 概念
- 5.2.2 服務端
- 5.2.3 客戶端
- 5.3 WebSocket
- 5.3.1 第一版
- 5.3.1.1 服務端
- 5.3.1.2 客戶端
- 5.3.1.3 相關閱讀
- 5.3.2 服務端
- 5.3.3 客戶端
- 5.3.4 nginx配置
- 5.3.5 修改版
- 5.3.5.1 草稿 - 1
- 5.3.5.2 草稿 - 2
- 5.3.5.3 草稿 - 3
- 5.3.5.4 服務端
- 5.3.5.5 客戶端
- 5.4 打印客戶端頭部信息
- 第六序 算法
- 6.1 查找
- 6.1.1 二分查找
- 6.2 排序
- 6.2.1 交換排序 - 冒泡排序
- 6.2.2 插入排序 - 直接插入排序
- 6.2.3 插入排序 - 希爾排序
- 6.2.4 交換排序 - 快速排序
- 6.3 算法求解應用
- 第七序 微服務
- 7.1 相關閱讀
- 7.2 gRPC
- 7.2.1 準備工作
- 7.2.2 編譯.proto文件
- 7.2.3 gRPC服務端
- 7.2.4 gRPC客戶端
- 7.3 micro/micro
- 7.3.1 服務發現
- 7.3.2 安裝consul
- 7.3.3 準備工作
- 7.3.4 服務端
- 7.3.5 客戶端
- 7.3.6 默認的服務發現
- 7.3.7 文檔閱讀
- 7.4 protobuf序列化
- 第八序 Web
- 8.1 視圖模板
- 8.1.1 main.go
- 8.1.2 login.html
- 8.2 原生留言板
- 8.2.1 原生sql
- 8.2.1.1 main.go
- 8.2.1.2 view
- 8.2.1.2.1 index.html
- 8.2.1.2.2 create.html
- 8.2.2 sqlx
- 8.3 Gin框架
- 第九序 數據庫
- 9.0 資料收集
- 9.1 Redis數據庫 (gomodule/redigo)
- 9.1.1 介紹
- 9.1.2 消息隊列
- 9.2 Redis數據庫(go-redis/redis)
- 第十序 日記
- 10.1 SimplePanic
- 10.2 第一版日記庫
- 10.2.1 winnielog
- 10.2.2 使用
- 第十一序 中間鍵
- 11.0 資料收集
- 11.1 NSQ
- 11.2 zookeeper
- 11.3 kafka
- 第十二序 加密
- 12.1 Token
- 12.2 SHA1
- 2.3 RSA + AES
- 第十三序 分布式鎖
- 第十四序 標準庫練習
- container/list
- 鏈表
- container/ring
- 環形鏈表
- context
- flag (獲取命令行參數)
- io
- strconv
- sync
- 為什么需要鎖?
- 互斥鎖
- 讀寫鎖
- 條件變量
- 計數器
- 并發安全字典
- 自制并發安全字典
- 官方并發安全字典
- 連接池
- sync/atomic
- 原子操作
- 第十五序 其它內容
- 文件讀寫
- 工作池
- 第十六序 相關閱讀