<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] > [github](https://github.com/alberliu/gim) ### 簡要介紹 gim是一個即時通訊服務器,代碼全部使用golang完成。主要功能 1.支持tcp,websocket接入 2.離線消息同步 3.多業務接入 4.單用戶多設備同時在線 5.單聊,群聊,以及超大群聊天場景 6.支持服務水平擴展 ### 使用技術: 數據庫:Mysql+Redis 通訊框架:Grpc 長連接通訊協議:Protocol Buffers 日志框架:Zap ### 安裝部署 1.首先安裝MySQL,Redis 2.創建數據庫gim,執行sql/create_table.sql,完成初始化表的創建(數據庫包含提供測試的一些初始數據) 3.修改config下配置文件,使之和你本地配置一致 4.分別切換到cmd的tcp_conn,ws_conn,logic目錄下,執行go run main.go,啟動TCP連接層服務器,WebSocket連接層服務器,邏輯層服務器 ### 迅速跑通本地測試 1.在test目錄下,tcp_conn或者ws_conn目錄下,執行go run main,啟動測試腳本 2.根據提示,依次填入app_id,user_id,device_id,sync_sequence(中間空格空開),進行長連接登錄;數據庫device表中已經初始化了一些設備信息,用作測試 3.執行api/logic/logic_client_ext_test.go下的TestLogicExtServer_SendMessage函數,發送消息 ### 業務服務器如何接入 1.首先生成私鑰和公鑰 2.在app表里根據你的私鑰添加一條app記錄 3.將app_id和公鑰保存到業務服務器 4.將用戶通過LogicClientExtServer.AddUser接口添加到IM服務器 5.通過LogicClientExtServer.RegisterDevice接口注冊設備,獲取設備id(device_id) 6.將app_id,user_id,device_id用公鑰通過公鑰加密,生成token,相應庫的代碼在pkg/util/aes.go 7.接下來使用這個token,app就可以和IM服務器交互 ### rpc接口簡介 項目所有的proto協議在gim/public/proto/目錄下 1.tcp.proto 長連接通訊協議 2.logic_client.ext.proto 對客戶端(Android設備,IOS設備)提供的rpc協議 3.logic_server.ext.proto 對業務服務器提供的rpc協議 4.logic.int.proto 對conn服務層提供的rpc協議 5.conn.int.proto 對logic服務層提供的rpc協議 ### 項目目錄簡介 項目結構遵循 https://github.com/golang-standards/project-layout ``` api: 服務對外提供的grpc接口 cmd: 服務啟動入口 config: 服務配置 internal: 每個服務私有代碼 pkg: 服務共有代碼 sql: 項目sql文件 test: 長連接測試腳本 ``` ### 服務簡介 1.tcp_conn 維持與客戶端的TCP長連接,心跳,以及TCP拆包粘包,消息編解碼 2.ws_conn 維持與客戶端的WebSocket長連接,心跳,消息編解碼 3.logic 設備信息,用戶信息,群組信息管理,消息轉發邏輯 ### TCP拆包粘包 遵循LV的協議格式,一個消息包分為兩部分,消息字節長度以及消息內容。 這里為了減少內存分配,拆出來的包的內存復用讀緩存區內存。 **拆包流程:** 1.首先從系統緩存區讀取字節流到buffer 2.根據包頭的length字段,檢查報的value字段的長度是否大于等于length 3.如果大于,返回一個完整包(此包內存復用),重復步驟2 4.如果小于,將buffer的有效字節前移,重復步驟1 ### 單用戶多設備支持,離線消息同步 每個用戶都會維護一個自增的序列號,當用戶A給用戶B發送消息是,首先會獲取A的最大序列號,設置為這條消息的seq,持久化到用戶A的消息列表, 再通過長連接下發到用戶A賬號登錄的所有設備,再獲取用戶B的最大序列號,設置為這條消息的seq,持久化到用戶B的消息列表,再通過長連接下發 到用戶B賬號登錄的所有設備。 假如用戶的某個設備不在線,在設備長連接登錄時,用本地收到消息的最大序列號,到服務器做消息同步,這樣就可以保證離線消息不丟失。 ### 讀擴散和寫擴散 首先解釋一下,什么是讀擴散,什么是寫擴散 #### 讀擴散 **簡介**:群組成員發送消息時,先建立一個會話,都將這個消息寫入這個會話中,同步離線消息時,需要同步這個會話的未同步消息 **優點**:每個消息只需要寫入數據庫一次就行,減少數據庫訪問次數,節省數據庫空間 **缺點**:一個用戶有n個群組,客戶端每次同步消息時,要上傳n個序列號,服務器要對這n個群組分別做消息同步 #### 寫擴散 **簡介**:在群組中,每個用戶維持一個自己的消息列表,當群組中有人發送消息時,給群組的每個用戶的消息列表插入一條消息即可 **優點**:每個用戶只需要維護一個序列號和消息列表 **缺點**:一個群組有多少人,就要插入多少條消息,當群組成員很多時,DB的壓力會增大 ### 消息轉發邏輯選型以及特點 #### 普通群組: 采用寫擴散,群組成員信息持久化到數據庫保存。支持消息離線同步。 #### 超大群組: 采用讀擴散,群組成員信息保存到redis,不支持離線消息同步。 ### 核心流程時序圖 #### 長連接登錄 ![eaf3a08af9c64bbd.png](http://www.wailian.work/images/2019/10/26/eaf3a08af9c64bbd.png) #### 離線消息同步 ![ef9c9452e65be3ced63573164fec7ed5.png](http://s1.wailian.download/2019/12/25/ef9c9452e65be3ced63573164fec7ed5.png) #### 心跳 ![6ea6acf2cd4b956e.png](http://www.wailian.work/images/2019/10/26/6ea6acf2cd4b956e.png) #### 消息單發 c1.d1和c1.d2分別表示c1用戶的兩個設備d1和d2,c2.d3和c2.d4同理 ![e000fda2f18e86f3.png](http://www.wailian.work/images/2019/10/26/e000fda2f18e86f3.png) #### 小群消息群發 c1,c2.c3表示一個群組中的三個用戶 ![749fc468746055a8ecf3fba913b66885.png](http://s1.wailian.download/2019/12/26/749fc468746055a8ecf3fba913b66885.png) #### 大群消息群發 ![e3f92bdbb3eef199d185c28292307497.png](https://s1.wailian.download/2019/12/26/e3f92bdbb3eef199d185c28292307497.png) ### 錯誤處理,鏈路追蹤,日志打印 系統中的錯誤一般可以歸類為兩種,一種是業務定義的錯誤,一種就是未知的錯誤,在業務正式上線的時候,業務定義的錯誤的屬于正常業務邏輯,不需要打印出來, 但是未知的錯誤,我們就需要打印出來,我們不僅要知道是什么錯誤,還要知道錯誤的調用堆棧,所以這里我對GRPC的錯誤進行了一些封裝,使之包含調用堆棧。 ```go func WrapError(err error) error { if err == nil { return nil } s := &spb.Status{ Code: int32(codes.Unknown), Message: err.Error(), Details: []*any.Any{ { TypeUrl: TypeUrlStack, Value: util.Str2bytes(stack()), }, }, } return status.FromProto(s).Err() } // Stack 獲取堆棧信息 func stack() string { var pc = make([]uintptr, 20) n := runtime.Callers(3, pc) var build strings.Builder for i := 0; i < n; i++ { f := runtime.FuncForPC(pc[i] - 1) file, line := f.FileLine(pc[i] - 1) n := strings.Index(file, name) if n != -1 { s := fmt.Sprintf(" %s:%d \n", file[n:], line) build.WriteString(s) } } return build.String() } ``` 這樣,不僅可以拿到錯誤的堆棧,錯誤的堆棧也可以跨RPC傳輸,但是,但是這樣你只能拿到當前服務的堆棧,卻不能拿到調用方的堆棧,就比如說,A服務調用 B服務,當B服務發生錯誤時,在A服務通過日志打印錯誤的時候,我們只打印了B服務的調用堆棧,怎樣可以把A服務的堆棧打印出來。我們在A服務調用的地方也獲取 一次堆棧。 ```go func WrapRPCError(err error) error { if err == nil { return nil } e, _ := status.FromError(err) s := &spb.Status{ Code: int32(e.Code()), Message: e.Message(), Details: []*any.Any{ { TypeUrl: TypeUrlStack, Value: util.Str2bytes(GetErrorStack(e) + " --grpc-- \n" + stack()), }, }, } return status.FromProto(s).Err() } func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { err := invoker(ctx, method, req, reply, cc, opts...) return gerrors.WrapRPCError(err) } var LogicIntClient pb.LogicIntClient func InitLogicIntClient(addr string) { conn, err := grpc.DialContext(context.TODO(), addr, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor)) if err != nil { logger.Sugar.Error(err) panic(err) } LogicIntClient = pb.NewLogicIntClient(conn) } ``` 像這樣,就可以獲取完整一次調用堆棧。 錯誤打印也沒有必要在函數返回錯誤的時候,每次都去打印。因為錯誤已經包含了堆棧信息 ```go // 錯誤的方式 if err != nil { logger.Sugar.Error(err) return err } // 正確的方式 if err != nil { return err } ``` 然后,我們在上層統一打印就可以 ```go func startServer { extListen, err := net.Listen("tcp", conf.LogicConf.ClientRPCExtListenAddr) if err != nil { panic(err) } extServer := grpc.NewServer(grpc.UnaryInterceptor(LogicClientExtInterceptor)) pb.RegisterLogicClientExtServer(extServer, &LogicClientExtServer{}) err = extServer.Serve(extListen) if err != nil { panic(err) } } func LogicClientExtInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { defer func() { logPanic("logic_client_ext_interceptor", ctx, req, info, &err) }() resp, err = handler(ctx, req) logger.Logger.Debug("logic_client_ext_interceptor", zap.Any("info", info), zap.Any("ctx", ctx), zap.Any("req", req), zap.Any("resp", resp), zap.Error(err)) s, _ := status.FromError(err) if s.Code() != 0 && s.Code() < 1000 { md, _ := metadata.FromIncomingContext(ctx) logger.Logger.Error("logic_client_ext_interceptor", zap.String("method", info.FullMethod), zap.Any("md", md), zap.Any("req", req), zap.Any("resp", resp), zap.Error(err), zap.String("stack", gerrors.GetErrorStack(s))) } return } ``` 這樣做的前提就是,在業務代碼中透傳context,golang不像其他語言,可以在線程本地保存變量,像Java的ThreadLocal,所以只能通過函數參數的形式進行傳遞,gim中,service層函數的第一個參數 都是context,但是dao層和cache層就不需要了,不然,顯得代碼臃腫。 最后可以在客戶端的每次請求添加一個隨機的request_id,這樣客戶端到服務的每次請求都可以串起來了。 ```go func getCtx() context.Context { token, _ := util.GetToken(1, 2, 3, time.Now().Add(1*time.Hour).Unix(), util.PublicKey) return metadata.NewOutgoingContext(context.TODO(), metadata.Pairs( "app_id", "1", "user_id", "2", "device_id", "3", "token", token, "request_id", strconv.FormatInt(time.Now().UnixNano(), 10))) } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看