<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 8) 消息業務路由分發機制 ? 現在我們發送的消息都是message結構的,有個message頭里面其中有兩個關鍵的字段,`msgid`和`msglen`,其中加入`msgid`的意義就是我們可以甄別是哪個消息,從而對這類消息做出不同的業務處理。但是現在我們無論是服務端還是客戶端都是寫死的兩個業務,就是"回顯業務",顯然這并不滿足我們作為服務器框架的需求。我們需要開發者可以注冊自己的回調業務。所以我們需要提供一個注冊業務的入口,然后在后端根據不同的`msgid`來激活不同的回調業務函數。 ### 8.1 添加消息分發路由類msg_router ? 下面我們提供這樣一個中轉的router模塊,在include/message.h添加 > lars_reactor/include/message.h ```c #pragma once #include <ext/hash_map> //解決tcp粘包問題的消息頭 struct msg_head { int msgid; int msglen; }; //消息頭的二進制長度,固定數 #define MESSAGE_HEAD_LEN 8 //消息頭+消息體的最大長度限制 #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN) //msg 業務回調函數原型 //===================== 消息分發路由機制 ================== class tcp_client; typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data); //消息路由分發機制 class msg_router { public: msg_router():_router(),_args() {} //給一個消息ID注冊一個對應的回調業務函數 int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) { if(_router.find(msgid) != _router.end()) { //該msgID的回調業務已經存在 return -1; } _router[msgid] = msg_cb; _args[msgid] = user_data; return 0; } //調用注冊的對應的回調業務函數 void call(int msgid, uint32_t msglen, const char *data, tcp_client *client) { //判斷msgid對應的回調是否存在 if (_router.find(msgid) == _router.end()) { fprintf(stderr, "msgid %d is not register!\n", msgid); return; } //直接取出回調函數,執行 msg_callback *callback = _router[msgid]; void *user_data = _args[msgid]; callback(data, msglen, msgid, client, user_data); } private: //針對消息的路由分發,key為msgID, value為注冊的回調業務函數 __gnu_cxx::hash_map<int, msg_callback *> _router; //回調業務函數對應的參數,key為msgID, value為對應的參數 __gnu_cxx::hash_map<int, void *> _args; }; //===================== 消息分發路由機制 ================== ``` ? 開發者需要注冊一個`msg_callback`類型的函數,通過`msg_router`類的`register_msg_router()`方法來注冊,同時通過`call()`方法來調用。 ? 全部回調業務函數和msgid的對應關系保存在一個hash_map類型的`_router`map中,`_args`保存對應的參數。 ? 但是這里有個小細節需要注意一下,`msg_callback`的函數類型聲明是這樣的。 ```c typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data); ``` ? 其中這里面第4個參數,只能是tcp_client類型的參數,也就是我們之前的設計的msg_callback只支持tcp_client的消息回調機制,但是很明顯我們的需求是不僅是`tcp_client`要用,tcp_server中的`tcp_conn`也要用到這個機制,那么很顯然這個參數在這就不是很合適,那么如果設定一個形參既能指向`tcp_client`又能能指向`tcp_conn`兩個類型呢,當然答案就只能是將這兩個類抽象出來一層,用父類指針指向子類然后通過多態特性來調用就可以了,所以我們需要先定義一個抽象類。 ### 8.2 鏈接抽象類創建 ? 經過分析,我們定義如下的抽象類,并提供一些接口。 > lars_reactor/include/net_connection.h ```c #pragma once /* * * 網絡通信的抽象類,任何需要進行收發消息的模塊,都可以實現該類 * * */ class net_connection { public: //發送消息的接口 virtual int send_message(const char *data, int datalen, int msgid) = 0; }; ``` ? 然后讓我們tcp_server端的`tcp_conn`類繼承`net_connecton`, 客戶端的`tcp_client` 繼承`net_connection` > lars_reactor/include/tcp_conn.h ```c class tcp_conn : public net_connection { //... }; ``` > lars_reactor/include/tcp_client.h ```c class tcp_client : public net_connection { //... } ``` 這樣,我們就可以用一個net_connection指針指向這兩種不同的對象實例了。 ? 接下來我們將`msg_callback`回調業務函數類型改成 ```c typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data); ``` ? 這樣這個業務函數就可以支持tcp_conn和tcp_client了。 所以修改之后,我們的`msg_router`類定義如下: > lars_reactor/include/message.h ```c //消息路由分發機制 class msg_router { public: msg_router(): { printf("msg router init ...\n"); } //給一個消息ID注冊一個對應的回調業務函數 int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) { if(_router.find(msgid) != _router.end()) { //該msgID的回調業務已經存在 return -1; } printf("add msg cb msgid = %d\n", msgid); _router[msgid] = msg_cb; _args[msgid] = user_data; return 0; } //調用注冊的對應的回調業務函數 void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn) { printf("call msgid = %d\n", msgid); //判斷msgid對應的回調是否存在 if (_router.find(msgid) == _router.end()) { fprintf(stderr, "msgid %d is not register!\n", msgid); return; } //直接取出回調函數,執行 msg_callback *callback = _router[msgid]; void *user_data = _args[msgid]; callback(data, msglen, msgid, net_conn, user_data); printf("=======\n"); } private: //針對消息的路由分發,key為msgID, value為注冊的回調業務函數 __gnu_cxx::hash_map<int, msg_callback*> _router; //回調業務函數對應的參數,key為msgID, value為對應的參數 __gnu_cxx::hash_map<int, void*> _args; }; ``` ### 8.3 msg_router集成到tcp_server中 #### A. tcp_server添加msg_router靜態成員變量 > lars_reactor/include/tcp_server.h ```c class tcp_server { public: // ... //---- 消息分發路由 ---- static msg_router router; // ... }; ``` 同時定義及初始化 > lars_reactor/src/tcp_server.cpp ```c //... // ==== 消息分發路由 === msg_router tcp_server::router; //... ``` #### B. tcp_server提供注冊路由方法 > lars_reactor/include/tcp_server.c ```c class tcp_server { public: //... //注冊消息路由回調函數 void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) { router.register_msg_router(msgid, cb, user_data); } //... public: //全部已經在線的連接信息 //---- 消息分發路由 ---- static msg_router router; //... }; ``` #### C. 修正tcp_conn的do_read改成消息分發 > lars_reactor/src/tcp_conn.cpp ```c //... //處理讀業務 void tcp_conn::do_read() { //1. 從套接字讀取數據 int ret = ibuf.read_data(_connfd); if (ret == -1) { fprintf(stderr, "read data from socket\n"); this->clean_conn(); return ; } else if ( ret == 0) { //對端正常關閉 printf("connection closed by peer\n"); clean_conn(); return ; } //2. 解析msg_head數據 msg_head head; //[這里用while,可能一次性讀取多個完整包過來] while (ibuf.length() >= MESSAGE_HEAD_LEN) { //2.1 讀取msg_head頭部,固定長度MESSAGE_HEAD_LEN memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN); if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) { fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen); this->clean_conn(); break; } if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) { //緩存buf中剩余的數據,小于實際上應該接受的數據 //說明是一個不完整的包,應該拋棄 break; } //2.2 再根據頭長度讀取數據體,然后針對數據體處理 業務 //頭部處理完了,往后偏移MESSAGE_HEAD_LEN長度 ibuf.pop(MESSAGE_HEAD_LEN); //處理ibuf.data()業務數據 printf("read data: %s\n", ibuf.data()); //消息包路由模式 tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this); ////回顯業務 //callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this); //消息體處理完了,往后便宜msglen長度 ibuf.pop(head.msglen); } ibuf.adjust(); return ; } //... ``` ### 8.4 msg_router集成到tcp_client中 > lars_reactor/include/tcp_client.h ```c class tcp_client : public net_connection { public: // ... //設置業務處理回調函數 //void set_msg_callback(msg_callback *msg_cb) //{ //this->_msg_callback = msg_cb; //} //注冊消息路由回調函數 void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) { _router.register_msg_router(msgid, cb, user_data); } private: //處理消息的分發路由 msg_router _router; //msg_callback *_msg_callback; //單路由模式去掉 // ... // ... }; ``` ? 然后在修正`tcp_client`的`do_read()`方法。 > lars_reactor/src/tcp_client.cpp ```c //處理讀業務 int tcp_client::do_read() { //確定已經成功建立連接 assert(connected == true); // 1. 一次性全部讀取出來 //得到緩沖區里有多少字節要被讀取,然后將字節數放入b里面。 int need_read = 0; if (ioctl(_sockfd, FIONREAD, &need_read) == -1) { fprintf(stderr, "ioctl FIONREAD error"); return -1; } //確保_buf可以容納可讀數據 assert(need_read <= _ibuf.capacity - _ibuf.length); int ret; do { ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read); } while(ret == -1 && errno == EINTR); if (ret == 0) { //對端關閉 if (_name != NULL) { printf("%s client: connection close by peer!\n", _name); } else { printf("client: connection close by peer!\n"); } clean_conn(); return -1; } else if (ret == -1) { fprintf(stderr, "client: do_read() , error\n"); clean_conn(); return -1; } assert(ret == need_read); _ibuf.length += ret; //2. 解包 msg_head head; int msgid, length; while (_ibuf.length >= MESSAGE_HEAD_LEN) { memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN); msgid = head.msgid; length = head.msglen; /* if (length + MESSAGE_HEAD_LEN < _ibuf.length) { break; } */ //頭部讀取完畢 _ibuf.pop(MESSAGE_HEAD_LEN); // =================================== //3. 交給業務函數處理 //if (_msg_callback != NULL) { //this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); //} // 消息路由分發 this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this); // =================================== //數據區域處理完畢 _ibuf.pop(length); } //重置head指針 _ibuf.adjust(); return 0; } ``` ### 8.5 完成Lars Reactor V0.6開發 我們現在重新寫一下 server.cpp 和client.cpp的兩個應用程序 > lars_reacor/example/lars_reactor_0.6/server.cpp ```c #include "tcp_server.h" //回顯業務的回調函數 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("callback_busi ...\n"); //直接回顯 conn->send_message(data, len, msgid); } //打印信息回調函數 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("recv client: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; tcp_server server(&loop, "127.0.0.1", 7777); //注冊消息業務路由 server.add_msg_router(1, callback_busi); server.add_msg_router(2, print_busi); loop.event_process(); return 0; } ``` > lars_reacor/example/lars_reactor_0.6/client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客戶端業務 void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { //得到服務端回執的數據 printf("recv server: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; //創建tcp客戶端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6"); //注冊消息路由業務 client.add_msg_router(1, busi); //開啟事件監聽 loop.event_process(); return 0; } ``` > lars_reactor/src/tcp_client.cpp ```c //判斷鏈接是否是創建鏈接,主要是針對非阻塞socket 返回EINPROGRESS錯誤 static void connection_delay(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client*)args; loop->del_io_event(fd); int result = 0; socklen_t result_len = sizeof(result); getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len); if (result == 0) { //鏈接是建立成功的 cli->connected = true; printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); // ================ 發送msgid:1 ===== //建立連接成功之后,主動發送send_message const char *msg = "hello lars!"; int msgid = 1; cli->send_message(msg, strlen(msg), msgid); // ================ 發送msgid:2 ===== const char *msg2 = "hello Aceld!"; msgid = 2; cli->send_message(msg2, strlen(msg2), msgid); // ================ loop->add_io_event(fd, read_callback, EPOLLIN, cli); if (cli->_obuf.length != 0) { //輸出緩沖有數據可寫 loop->add_io_event(fd, write_callback, EPOLLOUT, cli); } } else { //鏈接創建失敗 fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); } } ``` 運行結果: 服務端 ```c $ ./server msg_router init... add msg cb msgid = 1 add msg cb msgid = 2 begin accept get new connection succ! read data: hello lars! call msgid = 1 callback_busi ... server send_message: hello lars!:11, msgid = 1 ======= read data: hello Aceld! call msgid = 2 recv client: [hello Aceld!] msgid: [2] len: [12] ``` 客戶端 ```c $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT call msgid = 1 recv server: [hello lars!] msgid: [1] len: [11] ======= ``` --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看