## 4) 事件觸發event_loop
? 接下來我們要嘗試添加多路IO的處理機制,當然linux的平臺下, 最優的選擇就是使用epoll來做,但是用原生的epoll實際上編程起來擴展性不是很強,那么我們就需要封裝一套IO事件處理機制。
### 4.1 io_event基于IO事件封裝
? 我們首先定義一個IO事件類來包括一個時間需要擁有的基本成員信息.
> lars_reactor/include/event_base.h
```c
#pragma once
/*
* 定義一些IO復用機制或者其他異常觸發機制的事件封裝
*
* */
class event_loop;
//IO事件觸發的回調函數
typedef void io_callback(event_loop *loop, int fd, void *args);
/*
* 封裝一次IO觸發實現
* */
struct io_event
{
io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
int mask; //EPOLLIN EPOLLOUT
io_callback *read_callback; //EPOLLIN事件 觸發的回調
io_callback *write_callback;//EPOLLOUT事件 觸發的回調
void *rcb_args; //read_callback的回調函數參數
void *wcb_args; //write_callback的回調函數參數
};
```
? 一個`io_event`對象應該包含 一個epoll的事件標識`EPOLLIN/EPOLLOUT`,和對應事件的處理函數`read_callback`,`write_callback`。他們都應該是`io_callback`類型。然后對應的函數形參。
### 4.2 event_loop事件循環處理機制
? 接下來我們就要通過event_loop類來實現io_event的基本增刪操作,放在原生的`epoll`堆中。
> lars_reactor/include/event_loop.h
```h
#pragma once
/*
*
* event_loop事件處理機制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include "event_base.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定義指向上面map類型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在監聽的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
class event_loop
{
public:
//構造,初始化epoll堆
event_loop();
//阻塞循環處理事件
void event_process();
//添加一個io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//刪除一個io事件從loop中
void del_io_event(int fd);
//刪除一個io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
private:
int _epfd; //epoll fd
//當前event_loop 監控的fd和對應事件的關系
io_event_map _io_evs;
//當前event_loop 一共哪些fd在監聽
listen_fd_set listen_fds;
//一次性最大處理的事件
struct epoll_event _fired_evs[MAXEVENTS];
};
```
**屬性**:
`_epfd`:是epoll原生堆的fd。
`_io_evs`:是一個hash_map對象,主要是方便我們管理`fd`<—>`io_event`的對應關系,方便我們來查找和處理。
`_listen_fds`:記錄目前一共有多少個fd正在本我們的`event_loop`機制所監控.
`_fried_evs`:已經通過epoll_wait返回的被激活需要上層處理的fd集合.
**方法**:
`event_loop()`:構造函數,主要初始化epoll.
`event_process()`:永久阻塞,等待觸發的事件,去調用對應的函數callback方法。
`add_io_event()`:綁定一個fd和一個`io_event`的關系,并添加對應的事件到`event_loop`中。
`del_io_event()`:從`event_loop`刪除該事件。
? 具體實現方法如下:
> lars_reactor/src/event_loop.cpp
```c
#include "event_loop.h"
#include <assert.h>
//構造,初始化epoll堆
event_loop::event_loop()
{
//flag=0 等價于epll_craete
_epfd = epoll_create1(0);
if (_epfd == -1) {
fprintf(stderr, "epoll_create error\n");
exit(1);
}
}
//阻塞循環處理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//通過觸發的fd找到對應的綁定事件
ev_it = _io_evs.find(_fired_evs[i].data.fd);
assert(ev_it != _io_evs.end());
io_event *ev = &(ev_it->second);
if (_fired_evs[i].events & EPOLLIN) {
//讀事件,掉讀回調函數
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events & EPOLLOUT) {
//寫事件,掉寫回調函數
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
//水平觸發未處理,可能會出現HUP事件,正常處理讀寫,沒有則清空
if (ev->read_callback != NULL) {
void *args = ev->rcb_args;
ev->read_callback(this, _fired_evs[i].data.fd, args);
}
else if (ev->write_callback != NULL) {
void *args = ev->wcb_args;
ev->write_callback(this, _fired_evs[i].data.fd, args);
}
else {
//刪除
fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
this->del_io_event(_fired_evs[i].data.fd);
}
}
}
}
}
/*
* 這里我們處理的事件機制是
* 如果EPOLLIN 在mask中, EPOLLOUT就不允許在mask中
* 如果EPOLLOUT 在mask中, EPOLLIN就不允許在mask中
* 如果想注冊EPOLLIN|EPOLLOUT的事件, 那么就調用add_io_event() 方法兩次來注冊。
* */
//添加一個io事件到loop中
void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
{
int final_mask;
int op;
//1 找到當前fd是否已經有事件
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
//2 如果沒有操作動作就是ADD
//沒有找到
final_mask = mask;
op = EPOLL_CTL_ADD;
}
else {
//3 如果有操作董酒是MOD
//添加事件標識位
final_mask = it->second.mask | mask;
op = EPOLL_CTL_MOD;
}
//4 注冊回調函數
if (mask & EPOLLIN) {
//讀事件回調函數注冊
_io_evs[fd].read_callback = proc;
_io_evs[fd].rcb_args = args;
}
else if (mask & EPOLLOUT) {
_io_evs[fd].write_callback = proc;
_io_evs[fd].wcb_args = args;
}
//5 epoll_ctl添加到epoll堆里
_io_evs[fd].mask = final_mask;
//創建原生epoll事件
struct epoll_event event;
event.events = final_mask;
event.data.fd = fd;
if (epoll_ctl(_epfd, op, fd, &event) == -1) {
fprintf(stderr, "epoll ctl %d error\n", fd);
return;
}
//6 將fd添加到監聽集合中
listen_fds.insert(fd);
}
//刪除一個io事件從loop中
void event_loop::del_io_event(int fd)
{
//將事件從_io_evs刪除
_io_evs.erase(fd);
//將fd從監聽集合中刪除
listen_fds.erase(fd);
//將fd從epoll堆刪除
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
}
//刪除一個io事件的EPOLLIN/EPOLLOUT
void event_loop::del_io_event(int fd, int mask)
{
//如果沒有該事件,直接返回
io_event_map_it it = _io_evs.find(fd);
if (it == _io_evs.end()) {
return ;
}
int &o_mask = it->second.mask;
//修正mask
o_mask = o_mask & (~mask);
if (o_mask == 0) {
//如果修正之后 mask為0,則刪除
this->del_io_event(fd);
}
else {
//如果修正之后,mask非0,則修改
struct epoll_event event;
event.events = o_mask;
event.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
}
}
```
? 這里`del_io_event`提供兩個重載,一個是直接刪除事件,一個是修正事件。
### 4.3 Reactor集成event_loop機制
? 好了,那么接下來,就讓讓Lars Reactor框架集成`event_loop`機制。
首先簡單修正一個`tcp_server.cpp`文件,對之前的`do_accept()`的調度時機做一下修正。
```
1. 在`tcp_server`成員新增`event_loop`成員。
```
> lars_reactor/include/tcp_server.h
```h
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
class tcp_server
{
public:
//server的構造函數
tcp_server(event_loop* loop, const char *ip, uint16_t port);
//開始提供創建鏈接服務
void do_accept();
//鏈接對象釋放的析構
~tcp_server();
private:
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客戶端鏈接地址
socklen_t _addrlen; //客戶端鏈接地址長度
// ============= 新增 ======================
//event_loop epoll事件機制
event_loop* _loop;
// ============= 新增 ======================
};
```
2. 構造函數在創建完listen fd之后,添加accept事件。
> lars_reactor/src/tcp_server.cpp
```c
//listen fd 客戶端有新鏈接請求過來的回調函數
void accept_callback(event_loop *loop, int fd, void *args)
{
tcp_server *server = (tcp_server*)args;
server->do_accept();
}
//server的構造函數
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
bzero(&_connaddr, sizeof(_connaddr));
//忽略一些信號 SIGHUP, SIGPIPE
//SIGPIPE:如果客戶端關閉,服務端再次write就會產生
//SIGHUP:如果terminal關閉,會給當前進程發送該信號
if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGHUP\n");
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGPIPE\n");
}
//1. 創建socket
_sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "tcp_server::socket()\n");
exit(1);
}
//2 初始化地址
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_aton(ip, &server_addr.sin_addr);
server_addr.sin_port = htons(port);
//2-1可以多次監聽,設置REUSE屬性
int op = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
}
//3 綁定端口
if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
fprintf(stderr, "bind error\n");
exit(1);
}
//4 監聽ip端口
if (listen(_sockfd, 500) == -1) {
fprintf(stderr, "listen error\n");
exit(1);
}
// ============= 新增 ======================
//5 將_sockfd添加到event_loop中
_loop = loop;
//6 注冊_socket讀事件-->accept處理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
// ============= 新增 ======================
}
```
3. 修改do_accept()方法
> lars_reactor/src/tcp_server.cpp
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "reactor_buf.h"
//臨時的收發消息
struct message{
char data[m4K];
char len;
};
struct message msg;
void server_rd_callback(event_loop *loop, int fd, void *args);
void server_wt_callback(event_loop *loop, int fd, void *args);
//...省略其他代碼
//...省略其他代碼
//server read_callback
void server_rd_callback(event_loop *loop, int fd, void *args)
{
int ret = 0;
struct message *msg = (struct message*)args;
input_buf ibuf;
ret = ibuf.read_data(fd);
if (ret == -1) {
fprintf(stderr, "ibuf read_data error\n");
//刪除事件
loop->del_io_event(fd);
//對端關閉
close(fd);
return;
}
if (ret == 0) {
//刪除事件
loop->del_io_event(fd);
//對端關閉
close(fd);
return ;
}
printf("ibuf.length() = %d\n", ibuf.length());
//將讀到的數據放在msg中
msg->len = ibuf.length();
bzero(msg->data, msg->len);
memcpy(msg->data, ibuf.data(), msg->len);
ibuf.pop(msg->len);
ibuf.adjust();
printf("recv data = %s\n", msg->data);
//刪除讀事件,添加寫事件
loop->del_io_event(fd, EPOLLIN);
loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);
}
//server write_callback
void server_wt_callback(event_loop *loop, int fd, void *args)
{
struct message *msg = (struct message*)args;
output_buf obuf;
//回顯數據
obuf.send_data(msg->data, msg->len);
while(obuf.length()) {
int write_ret = obuf.write2fd(fd);
if (write_ret == -1) {
fprintf(stderr, "write connfd error\n");
return;
}
else if(write_ret == 0) {
//不是錯誤,表示此時不可寫
break;
}
}
//刪除寫事件,添加讀事件
loop->del_io_event(fd, EPOLLOUT);
loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);
}
//...省略其他代碼
//...省略其他代碼
//開始提供創建鏈接服務
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過多,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 新增 ======================
this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);
break;
// ============= 新增 ======================
}
}
}
//...省略其他代碼
//...省略其他代碼
```
### 4.4 完成Lars Reactor V0.3開發
? 我們將lars_reactor/example/lars_reactor_0.2的代碼復制一份到 lars_reactor/example/lars_reactor_0.3中。
> lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp
```c
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
```
編譯。
啟動服務器
```bash
$ ./lars_reactor
```
分別啟動2個客戶端
client1
```bash
$ nc 127.0.0.1 7777
hello Iam client1
hello Iam client1 回顯
```
client2
```bash
$ nc 127.0.0.1 7777
hello Iam client2
hello Iam client2 回顯
```
服務端打印
```bash
$ ./lars_reactor
begin accept
ibuf.length() = 18
recv data = hello Iam client1
begin accept
ibuf.length() = 18
recv data = hello Iam client2
```
目前我們已經成功將`event_loop`機制加入到reactor中了,接下來繼續添加功能。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本