goim 工作流程

其中 comet服務 在最前面對外提供服務,提供 tcp 和 websocket 兩種服務。
接受到消息之后 交給 logic 模塊處理。logic 模塊在調用 router 查詢路由信息。將發送的消息交給 kafka,再由 job 服務讀取kafka的消息,推給 comet服務。
拆分成這么多模塊的好處是可以多實例部署。其中 在 comet,router,logic 模塊都提供了性能監控 net/http/pprof
同時還提供了monitor 服務,服務啟動之后訪問 xxx:port/monitor/ping
返回 ok 說明服務啟動正常,方便運維檢查系統。
**各模塊功能分析**
**comet 模塊**
comet 模塊是在最前端,主要負責和client的鏈接保持,同時接受,發送消息,通知到客戶端。檢查鏈接是否斷開。
comet 屬于接入層,非常容易擴展,直接開啟多個comet節點,修改配置文件中的base節點下的server.id修改成不同值(注意一定要保證不同的comet進程值唯一),前端接入可以使用LVS 或者 DNS來轉發。
**logic 模塊**
logic 模塊是comet 模塊調用的,接受 comet 模塊的命令,
然后進行處理,再發送的消息的kafka隊列上,同時鏈接 router 模塊,記錄用戶的 uid server room 等信息。同時獲得router模塊的信息。
logic 屬于無狀態的邏輯層,可以隨意增加節點,使用nginx upstream來擴展http接口,內部rpc部分,可以使用LVS四層轉發。
**router 模塊**
router 主要記錄了用戶的session信息。存儲在 Bucket 對象里面。
**job 模塊**
job服務每秒都在同步 comet的信息,然后再讀取 kafka隊列的信息。
push 到相關的comet服務器上。用戶就接受到了消息。
job 根據kafka的partition來擴展多job工作方式,具體可以參考下kafka的partition負載。
**新添加功能**
1、用戶通過websocket 發消息,comet模塊使用 rpc 調用logic 將消息發送到kafka,job消費kafka的消息推送給用戶。
2、將 kafka 消息隊列改成 nsq 消息隊列。kafka 與 nsq 對比
kafka消息會固化,存文件,nsq默認是不保存的
kafka消息因為固化下來,所以是保序的,nsq傳遞時候通常是無序的,當然你也可以保留下信息去check時間戳,因此nsq更適合處理數據量大但是彼此間沒有順序關系的消息。
nsq支持延時消息的投遞,比如我想這條消息5分鐘之后才被投遞出去被客戶端消費,較于普通的消息投遞,多了個毫秒數。延時消息可用于以下場景,比如一個訂單超過30分鐘未付款,修改其狀態 或者給客戶發短信提醒。
3、router掛掉后如何處理?
完成把客戶端的心跳消息,也通過logic,都發到router。如果router掛掉,可以根據這些心跳重建在線用戶map。
心跳消息發給router,還有一個好處是,能清理太久沒有下線的用戶臟數據。

**op狀態**
握手:op = 0
握手返回: op = 1
心跳:op = 2
心跳返回: op = 3
發消息:op = 4
消息返回:op = 5
授權用戶: op = 7
返回: op = 8
OP_PROTO_READY // 消息推送
OP_PROTO_FINISH // 關閉退出
websocket.go
**客戶端鏈接邏輯**
~~~
go acceptWebsocket 監聽客戶端鏈接
go serveWebsocket(server, conn, r)
lAddr := conn.LocalAddr().String() 獲取服務端ip
rAddr := conn.RemoteAddr().String() 獲取客戶端ip
server.serveWebsocket(conn, rp, wp, tr)
ch.Reader.ResetBuffer(conn, rb.Bytes()) 讀緩沖,重置IO,可讀游標重置為0,且b.buf變為buf
tr.Add 添加一個時鐘對象
websocket.ReadRequest(rr) websocket,讀取請求
wp.Get() 獲取一個寫緩沖區
ch.Writer.ResetBuffer(conn, wb.Bytes()) 重置IO,可寫游標重置為0,句柄變為w,緩沖區變為buf
websocket.Upgrade(conn, rr, wr, req) 交換協議
ch.CliProto.Set() 獲取一個proto對象的引用,用于寫,不會移動可寫游標
key, roomId, hb, err = server.authWebsocket(ws, p) 握手->注冊,返回 subkey,roomId,heartbeat
握手->注冊邏輯
server.operator.Connect(p);
PRC -> 調用 logic RPC.Connect 通過 token 獲取 用戶 userId、roomId
rpc -> 調用 RouterRPC.Put 通過 UserId、ServerId、RoomId 存 session
server.Bucket(key) 使用subkey根據CityHash32算法,分配到對應的Bucket
b.Put(key, roomId, ch) 分配到chs和rooms
tr.Set(trd, hb) 設置更新定時器數據。
DefaultWhitelist.Contains(key) 判斷是否是白名單用戶
server.Stat.IncrWsOnline() 增加 websocket online 統計
~~~
**客戶端鏈接邏輯流程圖**

