#### ngx_http_write_filter_module分析[](http://tengine.taobao.org/book/chapter_12.html#ngx-http-write-filter-module "永久鏈接至標題")
ngx_http_write_filter_module是最后一個body filter,可以看到它的注冊函數的特殊性:
[](http:// "點擊提交Issue,反饋你的意見...")
static ngx_int_t
ngx_http_write_filter_init(ngx_conf_t *cf)
{
ngx_http_top_body_filter = ngx_http_write_filter;
return NGX_OK;
}
ngx_http_write_filter_module是第一個注冊body filter的模塊,于是它也是最后一個執行的body filter模塊。
直接來看ngx_http_write_filter,下面的代碼中去掉了一些調試代碼:
[](http:// "點擊提交Issue,反饋你的意見...")
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
off_t size, sent, nsent, limit;
ngx_uint_t last, flush;
ngx_msec_t delay;
ngx_chain_t *cl, *ln, **ll, *chain;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
if (c->error) {
return NGX_ERROR;
}
size = 0;
flush = 0;
last = 0;
ll = &r->out;
/* find the size, the flush point and the last link of the saved chain */
for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
return NGX_ERROR;
}
#endif
size += ngx_buf_size(cl->buf);
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
/* add the new chain to the existent one */
for (ln = in; ln; ln = ln->next) {
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ln->buf;
*ll = cl;
ll = &cl->next;
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
return NGX_ERROR;
}
#endif
size += ngx_buf_size(cl->buf);
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
*ll = NULL;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/*
* avoid the output if there are no last buf, no flush point,
* there are the incoming bufs and the size of all bufs
* is smaller than "postpone_output" directive
*/
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
return NGX_OK;
}
/* 如果請求由于被限速而必須延遲發送時,設置一個標識后退出 */
if (c->write->delayed) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
/* 如果buffer總大小為0,而且當前連接之前沒有由于底層發送接口的原因延遲,
則檢查是否有特殊標記 */
if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
/* last_buf標記,表示請求體已經發送結束 */
if (last) {
r->out = NULL;
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
/* flush生效,而且又沒有實際數據,則清空當前的未發送隊列 */
if (flush) {
do {
r->out = r->out->next;
} while (r->out);
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
return NGX_ERROR;
}
/* 請求有速率限制,則計算當前可以發送的大小 */
if (r->limit_rate) {
limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- (c->sent - clcf->limit_rate_after);
if (limit <= 0) {
c->write->delayed = 1;
ngx_add_timer(c->write,
(ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
if (clcf->sendfile_max_chunk
&& (off_t) clcf->sendfile_max_chunk < limit)
{
limit = clcf->sendfile_max_chunk;
}
} else {
limit = clcf->sendfile_max_chunk;
}
sent = c->sent;
/* 發送數據 */
chain = c->send_chain(c, r->out, limit);
if (chain == NGX_CHAIN_ERROR) {
c->error = 1;
return NGX_ERROR;
}
/* 更新限速相關的信息 */
if (r->limit_rate) {
nsent = c->sent;
if (clcf->limit_rate_after) {
sent -= clcf->limit_rate_after;
if (sent < 0) {
sent = 0;
}
nsent -= clcf->limit_rate_after;
if (nsent < 0) {
nsent = 0;
}
}
delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate);
if (delay > 0) {
limit = 0;
c->write->delayed = 1;
ngx_add_timer(c->write, delay);
}
}
if (limit
&& c->write->ready
&& c->sent - sent >= limit - (off_t) (2 * ngx_pagesize))
{
c->write->delayed = 1;
ngx_add_timer(c->write, 1);
}
/* 更新輸出鏈,釋放已經發送的節點 */
for (cl = r->out; cl && cl != chain; /* void */) {
ln = cl;
cl = cl->next;
ngx_free_chain(r->pool, ln);
}
r->out = chain;
/* 如果數據未發送完畢,則設置一個標記 */
if (chain) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
/* 如果由于底層發送接口導致數據未發送完全,且當前請求沒有其他數據需要發送,
此時要返回NGX_AGAIN,表示還有數據未發送 */
if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
return NGX_AGAIN;
}
return NGX_OK;
}
Nginx將待發送的chain鏈表保存在r->out,上面的函數先檢查之前未發送完的鏈表中是否有flush,recycled以及last_buf標識,并計算所有buffer的大小,接著對新輸入的chain鏈表做同樣的事情,并將新鏈表加到r->out的隊尾。
如果沒有輸出鏈表中沒有被標識為最后一塊buffer的節點,而且沒有需要flush或者急著回收的buffer,并且當前隊列中buffer總大小不夠postpone_output指令設置的大小(默認為1460字節)時,函數會直接返回。
ngx_http_write_filter會調用c->send_chain往客戶端發送數據,c->send_chain的取值在不同操作系統,編譯選項以及協議下(https下用的是ngx_ssl_send_chain)會取不同的函數,典型的linux操作系統下,它的取值為ngx_linux_sendfile_chain,也就是最終會調用這個函數來發送數據。它的函數原型為:
[](http:// "點擊提交Issue,反饋你的意見...")
ngx_chain_t *
ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
第一個參數是當前的連接,第二個參數是所需要發送的chain,第三個參數是所能發送的最大值。
首先看一下這個函數定義的一些重要局部變量:
send表示將要發送的buf已經已經發送的大小;
sent表示已經發送的buf的大小;
prev_send表示上一次發送的大小,也就是已經發送的buf的大小;
fprev 和prev-send類似,只不過是file類型的;
complete表示是否buf被完全發送了,也就是sent是否等于send - prev_send;
header表示需要是用writev來發送的buf,也就是only in memory的buf;
struct iovec?[*](http://tengine.taobao.org/book/chapter_12.html#id16)iov, headers[NGX_HEADERS] 這個主要是用于sendfile和writev的參數,這里注意上面header數組保存的就是iovec。
下面看函數開頭的一些初始化代碼:
[](http:// "點擊提交Issue,反饋你的意見...")
wev = c->write;
if (!wev->ready) {
return in;
}
/* the maximum limit size is 2G-1 - the page size */
if (limit == 0 || limit > (off_t) (NGX_SENDFILE_LIMIT - ngx_pagesize)) {
limit = NGX_SENDFILE_LIMIT - ngx_pagesize;
}
send = 0;
/* 設置header,也就是in memory的數組 */
header.elts = headers;
header.size = sizeof(struct iovec);
header.nalloc = NGX_HEADERS;
header.pool = c->pool;
下面這段代碼就是處理in memory的部分,然后將buf放入對應的iovec數組,處理核心思想就是合并內存連續并相鄰的buf(不管是in memory還是in file):
[](http:// "點擊提交Issue,反饋你的意見...")
for (cl = in; cl && send < limit; cl = cl->next) {
if (ngx_buf_special(cl->buf)) {
continue;
}
/* 如果既不在內存中,又不在文件中,則返回錯誤 */
if (!ngx_buf_in_memory(cl->buf) && !cl->buf->in_file) {
return NGX_CHAIN_ERROR;
}
/* 如果不只是在buf中,這是因為有時in file的buf可能需要內存中也有拷貝,
如果一個buf同時in memoey和in file的話,Nginx會把它當做in file來處理 */
if (!ngx_buf_in_memory_only(cl->buf)) {
break;
}
/* 得到buf的大小 */
size = cl->buf->last - cl->buf->pos;
/* 大于limit的話修改為size */
if (send + size > limit) {
size = limit - send;
}
/* 如果prev等于pos,則說明當前的buf的數據和前一個buf的數據是連續的 */
if (prev == cl->buf->pos) {
iov->iov_len += (size_t) size;
} else {
if (header.nelts >= IOV_MAX) {
break;
}
/* 否則說明是不同的buf,因此增加一個iovc */
iov = ngx_array_push(&header);
if (iov == NULL) {
return NGX_CHAIN_ERROR;
}
iov->iov_base = (void *) cl->buf->pos;
iov->iov_len = (size_t) size;
}
/* 這里可以看到prev保存了當前buf的結尾 */
prev = cl->buf->pos + (size_t) size;
/* 更新發送的大小 */
send += size;
}
然后是in file的處理,這里比較核心的一個判斷就是fprev == cl->buf->file_pos,和上面的in memory類似,fprev保存的就是上一次處理的buf的尾部。這里如果這兩個相等,那就說明當前的兩個buf是連續的(文件連續):
[](http:// "點擊提交Issue,反饋你的意見...")
/* 如果header的大小不為0則說明前面有需要發送的buf,
并且數據大小已經超過限制則跳過in file處理 */
if (header.nelts == 0 && cl && cl->buf->in_file && send < limit) {
/* 得到file
file = cl->buf;
/* 開始合并 */
do {
/* 得到大小 */
size = cl->buf->file_last - cl->buf->file_pos;
/* 如果太大則進行對齊處理 */
if (send + size > limit) {
size = limit - send;
aligned = (cl->buf->file_pos + size + ngx_pagesize - 1)
& ~((off_t) ngx_pagesize - 1);
if (aligned <= cl->buf->file_last) {
size = aligned - cl->buf->file_pos;
}
}
/* 設置file_size */
file_size += (size_t) size;
/* 設置需要發送的大小 */
send += size;
/* 和上面的in memory處理一樣就是保存這次的last */
fprev = cl->buf->file_pos + size;
cl = cl->next;
} while (cl
&& cl->buf->in_file
&& send < limit
&& file->file->fd == cl->buf->file->fd
&& fprev == cl->buf->file_pos);
}
然后就是發送部分,這里in file使用sendfile,in memory使用writev。處理邏輯比較簡單,就是發送后判斷發送成功的大小
[](http:// "點擊提交Issue,反饋你的意見...")
if (file) {
#if 1
if (file_size == 0) {
ngx_debug_point();
return NGX_CHAIN_ERROR;
}
#endif
#if (NGX_HAVE_SENDFILE64)
offset = file->file_pos;
#else
offset = (int32_t) file->file_pos;
#endif
/* 數據在文件中則調用sendfile發送數據 */
rc = sendfile(c->fd, file->file->fd, &offset, file_size);
...
/* 得到發送成功的字節數 */
sent = rc > 0 ? rc : 0;
} else {
/* 數據在內存中則調用writev發送數據 */
rc = writev(c->fd, header.elts, header.nelts);
...
/* 得到發送成功的字節數 */
sent = rc > 0 ? rc : 0;
}
接下來就是需要根據發送成功的字節數來更新chain:
[](http:// "點擊提交Issue,反饋你的意見...")
/* 如果send - prev_send == sent則說明該發送的都發完了 */
if (send - prev_send == sent) {
complete = 1;
}
/* 更新congnect的sent域 */
c->sent += sent;
/* 開始重新遍歷chain,這里是為了防止沒有發送完全的情況,
此時我們就需要切割buf了 */
for (cl = in; cl; cl = cl->next) {
if (ngx_buf_special(cl->buf)) {
continue;
}
if (sent == 0) {
break;
}
/* 得到buf size */
size = ngx_buf_size(cl->buf);
/* 如果大于當前的size,則說明這個buf的數據已經被完全發送完畢了,
因此更新它的域 */
if (sent >= size){
/* 更新sent域 */
sent -= size;
/* 如果在內存則更新pos */
if (ngx_buf_in_memory(cl->buf)) {
cl->buf->pos = cl->buf->last;
}
/* 如果在file中則更顯file_pos */
if (cl->buf->in_file) {
cl->buf->file_pos = cl->buf->file_last;
}
continue;
}
/* 到這里說明當前的buf只有一部分被發送出去了,因此只需要修改指針。
以便于下次發送 */
if (ngx_buf_in_memory(cl->buf)) {
cl->buf->pos += (size_t) sent;
}
/* 同上 */
if (cl->buf->in_file) {
cl->buf->file_pos += sent;
}
break;
}
最后一部分是一些是否退出循環的判斷。這里要注意,Nginx中如果發送未完全的話,將會直接返回,返回的就是沒有發送完畢的chain,它的buf也已經被更新。然后Nginx返回去處理其他的事情,等待可寫之后再次發送未發送完的數據:
[](http:// "點擊提交Issue,反饋你的意見...")
if (eintr) {
continue;
}
/* 如果未完成,則設置wev->ready為0后返回 */
if (!complete) {
wev->ready = 0;
return cl;
}
/* 發送數據超過限制,或沒有數據了 */
if (send >= limit || cl == NULL) {
return cl;
}
/* 更新in,也就是開始處理下一個chain */
in = cl;
- 上篇:nginx模塊開發篇
- nginx平臺初探
- 初探nginx架構
- nginx基礎概念
- connection
- request
- keepalive
- pipe
- lingering_close
- 基本數據結構
- ngx_str_t
- ngx_pool_t
- ngx_array_t
- ngx_hash_t
- ngx_hash_wildcard_t
- ngx_hash_combined_t
- ngx_hash_keys_arrays_t
- ngx_chain_t
- ngx_buf_t
- ngx_list_t
- ngx_queue_t
- nginx的配置系統
- 指令參數
- 指令上下文
- nginx的模塊化體系結構
- 模塊的分類
- nginx的請求處理
- handler模塊
- handler模塊簡介
- 模塊的基本結構
- 模塊配置結構
- 模塊配置指令
- 模塊上下文結構
- 模塊的定義
- handler模塊的基本結構
- handler模塊的掛載
- handler的編寫步驟
- 示例: hello handler 模塊
- handler模塊的編譯和使用
- 更多handler模塊示例分析
- http access module
- http static module
- http log module
- 過濾模塊
- 過濾模塊簡介
- 過濾模塊的分析
- upstream模塊
- upstream模塊
- upstream模塊接口
- memcached模塊分析
- 本節回顧
- 負載均衡模塊
- 配置
- 指令
- 鉤子
- 初始化配置
- 初始化請求
- peer.get和peer.free回調函數
- 本節回顧
- 其他模塊
- core模塊
- event模塊
- 模塊開發高級篇
- 變量
- 下篇:nginx原理解析篇
- nginx架構詳解
- nginx的源碼目錄結構
- nginx的configure原理
- 模塊編譯順序
- nginx基礎設施
- 內存池
- nginx的啟動階段
- 概述
- 共有流程
- 配置解析
- nginx的請求處理階段
- 接收請求流程
- http請求格式簡介
- 請求頭讀取
- 解析請求行
- 解析請求頭
- 請求體讀取
- 讀取請求體
- 丟棄請求體
- 多階段處理請求
- 多階段執行鏈
- POST_READ階段
- SERVER_REWRITE階段
- FIND_CONFIG階段
- REWRITE階段
- POST_REWRITE階段
- PREACCESS階段
- ACCESS階段
- POST_ACCESS階段
- TRY_FILES階段
- CONTENT階段
- LOG階段
- Nginx filter
- header filter分析
- body filter分析
- ngx_http_copy_filter_module分析
- ngx_http_write_filter_module分析
- subrequest原理解析
- https請求處理解析
- 附錄A 編碼風格
- 附錄B 常用API
- 附錄C 模塊編譯,調試與測試