Kafka是什么
~~~
kafka使用scala開發,支持多語言客戶端(c++、java、python、go等)
Kafka最先由LinkedIn公司開發,之后成為Apache的頂級項目。
Kafka是一個分布式的、分區化、可復制提交的日志服務
LinkedIn使用Kafka實現了公司不同應用程序之間的松耦和,那么作為一個可擴展、高可靠的消息系統
支持高Throughput的應用
scale out:無需停機即可擴展機器
持久化:通過將數據持久化到硬盤以及replication防止數據丟失
支持online和offline的場景
~~~
Kafka的特點
Kafka是分布式的,其所有的構件borker(服務端集群)、producer(消息生產)、consumer(消息消費者)都可以是分布式的。
在消息的生產時可以使用一個標識topic來區分,且可以進行分區;每一個分區都是一個順序的、不可變的消息隊列, 并且可以持續的添加。
同時為發布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡
常用的場景
監控:主機通過Kafka發送與系統和應用程序健康相關的指標,然后這些信息會被收集和處理從而創建監控儀表盤并發送警告。
消息隊列: 應用程度使用Kafka作為傳統的消息系統實現標準的隊列和消息的發布—訂閱,例如搜索和內容提要(Content Feed)。比起大多數的消息系統來說,Kafka有更好的吞吐量,內置的分區,冗余及容錯性,這讓Kafka成為了一個很好的大規模消息處理應用的解決方案。消息系統 一般吞吐量相對較低,但是需要更小的端到端延時,并嘗嘗依賴于Kafka提供的強大的持久性保障。在這個領域,Kafka足以媲美傳統消息系統,如ActiveMR或RabbitMQ
站點的用戶活動追蹤: 為了更好地理解用戶行為,改善用戶體驗,將用戶查看了哪個頁面、點擊了哪些內容等信息發送到每個數據中心的Kafka集群上,并通過Hadoop進行分析、生成日常報告。
流處理:保存收集流數據,以提供之后對接的Storm或其他流式計算框架進行處理。很多用戶會將那些從原始topic來的數據進行 階段性處理,匯總,擴充或者以其他的方式轉換到新的topic下再繼續后面的處理。例如一個文章推薦的處理流程,可能是先從RSS數據源中抓取文章的內 容,然后將其丟入一個叫做“文章”的topic中;后續操作可能是需要對這個內容進行清理,比如回復正常數據或者刪除重復數據,最后再將內容匹配的結果返 還給用戶。這就在一個獨立的topic之外,產生了一系列的實時數據處理的流程。
日志聚合。使用Kafka代替日志聚合(log aggregation)。日志聚合一般來說是從服務器上收集日志文件,然后放到一個集中的位置(文件服務器或HDFS)進行處理。然而Kafka忽略掉 文件的細節,將其更清晰地抽象成一個個日志或事件的消息流。這就讓Kafka處理過程延遲更低,更容易支持多數據源和分布式數據處理。比起以日志為中心的 系統比如Scribe或者Flume來說,Kafka提供同樣高效的性能和因為復制導致的更高的耐用性保證,以及更低的端到端延遲
持久性日志:Kafka可以為一種外部的持久性日志的分布式系統提供服務。這種日志可以在節點間備份數據,并為故障節點數據回復提供一種重新同步的機制。Kafka中日志壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似于Apache BookKeeper項目。
Kafka中包含以下基礎概念
1. Topic(話題):Kafka中用于區分不同類別信息的類別名稱。由producer指定
2. Producer(生產者):將消息發布到Kafka特定的Topic的對象(過程)
3. Consumers(消費者):訂閱并處理特定的Topic中的消息的對象(過程)
4. Broker(Kafka服務集群):已發布的消息保存在一組服務器中,稱之為Kafka集群。集群中的每一個服務器都是一個代理(Broker). 消費者可以訂閱一個或多個話題,并從Broker拉數據,從而消費這些已發布的消息。
5. Partition(分區):Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)
6. Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發布一些消息。
消息
消息由一個固定大小的報頭和可變長度但不透明的字節陣列負載。報頭包含格式版本和CRC32效驗和以檢測損壞或截斷
消息格式
~~~
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
bit 0 ~ 2 : Compression codec
0 : no compression
1 : gzip
2 : snappy
3 : lz4
bit 3 : Timestamp type
0 : create time
1 : log append time
bit 4 ~ 7 : reserved
4. (可選) 8 byte timestamp only if "magic" identifier is greater than 0
5. 4 byte key length, containing length K
6. K byte key
7. 4 byte payload length, containing length V
8. V byte payload
~~~
安裝kafka
1)打開config目錄下的server.properties, 修改log.dirs為D:\kafka_logs,修改advertised.host.name=服務器ip
2)啟動kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties
// 報錯:(Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
// Please install or use the JRE or JDK that contains these missing components.)
解決方法:將C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名為:server.exe
Mac啟動:sh bin/kafka-server-start.sh config/server.properties
kafka搭建
kafka環境基于zookeeper,zookeeper環境基于JAVA-JDK。
安裝JAVA-JDK
~~~
從oracle下載最新的SDK安裝。
~~~
安裝zookeeper
下載zookeeper 下載地址:http://apache.fayea.com/zookeeper/
重命名conf/zoo_sample.cfg 為conf/zoo.cfg
編輯 conf/zoo.cfg,修改dataDir=/Users/liupengjie/Desktop/tool/zookeeper-3.4.10/data
(Windows:修改dataDir=D:\zookeeper-3.3.6\data\)
啟動zookeeper: ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/liupengjie/Desktop/tool/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
停止ZK服務: ./bin/zkServer.sh stop
重啟ZK服務: ./bin/zkServer.sh restart
查看ZK服務狀態: ./bin/zkServer.sh status
(Windows 啟動:bin/zkServer.cmd)
安裝kafka
下載kafka2.1.2 打開鏈接:http://kafka.apache.org/downloads.html (下載二進制包)
kafka配置
通常情況下需要在解壓縮kafka后,修改config/server.properties 配置文件中的以下項
~~~
log.dirs = kafka-logs
advertised.host.name=192.168.1.125 (本機ip)
# root directory for all kafka znodes.
zookeeper.connect = localhost:9092
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
listeners = PLAINTEXT://ip:9092 (listeners = PLAINTEXT://your.host.name:9092)
/*
log.dirs=/Users/liupengjie/Desktop/tool/kafka_2.11-0.11.0.0/kafka_logs
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
#listeners=PLAINTEXT://:9092
advertised.host.name=192.168.1.125
*/
~~~
log.dirs 指的是kafka的log Data保存的目錄,默認為Null。如果不指定log Data會保存到log.dir設置的目錄中,log.dir默認為/tmp/kafka-logs。需要保證啟動KafKaServer的用戶對log.dirs或log.dir設置的目錄具有讀與寫的權限。
zookeeper.connect 指的是zookeeper集群的地址,可以是多個,多個之間用逗號分割hostname1:port1,hostname2:port2,hostname3:port3
listeners 監聽列表(以逗號分隔 不同的協議(如plaintext,trace,ssl、不同的IP和端口))
啟動kafka
~~~
cd bin
./kafka-server-start.sh ../config/server.properties
(Windows啟動:./bin/windows/kafka-server-start.bat ./config/server.preperties)
/*
Windows啟動報錯:
Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
Please install or use the JRE or JDK that contains these missing components.
解決方法:
將C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名為:server.exe
*/
~~~
kafka鏈接zookeeper
~~~
cd bin
$ kafka-console-consumer.sh --topic nginx_log --zookeeper 127.0.0.1 2181
(Windows 執行:kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181)
/*
Windows啟動報錯:
Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
Please install or use the JRE or JDK that contains these missing components.
解決方法:將C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名為:server.exe
*/
~~~
查看topic的詳細信息
~~~
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic nginx_log --describe
~~~
kafka消費者客戶端命令
~~~
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic nginx_log --from-beginning
~~~
Kafka常用命令
以下是kafka常用命令行總結:
1.查看topic的詳細信息
./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1
2、為topic增加副本
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3、創建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4、為topic增加partition
./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
5、kafka生產者客戶端命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
6、kafka消費者客戶端命令
./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
7、kafka服務啟動
./kafka-server-start.sh -daemon ../config/server.properties
8、下線broker
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
shutdown broker
9、刪除topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1
10、查看consumer組內消費的offset
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1
- 序言
- 目錄
- 環境搭建
- Linux搭建golang環境
- Windows搭建golang環境
- Mac搭建golang環境
- Go 環境變量
- 編輯器
- vs code
- Mac 安裝vs code
- Windows 安裝vs code
- vim編輯器
- 介紹
- 1.Go語言的主要特征
- 2.golang內置類型和函數
- 3.init函數和main函數
- 4.包
- 1.工作空間
- 2.源文件
- 3.包結構
- 4.文檔
- 5.編寫 Hello World
- 6.Go語言 “ _ ”(下劃線)
- 7.運算符
- 8.命令
- 類型
- 1.變量
- 2.常量
- 3.基本類型
- 1.基本類型介紹
- 2.字符串String
- 3.數組Array
- 4.類型轉換
- 4.引用類型
- 1.引用類型介紹
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指針
- 6.自定義類型Struct
- 流程控制
- 1.條件語句(if)
- 2.條件語句 (switch)
- 3.條件語句 (select)
- 4.循環語句 (for)
- 5.循環語句 (range)
- 6.循環控制Goto、Break、Continue
- 函數
- 1.函數定義
- 2.參數
- 3.返回值
- 4.匿名函數
- 5.閉包、遞歸
- 6.延遲調用 (defer)
- 7.異常處理
- 8.單元測試
- 壓力測試
- 方法
- 1.方法定義
- 2.匿名字段
- 3.方法集
- 4.表達式
- 5.自定義error
- 接口
- 1.接口定義
- 2.執行機制
- 3.接口轉換
- 4.接口技巧
- 面向對象特性
- 并發
- 1.并發介紹
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 應用
- 反射reflection
- 1.獲取基本類型
- 2.獲取結構體
- 3.Elem反射操作基本類型
- 4.反射調用結構體方法
- 5.Elem反射操作結構體
- 6.Elem反射獲取tag
- 7.應用
- json協議
- 1.結構體轉json
- 2.map轉json
- 3.int轉json
- 4.slice轉json
- 5.json反序列化為結構體
- 6.json反序列化為map
- 終端讀取
- 1.鍵盤(控制臺)輸入fmt
- 2.命令行參數os.Args
- 3.命令行參數flag
- 文件操作
- 1.文件創建
- 2.文件寫入
- 3.文件讀取
- 4.文件刪除
- 5.壓縮文件讀寫
- 6.判斷文件或文件夾是否存在
- 7.從一個文件拷貝到另一個文件
- 8.寫入內容到Excel
- 9.日志(log)文件
- server服務
- 1.服務端
- 2.客戶端
- 3.tcp獲取網頁數據
- 4.http初識-瀏覽器訪問服務器
- 5.客戶端訪問服務器
- 6.訪問延遲處理
- 7.form表單提交
- web模板
- 1.渲染終端
- 2.渲染瀏覽器
- 3.渲染存儲文件
- 4.自定義io.Writer渲染
- 5.模板語法
- 時間處理
- 1.格式化
- 2.運行時間
- 3.定時器
- 鎖機制
- 互斥鎖
- 讀寫鎖
- 性能比較
- sync.Map
- 原子操作
- 1.原子增(減)值
- 2.比較并交換
- 3.導入、導出、交換
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.選擇排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 限流器
- 日志包
- 日志框架logrus
- 隨機數驗證碼
- 生成指定位數的隨機數
- 生成圖形驗證碼
- 編碼格式轉換
- UTF-8與GBK
- 解決中文亂碼
- 設計模式
- 創建型模式
- 單例模式
- singleton.go
- singleton_test.go
- 抽象工廠模式
- abstractfactory.go
- abstractfactory_test.go
- 工廠方法模式
- factorymethod.go
- factorymethod_test.go
- 原型模式
- prototype.go
- prototype_test.go
- 生成器模式
- builder.go
- builder_test.go
- 結構型模式
- 適配器模式
- adapter.go
- adapter_test.go
- 橋接模式
- bridge.go
- bridge_test.go
- 合成/組合模式
- composite.go
- composite_test.go
- 裝飾模式
- decoretor.go
- decorator_test.go
- 外觀模式
- facade.go
- facade_test.go
- 享元模式
- flyweight.go
- flyweight_test.go
- 代理模式
- proxy.go
- proxy_test.go
- 行為型模式
- 職責鏈模式
- chainofresponsibility.go
- chainofresponsibility_test.go
- 命令模式
- command.go
- command_test.go
- 解釋器模式
- interpreter.go
- interperter_test.go
- 迭代器模式
- iterator.go
- iterator_test.go
- 中介者模式
- mediator.go
- mediator_test.go
- 備忘錄模式
- memento.go
- memento_test.go
- 觀察者模式
- observer.go
- observer_test.go
- 狀態模式
- state.go
- state_test.go
- 策略模式
- strategy.go
- strategy_test.go
- 模板模式
- templatemethod.go
- templatemethod_test.go
- 訪問者模式
- visitor.go
- visitor_test.go
- 數據庫操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事務
- golang操作Redis
- 1.redis介紹
- 2.golang鏈接redis
- 3.String類型 Set、Get操作
- 4.String 批量操作
- 5.設置過期時間
- 6.list隊列操作
- 7.Hash表
- 8.Redis連接池
- 其它Redis包
- go-redis/redis包
- 安裝介紹
- String 操作
- List操作
- Set操作
- Hash操作
- golang操作ETCD
- 1.etcd介紹
- 2.鏈接etcd
- 3.etcd存取
- 4.etcd監聽Watch
- golang操作kafka
- 1.kafka介紹
- 2.寫入kafka
- 3.kafka消費
- golang操作ElasticSearch
- 1.ElasticSearch介紹
- 2.kibana介紹
- 3.寫入ElasticSearch
- NSQ
- 安裝
- 生產者
- 消費者
- zookeeper
- 基本操作測試
- 簡單的分布式server
- Zookeeper命令行使用
- GORM
- gorm介紹
- gorm查詢
- gorm更新
- gorm刪除
- gorm錯誤處理
- gorm事務
- sql構建
- gorm 用法介紹
- Go操作memcached
- beego框架
- 1.beego框架環境搭建
- 2.參數配置
- 1.默認參數
- 2.自定義配置
- 3.config包使用
- 3.路由設置
- 1.自動匹配
- 2.固定路由
- 3.正則路由
- 4.注解路由
- 5.namespace
- 4.多種數據格式輸出
- 1.直接輸出字符串
- 2.模板數據輸出
- 3.json格式數據輸出
- 4.xml格式數據輸出
- 5.jsonp調用
- 5.模板處理
- 1.模板語法
- 2.基本函數
- 3.模板函數
- 6.請求處理
- 1.GET請求
- 2.POST請求
- 3.文件上傳
- 7.表單驗證
- 1.表單驗證
- 2.定制錯誤信息
- 3.struct tag 驗證
- 4.XSRF過濾
- 8.靜態文件處理
- 1.layout設計
- 9.日志處理
- 1.日志處理
- 2.logs 模塊
- 10.會話控制
- 1.會話控制
- 2.session 包使用
- 11.ORM 使用
- 1.鏈接數據庫
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.構造查詢
- 5.事務處理
- 6.自動建表
- 12.beego 驗證碼
- 1.驗證碼插件
- 2.驗證碼使用
- beego admin
- 1.admin安裝
- 2.admin開發
- beego 熱升級
- beego實現https
- gin框架
- 安裝使用
- 路由設置
- 模板處理
- 文件上傳
- gin框架中文文檔
- gin錯誤總結
- 項目
- 秒殺項目
- 日志收集
- 面試題
- 面試題一
- 面試題二
- 錯題集
- Go語言陷阱和常見錯誤
- 常見語法錯誤
- 初級
- 中級
- 高級
- Go高級應用
- goim
- goim 啟動流程
- goim 工作流程
- goim 結構體
- gopush
- gopush工作流程
- gopush啟動流程
- gopush業務流程
- gopush應用
- gopush新添功能
- gopush壓力測試
- 壓測注意事項
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常見RPC開源框架
- pprof
- pprof介紹
- pprof應用
- 使用pprof及Go 程序的性能優化
- 封裝 websocket
- cgo
- Golang GC
- 查看程序運行過程中的GC信息
- 定位gc問題所在
- Go語言 demo
- 用Go語言計算一個人的年齡,生肖,星座
- 超簡易Go語言實現的留言板代碼
- 信號處理模塊,可用于在線加載配置,配置動態加載的信號為SIGHUP
- 陽歷和陰歷相互轉化的工具類 golang版本
- 錯誤總結
- 網絡編程
- 網絡編程http
- 網絡編程tcp
- Http請求
- Go語言必知的90個知識點
- 第三方庫應用
- cli應用
- Cobra
- 圖表庫
- go-echarts
- 開源IM
- im_service
- 機器學習庫
- Tensorflow
- 生成二維碼
- skip2/go-qrcode生成二維碼
- boombuler/barcode生成二維碼
- tuotoo/qrcode識別二維碼
- 日志庫
- 定時任務
- robfig/cron
- jasonlvhit/gocron
- 拼多多開放平臺 SDK
- Go編譯
- 跨平臺交叉編譯
- 一問一答
- 一問一答(一)
- 為什么 Go 標準庫中有些函數只有簽名,沒有函數體?
- Go開發的應用
- etcd
- k8s
- Caddy
- nsq
- Docker
- web框架