**收發消息邏輯**
~~~
參數:
conn net.Conn 鏈接
rp, wp *bytes.Pool 讀寫緩沖池
Pool內存組織如下,Pool是一個鏈式存儲的棧,數據從棧頂出,同時數據也從棧頂回收。
tr *itime.Timer 最小堆算法實現的時鐘對象
協程 go server.serveWebsocket() 處理讀消息(接收心跳消息、接收用戶發消息)
for {
ch.CliProto.Set() 從 Ring 上,獲取一個proto對象的引用,用于寫,不會移動可寫游標
p.ReadWebsocket(ws) Read Websocket 讀websocket發來的消息
判斷是 心跳檢測 or 發消息操作
p.Operation == define.OP_HEARTBEAT 心跳檢測
p.Operation = define.OP_HEARTBEAT_REPLY 心跳回應
else 發消息操作
err = server.operator.Operate(p)
p.Operation = define.OP_SEND_SMS_REPLY 發消息回應
ch.CliProto.SetAdv() 移動 Ring 的可寫游標
ch.Signal() 將消息 寫入 c.signal chan *proto.Proto
}
協程 go server.dispatchWebsocket() 處理寫消息(心跳回應、推送消息)
for {
ch.Ready() 消費 c.signal chan *proto.Proto 的消息
switch p 判斷消息類型(ProtoReady消息回應、ProtoFinish頁面關閉、default 推送消息)
ProtoReady->自己發的消息
p, err = ch.CliProto.Get() 從 Ring 上 獲取一個proto對象的引用,用于讀,不會移動可讀游標
p.WriteWebsocket(ws) 消息推送(發消息返回結果)
ch.CliProto.GetAdv() 移動可讀游標
ProtoFinish頁面關閉
-->走斷開鏈接邏輯
default push推送消息
err = p.WriteWebsocket(ws) 消息推送
ws.Flush() 釋放寫緩沖池
}
~~~
**收發消息流程圖**

**斷開鏈接邏輯**
~~~
err = p.ReadWebsocket(ws); err != nil 瀏覽器斷開鏈接
ws.ReadMessage(); err != nil
err = ErrMessageClose
b.Del(key) 根據subkey 刪除 chs和rooms 上的Channel和room
tr.Del(trd) 刪除時間計數
ws.Close() 關閉websocket鏈接
ch.Close() 發送關閉channel信號 "proto.ProtoFinish"
rp.Put(rb) 歸還一個緩沖區
server.operator.Disconnect(key, roomId); 等待注銷結果
注銷過程
server.operator.Disconnect(key, roomId);
rpc -> 調用 logic RPC.Disconnect 根據 subKey 返回UserId、seq
rpc -> 調用 RouterRPC.Del 根據 UserId、Seq、RoomId 刪除 session
p = ch.Ready() (等待 return <-c.signal )接受到 "proto.ProtoFinish" 檢測到瀏覽器斷開
switch p -> proto.ProtoFinish: 進行退出操作
ws.Close() 關閉鏈接
wp.Put(wb) 歸還一個緩沖區
dispatch goroutine exit 退出協程 go server.dispatchWebsocket()
注銷結果返回
server tcp goroutine exit 退出協程 go server.serveWebsocket()
server.Stat.DecrWsOnline() 減少 websocket online 統計
~~~
**斷開鏈接流程圖**

