<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ## 5) tcp鏈接與Message消息封裝 ? 好了,現在我們來將服務器的連接做一個簡單的封裝,在這之前,我們要將我我們所發的數據做一個規定,采用TLV的格式,來進行封裝。目的是解決TCP傳輸的粘包問題。 ### 5.1 Message消息封裝 ![](https://img.kancloud.cn/06/33/06336f097f9db897457e35f1b0399533_1024x768.jpeg) ? 先創建一個message.h頭文件 > lars_reactor/include/message.h ```h #pragma once //解決tcp粘包問題的消息頭 struct msg_head { int msgid; int msglen; }; //消息頭的二進制長度,固定數 #define MESSAGE_HEAD_LEN 8 //消息頭+消息體的最大長度限制 #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN) ``` ? 接下來我們每次在server和 client之間傳遞數據的時候,都發送這種數據格式的頭再加上后面的數據內容即可。 ### 5.2 創建一個tcp_conn連接類 > lars_reactor/include/tcp_conn.h ```h #pragma once #include "reactor_buf.h" #include "event_loop.h" //一個tcp的連接信息 class tcp_conn { public: //初始化tcp_conn tcp_conn(int connfd, event_loop *loop); //處理讀業務 void do_read(); //處理寫業務 void do_write(); //銷毀tcp_conn void clean_conn(); //發送消息的方法 int send_message(const char *data, int msglen, int msgid); private: //當前鏈接的fd int _connfd; //該連接歸屬的event_poll event_loop *_loop; //輸出buf output_buf obuf; //輸入buf input_buf ibuf; }; ``` 簡單說明一下里面的成員和方法: **成員**: `_connfd`:server剛剛accept成功的套接字 `_loop`:當前鏈接所綁定的事件觸發句柄. `obuf`:鏈接輸出緩沖,向對端寫數據 `ibuf`:鏈接輸入緩沖,從對端讀數據 **方法**: `tcp_client()`:構造,主要在里面實現初始化及創建鏈接鏈接的connect過程。 `do_read()`:讀數據處理業務,主要是EPOLLIN事件觸發。 `do_write()`:寫數據處理業務,主要是EPOLLOUT事件觸發。 `clean_conn()`:清空鏈接資源。 `send_message()`:將消息打包成TLV格式發送給對端。 ? 接下來,實現以下`tcp_conn`類. > lars_reactor/src/tcp_conn.cpp ```c #include <unistd.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <string.h> #include "tcp_conn.h" #include "message.h" //回顯業務 void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn) { conn->send_message(data, len, msgid); } //連接的讀事件回調 static void conn_rd_callback(event_loop *loop, int fd, void *args) { tcp_conn *conn = (tcp_conn*)args; conn->do_read(); } //連接的寫事件回調 static void conn_wt_callback(event_loop *loop, int fd, void *args) { tcp_conn *conn = (tcp_conn*)args; conn->do_write(); } //初始化tcp_conn tcp_conn::tcp_conn(int connfd, event_loop *loop) { _connfd = connfd; _loop = loop; //1. 將connfd設置成非阻塞狀態 int flag = fcntl(_connfd, F_GETFL, 0); fcntl(_connfd, F_SETFL, O_NONBLOCK|flag); //2. 設置TCP_NODELAY禁止做讀寫緩存,降低小包延遲 int op = 1; setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h //3. 將該鏈接的讀事件讓event_loop監控 _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this); //4 將該鏈接集成到對應的tcp_server中 //TODO } //處理讀業務 void tcp_conn::do_read() { //1. 從套接字讀取數據 int ret = ibuf.read_data(_connfd); if (ret == -1) { fprintf(stderr, "read data from socket\n"); this->clean_conn(); return ; } else if ( ret == 0) { //對端正常關閉 printf("connection closed by peer\n"); clean_conn(); return ; } //2. 解析msg_head數據 msg_head head; //[這里用while,可能一次性讀取多個完整包過來] while (ibuf.length() >= MESSAGE_HEAD_LEN) { //2.1 讀取msg_head頭部,固定長度MESSAGE_HEAD_LEN memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN); if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) { fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen); this->clean_conn(); break; } if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) { //緩存buf中剩余的數據,小于實際上應該接受的數據 //說明是一個不完整的包,應該拋棄 break; } //2.2 再根據頭長度讀取數據體,然后針對數據體處理 業務 //TODO 添加包路由模式 //頭部處理完了,往后偏移MESSAGE_HEAD_LEN長度 ibuf.pop(MESSAGE_HEAD_LEN); //處理ibuf.data()業務數據 printf("read data: %s\n", ibuf.data()); //回顯業務 callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this); //消息體處理完了,往后便宜msglen長度 ibuf.pop(head.msglen); } ibuf.adjust(); return ; } //處理寫業務 void tcp_conn::do_write() { //do_write是觸發玩event事件要處理的事情, //應該是直接將out_buf力度數據io寫會對方客戶端 //而不是在這里組裝一個message再發 //組裝message的過程應該是主動調用 //只要obuf中有數據就寫 while (obuf.length()) { int ret = obuf.write2fd(_connfd); if (ret == -1) { fprintf(stderr, "write2fd error, close conn!\n"); this->clean_conn(); return ; } if (ret == 0) { //不是錯誤,僅返回0表示不可繼續寫 break; } } if (obuf.length() == 0) { //數據已經全部寫完,將_connfd的寫事件取消掉 _loop->del_io_event(_connfd, EPOLLOUT); } return ; } //發送消息的方法 int tcp_conn::send_message(const char *data, int msglen, int msgid) { printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid); bool active_epollout = false; if(obuf.length() == 0) { //如果現在已經數據都發送完了,那么是一定要激活寫事件的 //如果有數據,說明數據還沒有完全寫完到對端,那么沒必要再激活等寫完再激活 active_epollout = true; } //1 先封裝message消息頭 msg_head head; head.msgid = msgid; head.msglen = msglen; //1.1 寫消息頭 int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN); if (ret != 0) { fprintf(stderr, "send head error\n"); return -1; } //1.2 寫消息體 ret = obuf.send_data(data, msglen); if (ret != 0) { //如果寫消息體失敗,那就回滾將消息頭的發送也取消 obuf.pop(MESSAGE_HEAD_LEN); return -1; } if (active_epollout == true) { //2. 激活EPOLLOUT寫事件 _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this); } return 0; } //銷毀tcp_conn void tcp_conn::clean_conn() { //鏈接清理工作 //1 將該鏈接從tcp_server摘除掉 //TODO //2 將該鏈接從event_loop中摘除 _loop->del_io_event(_connfd); //3 buf清空 ibuf.clear(); obuf.clear(); //4 關閉原始套接字 int fd = _connfd; _connfd = -1; close(fd); } ``` ? 具體每個方法的實現,都很清晰。其中`conn_rd_callback()`和`conn_wt_callback()`是注冊讀寫事件的回調函數,設置為static是因為函數類型沒有this指針。在里面分別再調用`do_read()`和`do_write()`方法。 ### 5.3 修正tcp_server對accept之后的處理方法 > lars_reactor/src/tcp_server.cpp ```c //... //開始提供創建鏈接服務 void tcp_server::do_accept() { int connfd; while(true) { //accept與客戶端創建鏈接 printf("begin accept\n"); connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen); if (connfd == -1) { if (errno == EINTR) { fprintf(stderr, "accept errno=EINTR\n"); continue; } else if (errno == EMFILE) { //建立鏈接過多,資源不夠 fprintf(stderr, "accept errno=EMFILE\n"); } else if (errno == EAGAIN) { fprintf(stderr, "accept errno=EAGAIN\n"); break; } else { fprintf(stderr, "accept error"); exit(1); } } else { //accept succ! // ============= 將之前的觸發回調的刪掉,改成如下==== tcp_conn *conn = new tcp_conn(connfd, _loop); if (conn == NULL) { fprintf(stderr, "new tcp_conn error\n"); exit(1); } // ============================================ printf("get new connection succ!\n"); break; } } } //... ``` ? 這樣,每次accept成功之后,創建一個與當前客戶端套接字綁定的tcp_conn對象。在構造里就完成了基本的對于EPOLLIN事件的監聽和回調動作. ? 現在可以先編譯一下,保證沒有語法錯誤,但是如果想測試,就不能夠使用`nc`指令測試了,因為現在服務端只能夠接收我們自定義的TLV格式的報文。那么我們需要自己寫一個客戶端來完成基本的測試。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看