<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                goim 工作流程 ![](https://box.kancloud.cn/7c8590b2972e8cf89b6bcb1f4c30330c_1457x1173.png) 其中 comet服務 在最前面對外提供服務,提供 tcp 和 websocket 兩種服務。 接受到消息之后 交給 logic 模塊處理。logic 模塊在調用 router 查詢路由信息。將發送的消息交給 kafka,再由 job 服務讀取kafka的消息,推給 comet服務。 拆分成這么多模塊的好處是可以多實例部署。其中 在 comet,router,logic 模塊都提供了性能監控 net/http/pprof 同時還提供了monitor 服務,服務啟動之后訪問 xxx:port/monitor/ping 返回 ok 說明服務啟動正常,方便運維檢查系統。 **各模塊功能分析** **comet 模塊** comet 模塊是在最前端,主要負責和client的鏈接保持,同時接受,發送消息,通知到客戶端。檢查鏈接是否斷開。 comet 屬于接入層,非常容易擴展,直接開啟多個comet節點,修改配置文件中的base節點下的server.id修改成不同值(注意一定要保證不同的comet進程值唯一),前端接入可以使用LVS 或者 DNS來轉發。 **logic 模塊** logic 模塊是comet 模塊調用的,接受 comet 模塊的命令, 然后進行處理,再發送的消息的kafka隊列上,同時鏈接 router 模塊,記錄用戶的 uid server room 等信息。同時獲得router模塊的信息。 logic 屬于無狀態的邏輯層,可以隨意增加節點,使用nginx upstream來擴展http接口,內部rpc部分,可以使用LVS四層轉發。 **router 模塊** router 主要記錄了用戶的session信息。存儲在 Bucket 對象里面。 **job 模塊** job服務每秒都在同步 comet的信息,然后再讀取 kafka隊列的信息。 push 到相關的comet服務器上。用戶就接受到了消息。 job 根據kafka的partition來擴展多job工作方式,具體可以參考下kafka的partition負載。 **新添加功能** 1、用戶通過websocket 發消息,comet模塊使用 rpc 調用logic 將消息發送到kafka,job消費kafka的消息推送給用戶。 2、將 kafka 消息隊列改成 nsq 消息隊列。kafka 與 nsq 對比 kafka消息會固化,存文件,nsq默認是不保存的 kafka消息因為固化下來,所以是保序的,nsq傳遞時候通常是無序的,當然你也可以保留下信息去check時間戳,因此nsq更適合處理數據量大但是彼此間沒有順序關系的消息。 nsq支持延時消息的投遞,比如我想這條消息5分鐘之后才被投遞出去被客戶端消費,較于普通的消息投遞,多了個毫秒數。延時消息可用于以下場景,比如一個訂單超過30分鐘未付款,修改其狀態 或者給客戶發短信提醒。 3、router掛掉后如何處理? 完成把客戶端的心跳消息,也通過logic,都發到router。如果router掛掉,可以根據這些心跳重建在線用戶map。 心跳消息發給router,還有一個好處是,能清理太久沒有下線的用戶臟數據。 ![](https://box.kancloud.cn/b40b732fab910e2c9607138022004439_1413x984.png) **op狀態** 握手:op = 0 握手返回: op = 1 心跳:op = 2 心跳返回: op = 3 發消息:op = 4 消息返回:op = 5 授權用戶: op = 7 返回: op = 8 OP_PROTO_READY // 消息推送 OP_PROTO_FINISH // 關閉退出 websocket.go **客戶端鏈接邏輯** ~~~ go acceptWebsocket 監聽客戶端鏈接 go serveWebsocket(server, conn, r) lAddr := conn.LocalAddr().String() 獲取服務端ip rAddr := conn.RemoteAddr().String() 獲取客戶端ip server.serveWebsocket(conn, rp, wp, tr) ch.Reader.ResetBuffer(conn, rb.Bytes()) 讀緩沖,重置IO,可讀游標重置為0,且b.buf變為buf tr.Add 添加一個時鐘對象 websocket.ReadRequest(rr) websocket,讀取請求 wp.Get() 獲取一個寫緩沖區 ch.Writer.ResetBuffer(conn, wb.Bytes()) 重置IO,可寫游標重置為0,句柄變為w,緩沖區變為buf websocket.Upgrade(conn, rr, wr, req) 交換協議 ch.CliProto.Set() 獲取一個proto對象的引用,用于寫,不會移動可寫游標 key, roomId, hb, err = server.authWebsocket(ws, p) 握手->注冊,返回 subkey,roomId,heartbeat 握手->注冊邏輯 server.operator.Connect(p); PRC -> 調用 logic RPC.Connect 通過 token 獲取 用戶 userId、roomId rpc -> 調用 RouterRPC.Put 通過 UserId、ServerId、RoomId 存 session server.Bucket(key) 使用subkey根據CityHash32算法,分配到對應的Bucket b.Put(key, roomId, ch) 分配到chs和rooms tr.Set(trd, hb) 設置更新定時器數據。 DefaultWhitelist.Contains(key) 判斷是否是白名單用戶 server.Stat.IncrWsOnline() 增加 websocket online 統計 ~~~ **客戶端鏈接邏輯流程圖** ![](https://box.kancloud.cn/2be435084ad10ba3fda13b654ddfda5a_1225x1272.png) **收發消息邏輯** ~~~ 參數: conn net.Conn 鏈接 rp, wp *bytes.Pool 讀寫緩沖池 Pool內存組織如下,Pool是一個鏈式存儲的棧,數據從棧頂出,同時數據也從棧頂回收。 tr *itime.Timer 最小堆算法實現的時鐘對象 協程 go server.serveWebsocket() 處理讀消息(接收心跳消息、接收用戶發消息) for { ch.CliProto.Set() 從 Ring 上,獲取一個proto對象的引用,用于寫,不會移動可寫游標 p.ReadWebsocket(ws) Read Websocket 讀websocket發來的消息 判斷是 心跳檢測 or 發消息操作 p.Operation == define.OP_HEARTBEAT 心跳檢測 p.Operation = define.OP_HEARTBEAT_REPLY 心跳回應 else 發消息操作 err = server.operator.Operate(p) p.Operation = define.OP_SEND_SMS_REPLY 發消息回應 ch.CliProto.SetAdv() 移動 Ring 的可寫游標 ch.Signal() 將消息 寫入 c.signal chan *proto.Proto } 協程 go server.dispatchWebsocket() 處理寫消息(心跳回應、推送消息) for { ch.Ready() 消費 c.signal chan *proto.Proto 的消息 switch p 判斷消息類型(ProtoReady消息回應、ProtoFinish頁面關閉、default 推送消息) ProtoReady->自己發的消息 p, err = ch.CliProto.Get() 從 Ring 上 獲取一個proto對象的引用,用于讀,不會移動可讀游標 p.WriteWebsocket(ws) 消息推送(發消息返回結果) ch.CliProto.GetAdv() 移動可讀游標 ProtoFinish頁面關閉 -->走斷開鏈接邏輯 default push推送消息 err = p.WriteWebsocket(ws) 消息推送 ws.Flush() 釋放寫緩沖池 } ~~~ **收發消息流程圖** ![](https://box.kancloud.cn/46b499792e47adbca7b1839e0e57fa63_1432x1596.png) **斷開鏈接邏輯** ~~~ err = p.ReadWebsocket(ws); err != nil 瀏覽器斷開鏈接 ws.ReadMessage(); err != nil err = ErrMessageClose b.Del(key) 根據subkey 刪除 chs和rooms 上的Channel和room tr.Del(trd) 刪除時間計數 ws.Close() 關閉websocket鏈接 ch.Close() 發送關閉channel信號 "proto.ProtoFinish" rp.Put(rb) 歸還一個緩沖區 server.operator.Disconnect(key, roomId); 等待注銷結果 注銷過程 server.operator.Disconnect(key, roomId); rpc -> 調用 logic RPC.Disconnect 根據 subKey 返回UserId、seq rpc -> 調用 RouterRPC.Del 根據 UserId、Seq、RoomId 刪除 session p = ch.Ready() (等待 return <-c.signal )接受到 "proto.ProtoFinish" 檢測到瀏覽器斷開 switch p -> proto.ProtoFinish: 進行退出操作 ws.Close() 關閉鏈接 wp.Put(wb) 歸還一個緩沖區 dispatch goroutine exit 退出協程 go server.dispatchWebsocket() 注銷結果返回 server tcp goroutine exit 退出協程 go server.serveWebsocket() server.Stat.DecrWsOnline() 減少 websocket online 統計 ~~~ **斷開鏈接流程圖** ![](https://box.kancloud.cn/82e87971bacdde40a5a9e870cd439d0a_786x1423.png) **幾個重要的結構體** 做為典型代碼即注釋的開源項目,goim 基本無太多閱讀障礙,幾個邏輯點梳理下很快就會明白。 Bucket: 每個 Comet 程序擁有若干個 Bucket, 可以理解為 Session Management, 保存著當前 Comet 服務于哪些 Room 和 Channel. 長連接具體分布在哪個 Bucket 上呢?根據 SubKey 一致性 Hash 來選擇。 Room: 可以理解為房間,群組或是一個 Group. 這個房間內維護 N 個 Channel, 即長連接用戶。在該 Room 內廣播消息,會發送給房間內的所有 Channel. Channel: 維護一個長連接用戶,只能對應一個 Room. 推送的消息可以在 Room 內廣播,也可以推送到指定的 Channel. Proto: 消息結構體,存放版本號,操作類型,消息序號和消息體。 **job模塊消費kafka邏輯** pushRoutines []chan *proto.MPushMsgArg 多播 broadcastRoutines []chan *proto.BoardcastArg 廣播 roomRoutines []chan *proto.BoardcastRoomArg 房播 ~~~ 消息從kafka集群消費,經過kafka模塊轉發至push模塊,push模塊對消息預處理/過濾/分類,然后發至不同的 comet 信道中 kafka -> push(msg.Value) 消費kafka消息 push -> push() 判斷消息類型,多播、廣播、房播 多播:define.KAFKA_MESSAGE_MULTI: -> go processPush(pushChs[i]) 根據啟動的 go 協程取模分配 -> comet.mPushComet(serverId int32, subKeys []string, body json.RawMessage) 消費 pushChs []chan *pushArg chan 的消息 -> Push(arg *proto.MPushMsgArg) 將消息推到 pushRoutines []chan *proto.MPushMsgArg chan 中 廣播:define.KAFKA_MESSAGE_BROADCAST: -> broadcast(m.Msg) 調用所有的 *Comet -> c.Broadcast(&args) 將消息推到 broadcastRoutines []chan *proto.BoardcastArg chan 中 房播:define.KAFKA_MESSAGE_BROADCAST_ROOM: -> roomBucket.Get(int32(m.RoomId)) 獲取房間信息 -> NewRoom(roomId, b.round.Timer(b.roomNum), b.options) -> go r.pushproc(t, options.BatchNum, options.SignalTime, options.IdleTime) -> broadcastRoomBytes(r.id, buf.Buffer()) 根據roomid獲取 *Comet -> c.BroadcastRoom(&args) 將消息推到 roomRoutines []chan *proto.BoardcastRoomArg chan 中(根據消息總數與options.RoutineSize取模) -> room.Push(0, define.OP_SEND_SMS_REPLY, m.Msg) 將消息保存到room.proto chan *proto.Proto chan 中 main.InitComet(addrs map[int32]string, options CometOptions) 初始化 comet -> go c.process(pushChan, roomChan, broadcastChan) 通過 rpc 調用 comet,作為pushChan,roomChan,broadcastChan信道的消費者 ~~~ **job模塊消費kafka邏輯流程圖** ![](https://box.kancloud.cn/d9f0e816b12f98f77f7b82684749f508_1431x1368.png) **logic 模塊 push http推送接口邏輯** ~~~ http接口-->kafka生產者邏輯 單人推送:push -> r.URL.Query().Get("uid") 獲取userId -> ioutil.ReadAll(r.Body) 獲取body -> genSubKey(userId) 通過用戶ID生成subKeys;同一個用戶可以同時多處登陸或者同處多實例登陸,它們都會被同等對待 -> rpc 到 router 模塊 RouterRPC.Get 獲取指定用戶的session信息 -> mpushKafka 根據serverId、subkeys、Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_MULTI 單播 -> producer.Input() 多人推送:Pushs -> ioutil.ReadAll(r.Body) 獲取body -> parsePushsBody(body []byte) 將body轉成userIds、Msg -> genSubKeys(userIds) 并行獲取多個用戶信息,返回值為map[comet.serverId][]subkey. -> mpushKafka 根據serverId、subkeys、Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_MULTI 單播 -> producer.Input() 房間推送:PushRoom -> ioutil.ReadAll(r.Body) 獲取body -> param.Get("rid") 獲取roomId -> strconv.ParseBool(param.Get("ensure")) 獲取ensure,是否強推 -> broadcastRoomKafka 根據roomId、Msg、ensure ,push 到 kafka -> OP: define.KAFKA_MESSAGE_BROADCAST_ROOM 房間內廣播 -> producer.Input() 廣播:PushAll -> ioutil.ReadAll(r.Body) 獲取body -> mpushKafka 根據Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_BROADCAST 廣播 -> producer.Input() ~~~ **logic 模塊 push http推送接口邏輯流程圖** ![](https://box.kancloud.cn/d804684a24b03da7b272797239c5cee0_1512x1341.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看