**幾個重要的結構體**
做為典型代碼即注釋的開源項目,goim 基本無太多閱讀障礙,幾個邏輯點梳理下很快就會明白。
Bucket: 每個 Comet 程序擁有若干個 Bucket, 可以理解為 Session Management, 保存著當前 Comet 服務于哪些 Room 和 Channel. 長連接具體分布在哪個 Bucket 上呢?根據 SubKey 一致性 Hash 來選擇。
Room: 可以理解為房間,群組或是一個 Group. 這個房間內維護 N 個 Channel, 即長連接用戶。在該 Room 內廣播消息,會發送給房間內的所有 Channel.
Channel: 維護一個長連接用戶,只能對應一個 Room. 推送的消息可以在 Room 內廣播,也可以推送到指定的 Channel.
Proto: 消息結構體,存放版本號,操作類型,消息序號和消息體。
**job模塊消費kafka邏輯**
pushRoutines []chan *proto.MPushMsgArg 多播
broadcastRoutines []chan *proto.BoardcastArg 廣播
roomRoutines []chan *proto.BoardcastRoomArg 房播
~~~
消息從kafka集群消費,經過kafka模塊轉發至push模塊,push模塊對消息預處理/過濾/分類,然后發至不同的 comet 信道中
kafka
-> push(msg.Value) 消費kafka消息
push
-> push() 判斷消息類型,多播、廣播、房播
多播:define.KAFKA_MESSAGE_MULTI:
-> go processPush(pushChs[i]) 根據啟動的 go 協程取模分配
-> comet.mPushComet(serverId int32, subKeys []string, body json.RawMessage) 消費 pushChs []chan *pushArg chan 的消息
-> Push(arg *proto.MPushMsgArg) 將消息推到 pushRoutines []chan *proto.MPushMsgArg chan 中
廣播:define.KAFKA_MESSAGE_BROADCAST:
-> broadcast(m.Msg) 調用所有的 *Comet
-> c.Broadcast(&args) 將消息推到 broadcastRoutines []chan *proto.BoardcastArg chan 中
房播:define.KAFKA_MESSAGE_BROADCAST_ROOM:
-> roomBucket.Get(int32(m.RoomId)) 獲取房間信息
-> NewRoom(roomId, b.round.Timer(b.roomNum), b.options)
-> go r.pushproc(t, options.BatchNum, options.SignalTime, options.IdleTime)
-> broadcastRoomBytes(r.id, buf.Buffer()) 根據roomid獲取 *Comet
-> c.BroadcastRoom(&args) 將消息推到 roomRoutines []chan *proto.BoardcastRoomArg chan 中(根據消息總數與options.RoutineSize取模)
-> room.Push(0, define.OP_SEND_SMS_REPLY, m.Msg) 將消息保存到room.proto chan *proto.Proto chan 中
main.InitComet(addrs map[int32]string, options CometOptions) 初始化 comet
-> go c.process(pushChan, roomChan, broadcastChan) 通過 rpc 調用 comet,作為pushChan,roomChan,broadcastChan信道的消費者
~~~
**job模塊消費kafka邏輯流程圖**

**logic 模塊 push http推送接口邏輯**
~~~
http接口-->kafka生產者邏輯
單人推送:push
-> r.URL.Query().Get("uid") 獲取userId
-> ioutil.ReadAll(r.Body) 獲取body
-> genSubKey(userId) 通過用戶ID生成subKeys;同一個用戶可以同時多處登陸或者同處多實例登陸,它們都會被同等對待
-> rpc 到 router 模塊 RouterRPC.Get 獲取指定用戶的session信息
-> mpushKafka 根據serverId、subkeys、Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_MULTI 單播
-> producer.Input()
多人推送:Pushs
-> ioutil.ReadAll(r.Body) 獲取body
-> parsePushsBody(body []byte) 將body轉成userIds、Msg
-> genSubKeys(userIds) 并行獲取多個用戶信息,返回值為map[comet.serverId][]subkey.
-> mpushKafka 根據serverId、subkeys、Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_MULTI 單播
-> producer.Input()
房間推送:PushRoom
-> ioutil.ReadAll(r.Body) 獲取body
-> param.Get("rid") 獲取roomId
-> strconv.ParseBool(param.Get("ensure")) 獲取ensure,是否強推
-> broadcastRoomKafka 根據roomId、Msg、ensure ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_BROADCAST_ROOM 房間內廣播
-> producer.Input()
廣播:PushAll
-> ioutil.ReadAll(r.Body) 獲取body
-> mpushKafka 根據Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_BROADCAST 廣播
-> producer.Input()
~~~
**logic 模塊 push http推送接口邏輯流程圖**

