<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 4) 事件觸發event_loop ? 接下來我們要嘗試添加多路IO的處理機制,當然linux的平臺下, 最優的選擇就是使用epoll來做,但是用原生的epoll實際上編程起來擴展性不是很強,那么我們就需要封裝一套IO事件處理機制。 ### 4.1 io_event基于IO事件封裝 ? 我們首先定義一個IO事件類來包括一個時間需要擁有的基本成員信息. > lars_reactor/include/event_base.h ```c #pragma once /* * 定義一些IO復用機制或者其他異常觸發機制的事件封裝 * * */ class event_loop; //IO事件觸發的回調函數 typedef void io_callback(event_loop *loop, int fd, void *args); /* * 封裝一次IO觸發實現 * */ struct io_event { io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {} int mask; //EPOLLIN EPOLLOUT io_callback *read_callback; //EPOLLIN事件 觸發的回調 io_callback *write_callback;//EPOLLOUT事件 觸發的回調 void *rcb_args; //read_callback的回調函數參數 void *wcb_args; //write_callback的回調函數參數 }; ``` ? 一個`io_event`對象應該包含 一個epoll的事件標識`EPOLLIN/EPOLLOUT`,和對應事件的處理函數`read_callback`,`write_callback`。他們都應該是`io_callback`類型。然后對應的函數形參。 ### 4.2 event_loop事件循環處理機制 ? 接下來我們就要通過event_loop類來實現io_event的基本增刪操作,放在原生的`epoll`堆中。 > lars_reactor/include/event_loop.h ```h #pragma once /* * * event_loop事件處理機制 * * */ #include <sys/epoll.h> #include <ext/hash_map> #include <ext/hash_set> #include "event_base.h" #define MAXEVENTS 10 // map: fd->io_event typedef __gnu_cxx::hash_map<int, io_event> io_event_map; //定義指向上面map類型的迭代器 typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it; //全部正在監聽的fd集合 typedef __gnu_cxx::hash_set<int> listen_fd_set; class event_loop { public: //構造,初始化epoll堆 event_loop(); //阻塞循環處理事件 void event_process(); //添加一個io事件到loop中 void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL); //刪除一個io事件從loop中 void del_io_event(int fd); //刪除一個io事件的EPOLLIN/EPOLLOUT void del_io_event(int fd, int mask); private: int _epfd; //epoll fd //當前event_loop 監控的fd和對應事件的關系 io_event_map _io_evs; //當前event_loop 一共哪些fd在監聽 listen_fd_set listen_fds; //一次性最大處理的事件 struct epoll_event _fired_evs[MAXEVENTS]; }; ``` **屬性**: `_epfd`:是epoll原生堆的fd。 `_io_evs`:是一個hash_map對象,主要是方便我們管理`fd`<—>`io_event`的對應關系,方便我們來查找和處理。 `_listen_fds`:記錄目前一共有多少個fd正在本我們的`event_loop`機制所監控. `_fried_evs`:已經通過epoll_wait返回的被激活需要上層處理的fd集合. **方法**: `event_loop()`:構造函數,主要初始化epoll. `event_process()`:永久阻塞,等待觸發的事件,去調用對應的函數callback方法。 `add_io_event()`:綁定一個fd和一個`io_event`的關系,并添加對應的事件到`event_loop`中。 `del_io_event()`:從`event_loop`刪除該事件。 ? 具體實現方法如下: > lars_reactor/src/event_loop.cpp ```c #include "event_loop.h" #include <assert.h> //構造,初始化epoll堆 event_loop::event_loop() { //flag=0 等價于epll_craete _epfd = epoll_create1(0); if (_epfd == -1) { fprintf(stderr, "epoll_create error\n"); exit(1); } } //阻塞循環處理事件 void event_loop::event_process() { while (true) { io_event_map_it ev_it; int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10); for (int i = 0; i < nfds; i++) { //通過觸發的fd找到對應的綁定事件 ev_it = _io_evs.find(_fired_evs[i].data.fd); assert(ev_it != _io_evs.end()); io_event *ev = &(ev_it->second); if (_fired_evs[i].events & EPOLLIN) { //讀事件,掉讀回調函數 void *args = ev->rcb_args; ev->read_callback(this, _fired_evs[i].data.fd, args); } else if (_fired_evs[i].events & EPOLLOUT) { //寫事件,掉寫回調函數 void *args = ev->wcb_args; ev->write_callback(this, _fired_evs[i].data.fd, args); } else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) { //水平觸發未處理,可能會出現HUP事件,正常處理讀寫,沒有則清空 if (ev->read_callback != NULL) { void *args = ev->rcb_args; ev->read_callback(this, _fired_evs[i].data.fd, args); } else if (ev->write_callback != NULL) { void *args = ev->wcb_args; ev->write_callback(this, _fired_evs[i].data.fd, args); } else { //刪除 fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd); this->del_io_event(_fired_evs[i].data.fd); } } } } } /* * 這里我們處理的事件機制是 * 如果EPOLLIN 在mask中, EPOLLOUT就不允許在mask中 * 如果EPOLLOUT 在mask中, EPOLLIN就不允許在mask中 * 如果想注冊EPOLLIN|EPOLLOUT的事件, 那么就調用add_io_event() 方法兩次來注冊。 * */ //添加一個io事件到loop中 void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args) { int final_mask; int op; //1 找到當前fd是否已經有事件 io_event_map_it it = _io_evs.find(fd); if (it == _io_evs.end()) { //2 如果沒有操作動作就是ADD //沒有找到 final_mask = mask; op = EPOLL_CTL_ADD; } else { //3 如果有操作董酒是MOD //添加事件標識位 final_mask = it->second.mask | mask; op = EPOLL_CTL_MOD; } //4 注冊回調函數 if (mask & EPOLLIN) { //讀事件回調函數注冊 _io_evs[fd].read_callback = proc; _io_evs[fd].rcb_args = args; } else if (mask & EPOLLOUT) { _io_evs[fd].write_callback = proc; _io_evs[fd].wcb_args = args; } //5 epoll_ctl添加到epoll堆里 _io_evs[fd].mask = final_mask; //創建原生epoll事件 struct epoll_event event; event.events = final_mask; event.data.fd = fd; if (epoll_ctl(_epfd, op, fd, &event) == -1) { fprintf(stderr, "epoll ctl %d error\n", fd); return; } //6 將fd添加到監聽集合中 listen_fds.insert(fd); } //刪除一個io事件從loop中 void event_loop::del_io_event(int fd) { //將事件從_io_evs刪除 _io_evs.erase(fd); //將fd從監聽集合中刪除 listen_fds.erase(fd); //將fd從epoll堆刪除 epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); } //刪除一個io事件的EPOLLIN/EPOLLOUT void event_loop::del_io_event(int fd, int mask) { //如果沒有該事件,直接返回 io_event_map_it it = _io_evs.find(fd); if (it == _io_evs.end()) { return ; } int &o_mask = it->second.mask; //修正mask o_mask = o_mask & (~mask); if (o_mask == 0) { //如果修正之后 mask為0,則刪除 this->del_io_event(fd); } else { //如果修正之后,mask非0,則修改 struct epoll_event event; event.events = o_mask; event.data.fd = fd; epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event); } } ``` ? 這里`del_io_event`提供兩個重載,一個是直接刪除事件,一個是修正事件。 ### 4.3 Reactor集成event_loop機制 ? 好了,那么接下來,就讓讓Lars Reactor框架集成`event_loop`機制。 首先簡單修正一個`tcp_server.cpp`文件,對之前的`do_accept()`的調度時機做一下修正。 ``` 1. 在`tcp_server`成員新增`event_loop`成員。 ``` > lars_reactor/include/tcp_server.h ```h #pragma once #include <netinet/in.h> #include "event_loop.h" class tcp_server { public: //server的構造函數 tcp_server(event_loop* loop, const char *ip, uint16_t port); //開始提供創建鏈接服務 void do_accept(); //鏈接對象釋放的析構 ~tcp_server(); private: int _sockfd; //套接字 struct sockaddr_in _connaddr; //客戶端鏈接地址 socklen_t _addrlen; //客戶端鏈接地址長度 // ============= 新增 ====================== //event_loop epoll事件機制 event_loop* _loop; // ============= 新增 ====================== }; ``` 2. 構造函數在創建完listen fd之后,添加accept事件。 > lars_reactor/src/tcp_server.cpp ```c //listen fd 客戶端有新鏈接請求過來的回調函數 void accept_callback(event_loop *loop, int fd, void *args) { tcp_server *server = (tcp_server*)args; server->do_accept(); } //server的構造函數 tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port) { bzero(&_connaddr, sizeof(_connaddr)); //忽略一些信號 SIGHUP, SIGPIPE //SIGPIPE:如果客戶端關閉,服務端再次write就會產生 //SIGHUP:如果terminal關閉,會給當前進程發送該信號 if (signal(SIGHUP, SIG_IGN) == SIG_ERR) { fprintf(stderr, "signal ignore SIGHUP\n"); } if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { fprintf(stderr, "signal ignore SIGPIPE\n"); } //1. 創建socket _sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP); if (_sockfd == -1) { fprintf(stderr, "tcp_server::socket()\n"); exit(1); } //2 初始化地址 struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; inet_aton(ip, &server_addr.sin_addr); server_addr.sin_port = htons(port); //2-1可以多次監聽,設置REUSE屬性 int op = 1; if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) { fprintf(stderr, "setsocketopt SO_REUSEADDR\n"); } //3 綁定端口 if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "bind error\n"); exit(1); } //4 監聽ip端口 if (listen(_sockfd, 500) == -1) { fprintf(stderr, "listen error\n"); exit(1); } // ============= 新增 ====================== //5 將_sockfd添加到event_loop中 _loop = loop; //6 注冊_socket讀事件-->accept處理 _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this); // ============= 新增 ====================== } ``` 3. 修改do_accept()方法 > lars_reactor/src/tcp_server.cpp ```c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <unistd.h> #include <signal.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h> #include "tcp_server.h" #include "reactor_buf.h" //臨時的收發消息 struct message{ char data[m4K]; char len; }; struct message msg; void server_rd_callback(event_loop *loop, int fd, void *args); void server_wt_callback(event_loop *loop, int fd, void *args); //...省略其他代碼 //...省略其他代碼 //server read_callback void server_rd_callback(event_loop *loop, int fd, void *args) { int ret = 0; struct message *msg = (struct message*)args; input_buf ibuf; ret = ibuf.read_data(fd); if (ret == -1) { fprintf(stderr, "ibuf read_data error\n"); //刪除事件 loop->del_io_event(fd); //對端關閉 close(fd); return; } if (ret == 0) { //刪除事件 loop->del_io_event(fd); //對端關閉 close(fd); return ; } printf("ibuf.length() = %d\n", ibuf.length()); //將讀到的數據放在msg中 msg->len = ibuf.length(); bzero(msg->data, msg->len); memcpy(msg->data, ibuf.data(), msg->len); ibuf.pop(msg->len); ibuf.adjust(); printf("recv data = %s\n", msg->data); //刪除讀事件,添加寫事件 loop->del_io_event(fd, EPOLLIN); loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg); } //server write_callback void server_wt_callback(event_loop *loop, int fd, void *args) { struct message *msg = (struct message*)args; output_buf obuf; //回顯數據 obuf.send_data(msg->data, msg->len); while(obuf.length()) { int write_ret = obuf.write2fd(fd); if (write_ret == -1) { fprintf(stderr, "write connfd error\n"); return; } else if(write_ret == 0) { //不是錯誤,表示此時不可寫 break; } } //刪除寫事件,添加讀事件 loop->del_io_event(fd, EPOLLOUT); loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg); } //...省略其他代碼 //...省略其他代碼 //開始提供創建鏈接服務 void tcp_server::do_accept() { int connfd; while(true) { //accept與客戶端創建鏈接 printf("begin accept\n"); connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen); if (connfd == -1) { if (errno == EINTR) { fprintf(stderr, "accept errno=EINTR\n"); continue; } else if (errno == EMFILE) { //建立鏈接過多,資源不夠 fprintf(stderr, "accept errno=EMFILE\n"); } else if (errno == EAGAIN) { fprintf(stderr, "accept errno=EAGAIN\n"); break; } else { fprintf(stderr, "accept error"); exit(1); } } else { //accept succ! // ============= 新增 ====================== this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg); break; // ============= 新增 ====================== } } } //...省略其他代碼 //...省略其他代碼 ``` ### 4.4 完成Lars Reactor V0.3開發 ? 我們將lars_reactor/example/lars_reactor_0.2的代碼復制一份到 lars_reactor/example/lars_reactor_0.3中。 > lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp ```c #include "tcp_server.h" int main() { event_loop loop; tcp_server server(&loop, "127.0.0.1", 7777); loop.event_process(); return 0; } ``` 編譯。 啟動服務器 ```bash $ ./lars_reactor ``` 分別啟動2個客戶端 client1 ```bash $ nc 127.0.0.1 7777 hello Iam client1 hello Iam client1 回顯 ``` client2 ```bash $ nc 127.0.0.1 7777 hello Iam client2 hello Iam client2 回顯 ``` 服務端打印 ```bash $ ./lars_reactor begin accept ibuf.length() = 18 recv data = hello Iam client1 begin accept ibuf.length() = 18 recv data = hello Iam client2 ``` 目前我們已經成功將`event_loop`機制加入到reactor中了,接下來繼續添加功能。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看