我們仍然從server.c的main函數開始。
當完成fdevent的初始化之后,第一個需要fdevent處理的事情就是將在初始化網絡的過程中得到的監聽fd(socket函數的返回值)注冊到fdevent系統中。
該動作調用的是network_register_fdevents()函數,定義在network.c文件中:
~~~
/**
* 在fd events系統中注冊監聽socket。
* 這個函數在子進程中被調用。
*/
int network_register_fdevents(server * srv)
{
size_t i;
if (-1 == fdevent_reset(srv->ev)){return -1;}
/*
* register fdevents after reset
*/
/* 遍歷所有的監聽fd并將其注冊到fdevent系統中 */
for (i = 0; i < srv->srv_sockets.used; i++)
{
server_socket *srv_socket = srv->srv_sockets.ptr[i];
fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket);
fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);
}
return 0;
}
~~~
在初始化網絡的過程中,調用socket函數之后,將其返回值(監聽fd)保存在server結構體的srv_sockets成員中,這個成員是一個server_socket_array結構體,而server_socket_array結構體是server_socket結構體的指針數組。
server_socket結構體定義如下:
~~~
typedef struct
{
sock_addr addr; //socket fd對應的的地址。
int fd; //socket()函數返回的監聽fd
int fde_ndx; //和fd相同。
buffer *ssl_pemfile;
buffer *ssl_ca_file;
buffer *ssl_cipher_list;
unsigned short ssl_use_sslv2;
unsigned short use_ipv6; //標記是否使用ipv6
unsigned short is_ssl;
buffer *srv_token;
#ifdef USE_OPENSSL
SSL_CTX *ssl_ctx;
#endif
unsigned short is_proxy_ssl;
} server_socket;
~~~
fdevent_register()函數:
~~~
int fdevent_register(fdevents * ev, int fd, fdevent_handler handler, void *ctx)
{
/* 創建一個fdnode的實例,然后對其成員賦值 */
fdnode *fdn;
fdn = fdnode_init();
fdn->handler = handler;
fdn->fd = fd;
fdn->ctx = ctx;
/* 以fd為下標將實例存入fdevents結構體中的fdarray數組中。
* fd作為下標可以將查詢時間變為 O(1)
*/
ev->fdarray[fd] = fdn;
return 0;
}
~~~
第三個參數是一個函數指針,其定義為
~~~
typedef handler_t(*fdevent_handler) (void *srv, void *ctx, int revents)
~~~
這個函數指針對應XXX_handle_fdevent()類型的函數。比如network.c/ network_server_handle_fdevent() ,connections.c/ connection_handle_fdevent()。
這些函數的作用是在fdevent系統檢測到fd有IO事件發生時,處理這些IO事件。
比如,network_server_handle_fdevent()處理監聽fd(socket函數的返回值)發生的IO事件;
connection_handle_fdevent()處理連接fd(accept函數的返回值)發生的IO事件。
除了上面的兩個函數,還有
~~~
stat_cacahe.c/stat_cache_handle_fdevent(),
mod_cgi.c/cgi_handle_fdevent(),
mod_fastcgi.c/fcgi_handle_fdevent(),
mod_proxy.c/ proxy_handle_fdevent()和
mod_scgi.c/scgi_handle_fdevent()等。
~~~
fdevent_event_add函數:
~~~
int fdevent_event_add(fdevents * ev, int *fde_ndx, int fd, int events)
{
int fde = fde_ndx ? *fde_ndx : -1;
if (ev->event_add)
fde = ev->event_add(ev, fde, fd, events)
if (fde_ndx)
*fde_ndx = fde;
return 0;
}
~~~
函數中調用了fdevents結構體中event_add函數指針對應的函數。
我們看看fdevent_linux_sysepoll.c中的fdevent_linux_sysepoll_event_add()函數,這個函數的地址在初始化的時候被賦給fdevents中的event_add指針:
~~~
static int fdevent_linux_sysepoll_event_add(fdevents * ev, int fde_ndx, int fd, int events)
{
struct epoll_event ep;
int add = 0;
if (fde_ndx == -1) //描述符不在epoll的檢測中,增加之。
add = 1;
memset(&ep, 0, sizeof(ep));
ep.events = 0;
/**
* 在ep中設置需要監聽的IO事件。
* EPOLLIN : 描述符可讀。
* EPOLLOUT :描述符可寫。
* 其他的事件還有:EPOLLRDHUP , EPOLLPRI, EPOLLERR, EPOLLHUP, EPOLLET, EPOLLONESHOT等。
*/
if (events & FDEVENT_IN)
ep.events |= EPOLLIN;
if (events & FDEVENT_OUT)
ep.events |= EPOLLOUT;
/*
* EPOLLERR :描述符發生錯誤。
* EPOLLHUP :描述符被掛斷。通常是連接斷開。
*/
ep.events |= EPOLLERR | EPOLLHUP /* | EPOLLET */ ;
ep.data.ptr = NULL;
ep.data.fd = fd;
/*
* EPOLL_CTL_ADD : 增加描述符fd到ev->epoll_fd中,并關聯ep中的事件到fd上。
* EPOLL_CTL_MOD : 修改fd所關聯的事件。
*/
if (0 != epoll_ctl(ev->epoll_fd, add ?EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ep))
{
fprintf(stderr, "%s.%d: epoll_ctl failed: %s, dying\n",__FILE__,__LINE__, strerror(errno));
SEGFAULT();
return 0;
}
return fd;
}
~~~
函數的第四個參數events是一個整型,每一位對應一種IO事件。
在network_register_fdevents函數中,傳給fdevent_event_add()函數的第四個參數是FDEVENT_IN,這是一個宏:
~~~
/*
* 用于標記文件描述符的狀態
*/
#define FDEVENT_IN BV(0) //文件描述符是否可寫
#define FDEVENT_PRI BV(1) //不阻塞的可讀高優先級的數據 poll
#define FDEVENT_OUT BV(2) //文件描述符是否可讀
#define FDEVENT_ERR BV(3) //文件描述符是否出錯
#define FDEVENT_HUP BV(4) //已掛斷 poll
#define FDEVENT_NVAL BV(5) //描述符不引用一打開文件 poll
~~~
其中BV也是一個宏,定義在settings.c文件中:
~~~
#define BV(x) (1 << x)
~~~
其作用就是將一個整數變量第x位置1,其余置0。
通過這些宏,就可以在一個整數中用不同的位表示不同的事件;
這些宏和epoll.h中的枚舉EPOLL_EVENTS對應。
由于當有連接請求時,監聽fd的表現是有數據可讀,因此,只監聽其FDEVENT_IN事件。注冊之后,監聽fd就開始等待連接請求。
~~~
//啟動事件輪詢。底層使用的是IO多路轉接。
if ((n = fdevent_poll(srv->ev, 1000)) > 0)
{
/* n是事件的數量 */
int revents;
int fd_ndx = -1;
/* 逐個處理已經準備好的請求,直到所有的請求處理結束 */
do
{
fdevent_handler handler;
void *context;
handler_t r;
fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //獲得發生了 I/O 事件的文件描述符在 fdarray 中的索引
revents = fdevent_event_get_revent(srv->ev, fd_ndx); //獲得該文件描述符上發生的 I/O 事件類型
fd = fdevent_event_get_fd(srv->ev, fd_ndx); //獲得該文件描述符
handler = fdevent_get_handler(srv->ev, fd); //獲得 I/O 事件處理的回調函數
context = fdevent_get_context(srv->ev, fd); //獲得 I/O 事件處理的上下文環境
/*
* connection_handle_fdevent needs a joblist_append
*/
/**
* 調用回調函數進行I/O事件處理,并傳入相關參數
*/
switch (r = (*handler) (srv, context, revents))
{
case HANDLER_FINISHED:
case HANDLER_GO_ON:
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_WAIT_FOR_FD:
break;
case HANDLER_ERROR:
SEGFAULT();
break;
default:
log_error_write(srv, __FILE__, __LINE__, "d", r);
break;
}
}while (--n > 0);
}
else if (n < 0 && errno != EINTR)
{
log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno));
}
~~~
首先調用fdevent_poll()函數等待IO事件發生,如果沒有IO事件,程序會阻塞在這個函數中。
如果有fd發生了IO事件,則從fdevent_poll函數中返回,返回值是發生了IO事件的fd的數量。
fdevent_poll()函數調用fdevents結構體中的poll,最終調用的是epoll_wait()函數。epoll_wait()函數將發生了IO事件的fd對應的epoll_evet結構體實例存儲在fdevents結構體的epoll_events數組成員中。
fdevent_event_next_fdndx函數返回epoll_events數組中下一個元素的下標,fdevent_event_get_revent函數調用ev->event_get_revent()獲得fd發生的IO事件,最終調用的是:
~~~
static int fdevent_linux_sysepoll_event_get_revent(fdevents * ev, size_t ndx)
{
int events = 0, e;
e = ev->epoll_events[ndx].events;
if (e & EPOLLIN)
events |= FDEVENT_IN;
if (e & EPOLLOUT)
events |= FDEVENT_OUT;
if (e & EPOLLERR)
events |= FDEVENT_ERR;
if (e & EPOLLHUP)
events |= FDEVENT_HUP;
if (e & EPOLLPRI) //有緊急數據到達(帶外數據)
events |= FDEVENT_PRI;
return e;
}
~~~
這個函數就做了一個轉換。
最后,在switch語句中調用fd對應的handler函數處理事件。對于監聽fd,調用的函數為:
~~~
/**
* 這個是監聽socket的IO事件處理函數。
* 主要工作是建立和客戶端的socket連接。只處理讀事件。在處理過程中,
* 每次調用這個函數都試圖一次建立100個連接,這樣可以提高效率。
*/
handler_t network_server_handle_fdevent(void *s, void *context, int revents)
{
server *srv = (server *) s;
server_socket *srv_socket = (server_socket *) context;
connection *con;
int loops = 0;
UNUSED(context);
/*
* 只有fd事件是FDEVENT_IN時,才進行事件處理。
*/
if (revents != FDEVENT_IN)
{
log_error_write(srv, __FILE__, __LINE__, "sdd", "strange event for server socket", srv_socket->fd, revents);
return HANDLER_ERROR;
}
/*
* accept()s at most 100 connections directly we jump out after 100 to give the waiting connections a chance
*一次監聽fd的IO事件,表示有客戶端請求連接,對其的處理就是建立連接。建立連接后并不急著退出函數,
* 而是繼續嘗試建立新連接,直到已經建立了100次連接。這樣可以提高效率。
*/
for (loops = 0; loops < 100 && NULL != (con =connection_accept(srv, srv_socket)); loops++)
{
handler_t r;
//根據當前狀態,改變con的狀態機,并做出相應的動作。
connection_state_machine(srv, con);
switch (r = plugins_call_handle_joblist(srv, con))
{
case HANDLER_FINISHED:
case HANDLER_GO_ON:
break;
default:
log_error_write(srv, __FILE__, __LINE__, "d", r);
break;
}
}
return HANDLER_GO_ON;
}
~~~
監聽fd有IO事件,表示有客戶端請求連接,對其的處理就是建立連接。在這個函數中,建立連接后并不急著退出,而是繼續【嘗試】建立新連接,直到已經建立了100次連接。這樣可以提高效率。
connection_accept()函數接受連接請求并返回一個connection結構體指針。接著對這個連接啟動狀態機。然后把連接加到作業隊列中。
注意,在將監聽fd注冊到fdevent系統時,它被設置成了非阻塞的,因此,如果在調用accept()函數時沒有連接請求,那么accept()函數會直接出錯返回,這樣connection_accept就返回一個NULL,退出了for循環。因此,這里所說的建立100次連接只是”嘗試“而已,并不會在沒有連接的時候阻塞。
至此,fdevent系統對于監聽fd的處理就完成了一個循環。
處理完IO事件以后fd接著在epoll中等待下一次事件。