- 序言
- 目錄
- 環境搭建
- Linux搭建golang環境
- Windows搭建golang環境
- Mac搭建golang環境
- Go 環境變量
- 編輯器
- vs code
- Mac 安裝vs code
- Windows 安裝vs code
- vim編輯器
- 介紹
- 1.Go語言的主要特征
- 2.golang內置類型和函數
- 3.init函數和main函數
- 4.包
- 1.工作空間
- 2.源文件
- 3.包結構
- 4.文檔
- 5.編寫 Hello World
- 6.Go語言 “ _ ”(下劃線)
- 7.運算符
- 8.命令
- 類型
- 1.變量
- 2.常量
- 3.基本類型
- 1.基本類型介紹
- 2.字符串String
- 3.數組Array
- 4.類型轉換
- 4.引用類型
- 1.引用類型介紹
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指針
- 6.自定義類型Struct
- 流程控制
- 1.條件語句(if)
- 2.條件語句 (switch)
- 3.條件語句 (select)
- 4.循環語句 (for)
- 5.循環語句 (range)
- 6.循環控制Goto、Break、Continue
- 函數
- 1.函數定義
- 2.參數
- 3.返回值
- 4.匿名函數
- 5.閉包、遞歸
- 6.延遲調用 (defer)
- 7.異常處理
- 8.單元測試
- 壓力測試
- 方法
- 1.方法定義
- 2.匿名字段
- 3.方法集
- 4.表達式
- 5.自定義error
- 接口
- 1.接口定義
- 2.執行機制
- 3.接口轉換
- 4.接口技巧
- 面向對象特性
- 并發
- 1.并發介紹
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 應用
- 反射reflection
- 1.獲取基本類型
- 2.獲取結構體
- 3.Elem反射操作基本類型
- 4.反射調用結構體方法
- 5.Elem反射操作結構體
- 6.Elem反射獲取tag
- 7.應用
- json協議
- 1.結構體轉json
- 2.map轉json
- 3.int轉json
- 4.slice轉json
- 5.json反序列化為結構體
- 6.json反序列化為map
- 終端讀取
- 1.鍵盤(控制臺)輸入fmt
- 2.命令行參數os.Args
- 3.命令行參數flag
- 文件操作
- 1.文件創建
- 2.文件寫入
- 3.文件讀取
- 4.文件刪除
- 5.壓縮文件讀寫
- 6.判斷文件或文件夾是否存在
- 7.從一個文件拷貝到另一個文件
- 8.寫入內容到Excel
- 9.日志(log)文件
- server服務
- 1.服務端
- 2.客戶端
- 3.tcp獲取網頁數據
- 4.http初識-瀏覽器訪問服務器
- 5.客戶端訪問服務器
- 6.訪問延遲處理
- 7.form表單提交
- web模板
- 1.渲染終端
- 2.渲染瀏覽器
- 3.渲染存儲文件
- 4.自定義io.Writer渲染
- 5.模板語法
- 時間處理
- 1.格式化
- 2.運行時間
- 3.定時器
- 鎖機制
- 互斥鎖
- 讀寫鎖
- 性能比較
- sync.Map
- 原子操作
- 1.原子增(減)值
- 2.比較并交換
- 3.導入、導出、交換
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.選擇排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 限流器
- 日志包
- 日志框架logrus
- 隨機數驗證碼
- 生成指定位數的隨機數
- 生成圖形驗證碼
- 編碼格式轉換
- UTF-8與GBK
- 解決中文亂碼
- 設計模式
- 創建型模式
- 單例模式
- singleton.go
- singleton_test.go
- 抽象工廠模式
- abstractfactory.go
- abstractfactory_test.go
- 工廠方法模式
- factorymethod.go
- factorymethod_test.go
- 原型模式
- prototype.go
- prototype_test.go
- 生成器模式
- builder.go
- builder_test.go
- 結構型模式
- 適配器模式
- adapter.go
- adapter_test.go
- 橋接模式
- bridge.go
- bridge_test.go
- 合成/組合模式
- composite.go
- composite_test.go
- 裝飾模式
- decoretor.go
- decorator_test.go
- 外觀模式
- facade.go
- facade_test.go
- 享元模式
- flyweight.go
- flyweight_test.go
- 代理模式
- proxy.go
- proxy_test.go
- 行為型模式
- 職責鏈模式
- chainofresponsibility.go
- chainofresponsibility_test.go
- 命令模式
- command.go
- command_test.go
- 解釋器模式
- interpreter.go
- interperter_test.go
- 迭代器模式
- iterator.go
- iterator_test.go
- 中介者模式
- mediator.go
- mediator_test.go
- 備忘錄模式
- memento.go
- memento_test.go
- 觀察者模式
- observer.go
- observer_test.go
- 狀態模式
- state.go
- state_test.go
- 策略模式
- strategy.go
- strategy_test.go
- 模板模式
- templatemethod.go
- templatemethod_test.go
- 訪問者模式
- visitor.go
- visitor_test.go
- 數據庫操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事務
- golang操作Redis
- 1.redis介紹
- 2.golang鏈接redis
- 3.String類型 Set、Get操作
- 4.String 批量操作
- 5.設置過期時間
- 6.list隊列操作
- 7.Hash表
- 8.Redis連接池
- 其它Redis包
- go-redis/redis包
- 安裝介紹
- String 操作
- List操作
- Set操作
- Hash操作
- golang操作ETCD
- 1.etcd介紹
- 2.鏈接etcd
- 3.etcd存取
- 4.etcd監聽Watch
- golang操作kafka
- 1.kafka介紹
- 2.寫入kafka
- 3.kafka消費
- golang操作ElasticSearch
- 1.ElasticSearch介紹
- 2.kibana介紹
- 3.寫入ElasticSearch
- NSQ
- 安裝
- 生產者
- 消費者
- zookeeper
- 基本操作測試
- 簡單的分布式server
- Zookeeper命令行使用
- GORM
- gorm介紹
- gorm查詢
- gorm更新
- gorm刪除
- gorm錯誤處理
- gorm事務
- sql構建
- gorm 用法介紹
- Go操作memcached
- beego框架
- 1.beego框架環境搭建
- 2.參數配置
- 1.默認參數
- 2.自定義配置
- 3.config包使用
- 3.路由設置
- 1.自動匹配
- 2.固定路由
- 3.正則路由
- 4.注解路由
- 5.namespace
- 4.多種數據格式輸出
- 1.直接輸出字符串
- 2.模板數據輸出
- 3.json格式數據輸出
- 4.xml格式數據輸出
- 5.jsonp調用
- 5.模板處理
- 1.模板語法
- 2.基本函數
- 3.模板函數
- 6.請求處理
- 1.GET請求
- 2.POST請求
- 3.文件上傳
- 7.表單驗證
- 1.表單驗證
- 2.定制錯誤信息
- 3.struct tag 驗證
- 4.XSRF過濾
- 8.靜態文件處理
- 1.layout設計
- 9.日志處理
- 1.日志處理
- 2.logs 模塊
- 10.會話控制
- 1.會話控制
- 2.session 包使用
- 11.ORM 使用
- 1.鏈接數據庫
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.構造查詢
- 5.事務處理
- 6.自動建表
- 12.beego 驗證碼
- 1.驗證碼插件
- 2.驗證碼使用
- beego admin
- 1.admin安裝
- 2.admin開發
- beego 熱升級
- beego實現https
- gin框架
- 安裝使用
- 路由設置
- 模板處理
- 文件上傳
- gin框架中文文檔
- gin錯誤總結
- 項目
- 秒殺項目
- 日志收集
- 面試題
- 面試題一
- 面試題二
- 錯題集
- Go語言陷阱和常見錯誤
- 常見語法錯誤
- 初級
- 中級
- 高級
- Go高級應用
- goim
- goim 啟動流程
- goim 工作流程
- goim 結構體
- gopush
- gopush工作流程
- gopush啟動流程
- gopush業務流程
- gopush應用
- gopush新添功能
- gopush壓力測試
- 壓測注意事項
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常見RPC開源框架
- pprof
- pprof介紹
- pprof應用
- 使用pprof及Go 程序的性能優化
- 封裝 websocket
- cgo
- Golang GC
- 查看程序運行過程中的GC信息
- 定位gc問題所在
- Go語言 demo
- 用Go語言計算一個人的年齡,生肖,星座
- 超簡易Go語言實現的留言板代碼
- 信號處理模塊,可用于在線加載配置,配置動態加載的信號為SIGHUP
- 陽歷和陰歷相互轉化的工具類 golang版本
- 錯誤總結
- 網絡編程
- 網絡編程http
- 網絡編程tcp
- Http請求
- Go語言必知的90個知識點
- 第三方庫應用
- cli應用
- Cobra
- 圖表庫
- go-echarts
- 開源IM
- im_service
- 機器學習庫
- Tensorflow
- 生成二維碼
- skip2/go-qrcode生成二維碼
- boombuler/barcode生成二維碼
- tuotoo/qrcode識別二維碼
- 日志庫
- 定時任務
- robfig/cron
- jasonlvhit/gocron
- 拼多多開放平臺 SDK
- Go編譯
- 跨平臺交叉編譯
- 一問一答
- 一問一答(一)
- 為什么 Go 標準庫中有些函數只有簽名,沒有函數體?
- Go開發的應用
- etcd
- k8s
- Caddy
- nsq
- Docker
- web框架