<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 6) tcp客戶端觸發模型 ? 我們可以給客戶端添加觸發模型。同時也提供一系列的接口供開發者寫客戶端應用程序來使用。 ### 6.1 tcp_client類設計 > lars_reactor/include/tcp_client.h ```c #pragma once #include "io_buf.h" #include "event_loop.h" #include "message.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> class tcp_client { public: //初始化客戶端套接字 tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name); //發送message方法 int send_message(const char *data, int msglen, int msgid); //創建鏈接 void do_connect(); //處理讀業務 int do_read(); //處理寫業務 int do_write(); //釋放鏈接資源 void clean_conn(); ~tcp_client(); //設置業務處理回調函數 void set_msg_callback(msg_callback *msg_cb) { this->_msg_callback = msg_cb; } bool connected; //鏈接是否創建成功 //server端地址 struct sockaddr_in _server_addr; io_buf _obuf; io_buf _ibuf; private: int _sockfd; socklen_t _addrlen; //客戶端的事件處理機制 event_loop* _loop; //當前客戶端的名稱 用戶記錄日志 const char *_name; msg_callback *_msg_callback; }; ``` ? 這里注意的是,tcp_client并不是tcp_server的一部分,而是單純為寫客戶端提供的接口。所以這里也需要實現一套對讀寫事件處理的業務。 這里使用的讀寫緩沖是原始的`io_buf`,并不是服務器封裝好的`reactor_buf`原因是后者是轉為server做了一層封裝,io_buf的基本方法比較全。 **關鍵成員**: `_sockfd`:當前客戶端套接字。 `_server_addr`: 鏈接的服務端的IP地址。 `_loop`: 客戶端異步觸發事件機制event_loop句柄。 `_msg_callback`: 當前客戶端處理服務端的回調業務。 `connected`:是否已經成功connect服務端的標致。 **方法**: `tcp_client()`:構造函數,主要是在里面完成基本的套接字初始化及connect操作. `do_connect()`:創建鏈接 `do_read()`:處理鏈接的讀業務。 `do_write()`:處理鏈接的寫業務。 `clean_conn()`:清空鏈接資源。 ### 6.2 創建鏈接 > lars_reactor/src/tcp_client.cpp ```c tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name): _ibuf(4194304), _obuf(4194304) { _sockfd = -1; _msg_callback = NULL; _name = name; _loop = loop; bzero(&_server_addr, sizeof(_server_addr)); _server_addr.sin_family = AF_INET; inet_aton(ip, &_server_addr.sin_addr); _server_addr.sin_port = htons(port); _addrlen = sizeof(_server_addr); this->do_connect(); } ``` ? 這里初始化tcp_client鏈接信息,然后調用`do_connect()`創建鏈接. > lars_reactor/src/tcp_client.cpp ```c //創建鏈接 void tcp_client::do_connect() { if (_sockfd != -1) { close(_sockfd); } //創建套接字 _sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP); if (_sockfd == -1) { fprintf(stderr, "create tcp client socket error\n"); exit(1); } int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen); if (ret == 0) { //鏈接創建成功 connected = true; //注冊讀回調 _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this); //如果寫緩沖去有數據,那么也需要觸發寫回調 if (this->_obuf.length != 0) { _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this); } printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port)); } else { if(errno == EINPROGRESS) { //fd是非阻塞的,可能會出現這個錯誤,但是并不表示鏈接創建失敗 //如果fd是可寫狀態,則為鏈接是創建成功的. fprintf(stderr, "do_connect EINPROGRESS\n"); //讓event_loop去觸發一個創建判斷鏈接業務 用EPOLLOUT事件立刻觸發 _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this); } else { fprintf(stderr, "connection error\n"); exit(1); } } } ``` ### 6.3 有關非阻塞客戶端socket創建鏈接問題 ? 這里轉載一篇文章,是有關非阻塞套接字,connect返回-1,并且errno是`EINPROGRESS`的情況。因為我們的client是采用event_loop形式,socket需要被設置為非阻塞。所以需要針對這個情況做處理。下面是說明。 ? 客戶端測試程序時,由于出現很多客戶端,經過connect成功后,代碼卡在recv系統調用中,后來發現可能是由于socket默認是阻塞模式,所以會令很多客戶端鏈接處于鏈接卻不能傳輸數據狀態。 ? 后來修改socket為非阻塞模式,但在connect的時候,發現返回值為-1,剛開始以為是connect出現錯誤,但在服務器上看到了鏈接是ESTABLISED狀態。證明鏈接是成功的 ? 但為什么會出現返回值是-1呢? 經過查詢資料,以及看stevens的APUE,也發現有這么一說。 ? 當connect在非阻塞模式下,會出現返回`-1`值,錯誤碼是`EINPROGRESS`,但如何判斷connect是聯通的呢?stevens書中說明要在connect后,繼續判斷該socket是否可寫? ? **若可寫,則證明鏈接成功。** ? 如何判斷可寫,有2種方案,一種是select判斷是否可寫,二用poll模型。 select: ```c int CheckConnect(int iSocket) { fd_set rset; FD_ZERO(&rset); FD_SET(iSocket, &rset); timeval tm; tm. tv_sec = 0; tm.tv_usec = 0; if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0) { close(iSocket); return -1; } if (FD_ISSET(iSocket, &rset)) { int err = -1; socklen_t len = sizeof(int); if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 ) { close(iSocket); printf("errno:%d %s\n", errno, strerror(errno)); return -2; } if (err) { errno = err; close(iSocket); return -3; } } return 0; } ``` poll: ```c int CheckConnect(int iSocket) { struct pollfd fd; int ret = 0; socklen_t len = 0; fd.fd = iSocket; fd.events = POLLOUT; while ( poll (&fd, 1, -1) == -1 ) { if( errno != EINTR ){ perror("poll"); return -1; } } len = sizeof(ret); if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) { perror("getsockopt"); return -1; } if(ret != 0) { fprintf (stderr, "socket %d connect failed: %s\n", iSocket, strerror (ret)); return -1; } return 0; } ``` ### 6.3 針對EINPROGRESS的連接創建處理 ? 看上面`do_connect()`的代碼其中一部分: ```c if(errno == EINPROGRESS) { //fd是非阻塞的,可能會出現這個錯誤,但是并不表示鏈接創建失敗 //如果fd是可寫狀態,則為鏈接是創建成功的. fprintf(stderr, "do_connect EINPROGRESS\n"); //讓event_loop去觸發一個創建判斷鏈接業務 用EPOLLOUT事件立刻觸發 _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this); } ``` 這里是又觸發一個寫事件,直接讓程序流程跳轉到`connection_delay()`方法.那么我們需要在里面判斷鏈接是否已經判斷成功,并且做出一定的創建成功之后的業務動作。 > lars_reactor/src/tcp_client.cpp ```c //判斷鏈接是否是創建鏈接,主要是針對非阻塞socket 返回EINPROGRESS錯誤 static void connection_delay(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client*)args; loop->del_io_event(fd); int result = 0; socklen_t result_len = sizeof(result); getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len); if (result == 0) { //鏈接是建立成功的 cli->connected = true; printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); //建立連接成功之后,主動發送send_message const char *msg = "hello lars!"; int msgid = 1; cli->send_message(msg, strlen(msg), msgid); loop->add_io_event(fd, read_callback, EPOLLIN, cli); if (cli->_obuf.length != 0) { //輸出緩沖有數據可寫 loop->add_io_event(fd, write_callback, EPOLLOUT, cli); } } else { //鏈接創建失敗 fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); } } ``` ? 這是一個事件回調,所以用的是static方法而不是成員方法。首先是利用`getsockopt`判斷鏈接是否創建成功,如果成功,那么 我們當前這個版本的客戶端是直接寫死主動調用`send_message()`方法發送給服務端一個`hello lars!`字符串。然后直接交給我們的`read_callback()`方法處理,當然如果寫緩沖有數據,我們也會觸發寫的`write_callback()`方法。 ? 接下來,看看這兩個callback以及send_message是怎么實現的。 **callback** > lars_reactor/src/tcp_client.cpp ```c static void write_callback(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client *)args; cli->do_write(); } static void read_callback(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client *)args; cli->do_read(); } //處理讀業務 int tcp_client::do_read() { //確定已經成功建立連接 assert(connected == true); // 1. 一次性全部讀取出來 //得到緩沖區里有多少字節要被讀取,然后將字節數放入b里面。 int need_read = 0; if (ioctl(_sockfd, FIONREAD, &need_read) == -1) { fprintf(stderr, "ioctl FIONREAD error"); return -1; } //確保_buf可以容納可讀數據 assert(need_read <= _ibuf.capacity - _ibuf.length); int ret; do { ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read); } while(ret == -1 && errno == EINTR); if (ret == 0) { //對端關閉 if (_name != NULL) { printf("%s client: connection close by peer!\n", _name); } else { printf("client: connection close by peer!\n"); } clean_conn(); return -1; } else if (ret == -1) { fprintf(stderr, "client: do_read() , error\n"); clean_conn(); return -1; } assert(ret == need_read); _ibuf.length += ret; //2. 解包 msg_head head; int msgid, length; while (_ibuf.length >= MESSAGE_HEAD_LEN) { memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN); msgid = head.msgid; length = head.msglen; /* if (length + MESSAGE_HEAD_LEN < _ibuf.length) { break; } */ //頭部讀取完畢 _ibuf.pop(MESSAGE_HEAD_LEN); //3. 交給業務函數處理 if (_msg_callback != NULL) { this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); } //數據區域處理完畢 _ibuf.pop(length); } //重置head指針 _ibuf.adjust(); return 0; } //處理寫業務 int tcp_client::do_write() { //數據有長度,切頭部索引是起始位置 assert(_obuf.head == 0 && _obuf.length); int ret; while (_obuf.length) { //寫數據 do { ret = write(_sockfd, _obuf.data, _obuf.length); } while(ret == -1 && errno == EINTR);//非阻塞異常繼續重寫 if (ret > 0) { _obuf.pop(ret); _obuf.adjust(); } else if (ret == -1 && errno != EAGAIN) { fprintf(stderr, "tcp client write \n"); this->clean_conn(); } else { //出錯,不能再繼續寫 break; } } if (_obuf.length == 0) { //已經寫完,刪除寫事件 printf("do write over, del EPOLLOUT\n"); this->_loop->del_io_event(_sockfd, EPOLLOUT); } return 0; } //釋放鏈接資源,重置連接 void tcp_client::clean_conn() { if (_sockfd != -1) { printf("clean conn, del socket!\n"); _loop->del_io_event(_sockfd); close(_sockfd); } connected = false; //重新連接 this->do_connect(); } tcp_client::~tcp_client() { close(_sockfd); } ``` ? 這里是基本的讀數據和寫數據的處理業務實現。我們重點看`do_read()`方法,里面有段代碼: ```c //3. 交給業務函數處理 if (_msg_callback != NULL) { this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); } ``` ? 是將我們從服務端讀取到的代碼,交給了`_msg_callback()`方法來處理的,這個實際上是用戶開發者自己在業務上注冊的回調業務函數。在tcp_client.h中我們已經提供了`set_msg_callback`暴露給開發者注冊使用。 ------ **send_message** > lars_reactor/src/tcp_client.cpp ```c //主動發送message方法 int tcp_client::send_message(const char *data, int msglen, int msgid) { if (connected == false) { fprintf(stderr, "no connected , send message stop!\n"); return -1; } //是否需要添加寫事件觸發 //如果obuf中有數據,沒必要添加,如果沒有數據,添加完數據需要觸發 bool need_add_event = (_obuf.length == 0) ? true:false; if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) { fprintf(stderr, "No more space to Write socket!\n"); return -1; } //封裝消息頭 msg_head head; head.msgid = msgid; head.msglen = msglen; memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN); _obuf.length += MESSAGE_HEAD_LEN; memcpy(_obuf.data + _obuf.length, data, msglen); _obuf.length += msglen; if (need_add_event) { _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this); } return 0; } ``` ? 將發送的數據寫給obuf,然后出發write_callback將obuf的數據傳遞給對方服務端。 ### 6.4 完成Lars Reactor V0.4開發 ? 好了,現在我們框架部分已經完成,接下來我們就要實現一個serverapp 和 一個clientapp來進行測試. 我們創建`example/lars_reactor_0.4`文件夾。 **Makefile** ```makefile CXX=g++ CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated INC=-I../../include LIB=-L../../lib -llreactor -lpthread OBJS = $(addsuffix .o, $(basename $(wildcard *.cc))) all: $(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB) $(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB) clean: -rm -f *.o server client ``` 服務端代碼: > server.cpp ```c #include "tcp_server.h" int main() { event_loop loop; tcp_server server(&loop, "127.0.0.1", 7777); loop.event_process(); return 0; } ``` 客戶端代碼: > client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客戶端業務 void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data) { //得到服務端回執的數據 printf("recv server: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; //創建tcp客戶端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4"); //注冊回調業務 client.set_msg_callback(busi); //開啟事件監聽 loop.event_process(); return 0; } ``` 編譯并分別啟動server 和client 服務端輸出: ```bash $ ./server begin accept get new connection succ! read data: hello lars! server send_message: hello lars!:11, msgid = 1 ``` 客戶端輸出: ```bash $ ./client do_connect EINPROGRESS connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT recv server: [hello lars!] msgid: [1] len: [11] ``` ? 現在客戶端已經成功的發送數據給服務端,并且回顯的數據也直接被客戶端解析,我們的框架到現在就可以做一個基本的客戶端和服務端的完成了,但是還差很多,接下來我們繼續優化。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看