### 概述
? ? ? upstream 機制使得 Nginx 成為一個反向代理服務器,Nginx 接收來自下游客戶端的 http 請求,并處理該請求,同時根據該請求向上游服務器發送 tcp 請求報文,上游服務器會根據該請求返回相應地響應報文,Nginx 根據上游服務器的響應報文,決定是否向下游客戶端轉發響應報文。另外 upstream 機制提供了負載均衡的功能,可以將請求負載均衡到集群服務器的某個服務器上面。
### 啟動 upstream
? ? ? 在 Nginx 中調用 ngx_http_upstream_init 方法啟動 upstream 機制,但是在使用 upstream 機制之前必須調用 ngx_http_upstream_create 方法創建 ngx_http_upstream_t 結構體,因為默認情況下 ngx_http_request_t 結構體中的 upstream 成員是指向 NULL,該結構體的具體初始化工作還需由 HTTP 模塊完成。有關 ngx_http_upstream_t 結構體 和ngx_http_upstream_conf_t 結構體的相關說明可參考文章《[Nginx 中 upstream 機制](http://blog.csdn.net/chenhanzhun/article/details/42680343)》。
? ? ?下面是函數 ngx_http_upstream_create 的實現:
~~~
/* 創建 ngx_http_upstream_t 結構體 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
u = r->upstream;
/*
* 若已經創建過ngx_http_upstream_t 且定義了cleanup成員,
* 則調用cleanup清理方法將原始結構體清除;
*/
if (u && u->cleanup) {
r->main->count++;
ngx_http_upstream_cleanup(r);
}
/* 從內存池分配ngx_http_upstream_t 結構體空間 */
u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
if (u == NULL) {
return NGX_ERROR;
}
/* 給ngx_http_request_t 結構體成員upstream賦值 */
r->upstream = u;
u->peer.log = r->connection->log;
u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
u->peer.lock = &r->connection->lock;
#endif
#if (NGX_HTTP_CACHE)
r->cache = NULL;
#endif
u->headers_in.content_length_n = -1;
return NGX_OK;
}
~~~
關于 upstream 機制的啟動方法 ngx_http_upstream_init 的執行流程如下:
- 檢查 Nginx 與下游服務器之間連接上的讀事件是否在定時器中,即檢查 timer_set 標志位是否為 1,若該標志位為 1,則把讀事件從定時器中移除;
- 調用 ngx_http_upstream_init_request 方法啟動 upstream 機制;
ngx_http_upstream_init_request 方法執行流程如下所示:
- 檢查 ngx_http_upstream_t 結構體中的 store 標志位是否為 0;檢查 ngx_http_request_t 結構體中的 post_action 標志位是否為0;檢查 ngx_http_upstream_conf_t 結構體中的ignore_client_abort 是否為 0;若上面的標志位都為 0,則設置ngx_http_request_t 請求的讀事件的回調方法為ngx_http_upstream_rd_check_broken_connection;設置寫事件的回調方法為 ngx_http_upstream_wr_check_broken_connection;這兩個方法都會調用 ngx_http_upstream_check_broken_connection方法檢查 Nginx 與下游之間的連接是否正常,若出現錯誤,則終止連接;
- 若不滿足上面的標志位,即至少有一個不為 0 ,調用請求中ngx_http_upstream_t 結構體中某個 HTTP 模塊實現的create_request 方法,構造發往上游服務器的請求;
- 調用 ngx_http_cleanup_add 方法向原始請求的 cleanup 鏈表尾端添加一個回調 handler 方法,該回調方法設置為ngx_http_upstream_cleanup,若當前請求結束時會調用該方法做一些清理工作;
- 調用 ngx_http_upstream_connect 方法向上游服務器發起連接請求;
~~~
/* 初始化啟動upstream機制 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
ngx_connection_t *c;
/* 獲取當前請求所對應的連接 */
c = r->connection;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http init upstream, client timer: %d", c->read->timer_set);
#if (NGX_HTTP_SPDY)
if (r->spdy_stream) {
ngx_http_upstream_init_request(r);
return;
}
#endif
/*
* 檢查當前連接上讀事件的timer_set標志位是否為1,若該標志位為1,
* 表示讀事件在定時器機制中,則需要把它從定時器機制中移除;
* 因為在啟動upstream機制后,就不需要對客戶端的讀操作進行超時管理;
*/
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
if (!c->write->active) {
if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
== NGX_ERROR)
{
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
}
ngx_http_upstream_init_request(r);
}
~~~
~~~
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
ngx_str_t *host;
ngx_uint_t i;
ngx_resolver_ctx_t *ctx, temp;
ngx_http_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
ngx_http_upstream_srv_conf_t *uscf, **uscfp;
ngx_http_upstream_main_conf_t *umcf;
if (r->aio) {
return;
}
u = r->upstream;
#if (NGX_HTTP_CACHE)
...
...
#endif
/* 文件緩存標志位 */
u->store = (u->conf->store || u->conf->store_lengths);
/*
* 檢查ngx_http_upstream_t 結構中標志位 store;
* 檢查ngx_http_request_t 結構中標志位 post_action;
* 檢查ngx_http_upstream_conf_t 結構中標志位 ignore_client_abort;
* 若上面這些標志位為1,則表示需要檢查Nginx與下游(即客戶端)之間的TCP連接是否斷開;
*/
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
/* 把當前請求包體結構保存在ngx_http_upstream_t 結構的request_bufs鏈表緩沖區中 */
if (r->request_body) {
u->request_bufs = r->request_body->bufs;
}
/* 調用create_request方法構造發往上游服務器的請求 */
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* 獲取ngx_http_upstream_t結構中主動連接結構peer的local本地地址信息 */
u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);
/* 獲取ngx_http_core_module模塊的loc級別的配置項結構 */
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/* 初始化ngx_http_upstream_t結構中成員output向下游發送響應的方式 */
u->output.alignment = clcf->directio_alignment;
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size;
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
u->writer.pool = r->pool;
/* 添加用于表示上游響應的狀態,例如:錯誤編碼、包體長度等 */
if (r->upstream_states == NULL) {
r->upstream_states = ngx_array_create(r->pool, 1,
sizeof(ngx_http_upstream_state_t));
if (r->upstream_states == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
} else {
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
}
/*
* 調用ngx_http_cleanup_add方法原始請求的cleanup鏈表尾端添加一個回調handler方法,
* 該handler回調方法設置為ngx_http_upstream_cleanup,若當前請求結束時會調用該方法做一些清理工作;
*/
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
if (u->resolved == NULL) {
/* 若沒有實現u->resolved標志位,則定義上游服務器的配置 */
uscf = u->conf->upstream;
} else {
/*
* 若實現了u->resolved標志位,則解析主機域名,指定上游服務器的地址;
*/
/*
* 若已經指定了上游服務器地址,則不需要解析,
* 直接調用ngx_http_upstream_connection方法向上游服務器發起連接;
* 并return從當前函數返回;
*/
if (u->resolved->sockaddr) {
if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
return;
}
/*
* 若還沒指定上游服務器的地址,則需解析主機域名;
* 若成功解析出上游服務器的地址和端口號,
* 則調用ngx_http_upstream_connection方法向上游服務器發起連接;
*/
host = &u->resolved->host;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
{
goto found;
}
}
if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream \"%V\"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
temp.name = *host;
ctx = ngx_resolve_start(clcf->resolver, &temp);
if (ctx == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no resolver defined to resolve %V", host);
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}
ctx->name = *host;
ctx->handler = ngx_http_upstream_resolve_handler;
ctx->data = r;
ctx->timeout = clcf->resolver_timeout;
u->resolved->ctx = ctx;
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
found:
if (uscf == NULL) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"no upstream configuration");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
}
static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
ngx_http_upstream_check_broken_connection(r, r->connection->read);
}
static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
ngx_http_upstream_check_broken_connection(r, r->connection->write);
}
~~~
### 建立連接
? ? ? upstream 機制與上游服務器建立 TCP 連接時,采用的是非阻塞模式的套接字,即發起連接請求之后立即返回,不管連接是否建立成功,若沒有立即建立成功,則需在 epoll 事件機制中監聽該套接字。向上游服務器發起連接請求由函數ngx_http_upstream_connect?實現。在分析 ngx_http_upstream_connect 方法之前,首先分析下 ngx_event_connect_peer 方法,因為該方法會被ngx_http_upstream_connect 方法調用。
ngx_event_connect_peer 方法的執行流程如下所示:
- 調用 ngx_socket 方法創建一個 TCP 套接字;
- 調用 ngx_nonblocking 方法設置該 TCP 套接字為非阻塞模式;
- 設置套接字連接接收和發送網絡字符流的方法;
- 設置套接字連接上讀、寫事件方法;
- 將 TCP 套接字以期待 EPOLLIN | EPOLLOUT 事件的方式添加到epoll 事件機制中;
- 調用 connect 方法向服務器發起 TCP 連接請求;
ngx_http_upstream_connect 方法表示向上游服務器發起連接請求,其執行流程如下所示:
- 調用 ngx_event_connect_peer 方法主動向上游服務器發起連接請求,需要注意的是該方法已經將相應的套接字注冊到epoll事件機制來監聽讀、寫事件,該方法返回值為 rc;
- 若 rc = NGX_ERROR,表示發起連接失敗,則調用ngx_http_upstream_finalize_request 方法關閉連接請求,并 return 從當前函數返回;
- 若 rc = NGX_BUSY,表示當前上游服務器處于不活躍狀態,則調用 ngx_http_upstream_next 方法根據傳入的參數嘗試重新發起連接請求,并 return 從當前函數返回;
- 若 rc = NGX_DECLINED,表示當前上游服務器負載過重,則調用 ngx_http_upstream_next 方法嘗試與其他上游服務器建立連接,并 return 從當前函數返回;
- 設置上游連接 ngx_connection_t 結構體的讀事件、寫事件的回調方法 handler 都為 ngx_http_upstream_handler,設置 ngx_http_upstream_t 結構體的寫事件 write_event_handler 的回調為 ngx_http_upstream_send_request_handler,讀事件 read_event_handler 的回調方法為 ngx_http_upstream_process_header;
- 若 rc = NGX_AGAIN,表示當前已經發起連接,但是沒有收到上游服務器的確認應答報文,即上游連接的寫事件不可寫,則需調用 ngx_add_timer 方法將上游連接的寫事件添加到定時器中,管理超時確認應答;
- 若 rc = NGX_OK,表示成功建立連接,則調用 ngx_http_upsream_send_request 方法向上游服務器發送請求;
~~~
/* 向上游服務器建立連接 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_time_t *tp;
ngx_connection_t *c;
r->connection->log->action = "connecting to upstream";
if (u->state && u->state->response_sec) {
tp = ngx_timeofday();
u->state->response_sec = tp->sec - u->state->response_sec;
u->state->response_msec = tp->msec - u->state->response_msec;
}
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
tp = ngx_timeofday();
u->state->response_sec = tp->sec;
u->state->response_msec = tp->msec;
/* 向上游服務器發起連接 */
rc = ngx_event_connect_peer(&u->peer);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);
/* 下面根據rc不同返回值進行分析 */
/* 若建立連接失敗,則關閉當前請求,并return從當前函數返回 */
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->state->peer = u->peer.name;
/*
* 若返回rc = NGX_BUSY,表示當前上游服務器不活躍,
* 則調用ngx_http_upstream_next向上游服務器重新發起連接,
* 實際上,該方法最終還是調用ngx_http_upstream_connect方法;
* 并return從當前函數返回;
*/
if (rc == NGX_BUSY) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
return;
}
/*
* 若返回rc = NGX_DECLINED,表示當前上游服務器負載過重,
* 則調用ngx_http_upstream_next向上游服務器重新發起連接,
* 實際上,該方法最終還是調用ngx_http_upstream_connect方法;
* 并return從當前函數返回;
*/
if (rc == NGX_DECLINED) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
c = u->peer.connection;
c->data = r;
/* 設置當前連接ngx_connection_t 上讀、寫事件的回調方法 */
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
/* 設置upstream機制的讀、寫事件的回調方法 */
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;
if (c->pool == NULL) {
/* we need separate pool here to be able to cache SSL connections */
c->pool = ngx_create_pool(128, r->connection->log);
if (c->pool == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
c->log = r->connection->log;
c->pool->log = c->log;
c->read->log = c->log;
c->write->log = c->log;
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
/*
* 檢查當前ngx_http_upstream_t 結構的request_sent標志位,
* 若該標志位為1,則表示已經向上游服務器發送請求,即本次發起連接失敗;
* 則調用ngx_http_upstream_reinit方法重新向上游服務器發起連接;
*/
if (u->request_sent) {
if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (r->request_body
&& r->request_body->buf
&& r->request_body->temp_file
&& r == r->main)
{
/*
* the r->request_body->buf can be reused for one request only,
* the subrequests should allocate their own temporary bufs
*/
u->output.free = ngx_alloc_chain_link(r->pool);
if (u->output.free == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->output.free->buf = r->request_body->buf;
u->output.free->next = NULL;
u->output.allocated = 1;
r->request_body->buf->pos = r->request_body->buf->start;
r->request_body->buf->last = r->request_body->buf->start;
r->request_body->buf->tag = u->output.tag;
}
u->request_sent = 0;
/*
* 若返回rc = NGX_AGAIN,表示沒有收到上游服務器允許建立連接的應答;
* 由于寫事件已經添加到epoll事件機制中等待可寫事件發生,
* 所有在這里只需將當前連接的寫事件添加到定時器機制中進行超時管理;
* 并return從當前函數返回;
*/
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
/*
* 若返回值rc = NGX_OK,表示連接成功建立,
* 調用此方法向上游服務器發送請求 */
ngx_http_upstream_send_request(r, u);
}
~~~
### 發送請求
? ? ? 當 Nginx 與上游服務器成功建立連接之后,會調用 ngx_http_upstream_send_request 方法發送請求,若是該方法不能一次性把請求內容發送完成時,則需等待 epoll 事件機制的寫事件發生,若寫事件發生,則會調用寫事件 write_event_handler 的回調方法 ngx_http_upstream_send_request_handler 繼續發送請求,并且有可能會多次調用該寫事件的回調方法, 直到把請求發送完成。
下面是 ngx_http_upstream_send_request 方法的執行流程:
- 檢查 ngx_http_upstream_t 結構體中的標志位 request_sent 是否為 0,若為 0 表示未向上游發送請求。 且此時調用 ngx_http_upstream_test_connect 方法測試是否與上游建立連接,若返回非 NGX_OK, 則需調用 ngx_http_upstream_next 方法試圖與上游建立連接,并return 從當前函數返回;
- 調用 ngx_output_chain 方法向上游發送保存在 request_bufs 鏈表中的請求數據,該方法返回值為 rc,并設置 request_sent 標志位為 1,檢查連接上寫事件 timer_set 標志位是否為1,若為 1 調用ngx_del_timer 方法將寫事件從定時器中移除;
- 若 rc = NGX_ERROR,表示當前連接上出錯,則調用 ngx_http_upstream_next 方法嘗試再次與上游建立連接,并 return 從當前函數返回;
- 若 rc = NGX_AGAIN,并是當前請求數據未完全發送,則需將剩余的請求數據保存在 ngx_http_upstream_t 結構體的 output 成員中,并且調用 ngx_add_timer 方法將當前連接上的寫事件添加到定時器中,調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中,等待可寫事件發生,并return 從當前函數返回;
- 若 rc = NGX_OK,表示已經發送全部請求數據,則準備接收來自上游服務器的響應報文;
- 先調用 ngx_add_timer 方法將當前連接的讀事件添加到定時器機制中,檢測接收響應是否超時,檢查當前連接上的讀事件是否準備就緒,即標志位 ready 是否為1,若該標志位為 1,則調用 ngx_http_upstream_process_header 方法開始處理響應頭部,并 return 從當前函數返回;
- 若當前連接上讀事件的標志位 ready 為0,表示暫時無可讀數據,則需等待讀事件再次被觸發,由于原始讀事件的回調方法為 ngx_http_upstream_process_header,所有無需重新設置。由于請求已經全部發送,防止寫事件的回調方法 ngx_http_upstream_send_request_handler 再次被觸發,因此需要重新設置寫事件的回調方法為 ngx_http_upstream_dummy_handler,該方法實際上不執行任何操作,同時調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中;
~~~
/* 向上游服務器發送請求 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_connection_t *c;
/* 獲取當前連接 */
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");
/*
* 若標志位request_sent為0,表示還未發送請求;
* 且ngx_http_upstream_test_connect方法返回非NGX_OK,標志當前還未與上游服務器成功建立連接;
* 則需要調用ngx_http_upstream_next方法嘗試與下一個上游服務器建立連接;
* 并return從當前函數返回;
*/
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
c->log->action = "sending request to upstream";
/*
* 調用ngx_output_chain方法向上游發送保存在request_bufs鏈表中的請求數據;
* 值得注意的是該方法的第二個參數可以是NULL也可以是request_bufs,那怎么來區分呢?
* 若是第一次調用該方法發送request_bufs鏈表中的請求數據時,request_sent標志位為0,
* 此時,第二個參數自然就是request_bufs了,那么為什么會有NULL作為參數的情況呢?
* 當在第一次調用該方法時,并不能一次性把所有request_bufs中的數據發送完畢時,
* 此時,會把剩余的數據保存在output結構里面,并把標志位request_sent設置為1,
* 因此,再次發送請求數據時,不用指定request_bufs參數,因為此時剩余數據已經保存在output中;
*/
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
/* 向上游服務器發送請求之后,把request_sent標志位設置為1 */
u->request_sent = 1;
/* 下面根據不同rc的返回值進行判斷 */
/*
* 若返回值rc=NGX_ERROR,表示當前連接上出錯,
* 將錯誤信息傳遞給ngx_http_upstream_next方法,
* 該方法根據錯誤信息決定是否重新向上游服務器發起連接;
* 并return從當前函數返回;
*/
if (rc == NGX_ERROR) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/*
* 檢查當前連接上寫事件的標志位timer_set是否為1,
* 若該標志位為1,則需把寫事件從定時器機制中移除;
*/
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
/*
* 若返回值rc = NGX_AGAIN,表示請求數據并未完全發送,
* 即有剩余的請求數據保存在output中,但此時,寫事件已經不可寫,
* 則調用ngx_add_timer方法把當前連接上的寫事件添加到定時器機制,
* 并調用ngx_handle_write_event方法將寫事件注冊到epoll事件機制中;
* 并return從當前函數返回;
*/
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
/*
* 若返回值 rc = NGX_OK,表示已經發送完全部請求數據,
* 準備接收來自上游服務器的響應報文,則執行以下程序;
*/
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
/* 將當前連接上讀事件添加到定時器機制中 */
ngx_add_timer(c->read, u->conf->read_timeout);
/*
* 若此時,讀事件已經準備就緒,
* 則調用ngx_http_upstream_process_header方法開始接收并處理響應頭部;
* 并return從當前函數返回;
*/
if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
/*
* 若當前讀事件未準備就緒;
* 則把寫事件的回調方法設置為ngx_http_upstream_dumy_handler方法(不進行任何實際操作);
* 并把寫事件注冊到epoll事件機制中;
*/
u->write_event_handler = ngx_http_upstream_dummy_handler;
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
~~~
當無法一次性將請求內容全部發送完畢,則需等待 epoll 事件機制的寫事件發生,一旦發生就會調用回調方法 ngx_http_upstream_send_request_handler。
ngx_http_upstream_send_request_handler 方法的執行流程如下所示:
- 檢查連接上寫事件是否超時,即timedout 標志位是否為 1,若為 1 表示已經超時,則調用 ngx_http_upstream_next 方法重新向上游發起連接請求,并 return 從當前函數返回;
- 若標志位 timedout 為0,即不超時,檢查 header_sent 標志位是否為 1,表示已經接收到來自上游服務器的響應頭部,則不需要再向上游發送請求,將寫事件的回調方法設置為 ngx_http_upstream_dummy_handler,同時將寫事件注冊到 epoll 事件機制中,并return 從當前函數返回;
- 若標志位 header_sent 為 0,則調用 ngx_http_upstream_send_request 方法向上游發送請求數據;
~~~
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request handler");
/* 檢查當前連接上寫事件的超時標志位 */
if (c->write->timedout) {
/* 執行超時重連機制 */
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
/* 已經接收到上游服務器的響應頭部,則不需要再向上游服務器發送請求數據 */
if (u->header_sent) {
/* 將寫事件的回調方法設置為不進行任何實際操作的方法ngx_http_upstream_dumy_handler */
u->write_event_handler = ngx_http_upstream_dummy_handler;
/* 將寫事件注冊到epoll事件機制中,并return從當前函數返回 */
(void) ngx_handle_write_event(c->write, 0);
return;
}
/* 若沒有接收來自上游服務器的響應頭部,則需向上游服務器發送請求數據 */
ngx_http_upstream_send_request(r, u);
}
~~~
### 接收響應
### 接收響應頭部
當 Nginx 已經向上游發送請求,準備開始接收來自上游的響應頭部,由方法 ngx_http_upstream_process_header 實現,該方法接收并解析響應頭部。
ngx_http_upstream_process_header 方法的執行流程如下:
- 檢查上游連接上的讀事件是否超時,若標志位 timedout 為 1,則表示超時,此時調用 ngx_http_upstream_next 方法重新與上游建立連接,并 return 從當前函數返回;
- 若標志位 timedout 為 0,接著檢查 ngx_http_upstream_t 結構體中的標志位 request_sent,若該標志位為 0,表示未向上游發送請求,同時調用 ngx_http_upstream_test_connect 方法測試連接狀態,若該方法返回值為非 NGX_OK,表示與上游已經斷開連接,則調用 ngx_http_upstream_next 方法重新與上游建立連接,并 return 從當前函數返回;
- 檢查 ngx_http_upstream_t 結構體中接收響應頭部的 buffer 緩沖區是否有內存空間以便接收響應頭部,若 buffer.start 為 NULL,表示該緩沖區為空,則需調用 ngx_palloc 方法分配內存,該內存大小 buffer_size 由 ngx_http_upstream_conf_t 配置結構體的 buffer_size 成員指定;
- 調用 recv 方法開始接收來自上游服務器的響應頭部,并根據該方法的返回值 n 進行判斷:
- 若 n = NGX_AGAIN,表示讀事件未準備就緒,需要等待下次讀事件被觸發時繼續接收響應頭部,此時,調用 ngx_add_timer 方法將讀事件添加到定時器中,同時調用 ngx_handle_read_event 方法將讀事件注冊到epoll 事件機制中,并 return 從當前函數返回;
- 若 n = NGX_ERROR 或 n = 0,表示上游連接發生錯誤 或 上游服務器主動關閉連接,則調用 ngx_http_upstream_next 方法重新發起連接請求,并 return 從當前函數返回;
- 若 n 大于 0,表示已經接收到響應頭部,此時,調用 ngx_http_upstream_t 結構體中由 HTTP 模塊實現的 process_header 方法解析響應頭部,且返回 rc 值;
- 若 rc = NGX_AGAIN,表示接收到的響應頭部不完整,檢查接收緩沖區 buffer 是否還有剩余的內存空間,若緩沖區沒有剩余的內存空間,表示接收到的響應頭部過大,此時調用 ngx_http_upstream_next 方法重新建立連接,并 return 從當前函數返回;若緩沖區還有剩余的內存空間,則continue 繼續接收響應頭部;
- 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的響應頭部是非法的,則調用 ngx_http_upstream_next 方法重新建立連接,并 return 從當前函數返回;
- 若 rc = NGX_ERROR,表示連接出錯,此時調用 ngx_http_upstream_finalize_request 方法結束請求,并 return 從當前函數返回;
- 若 rc = NGX_OK,表示已接收到完整的響應頭部,則調用 ngx_http_upstream_process_headers 方法處理已解析的響應頭部,該方法會將已解析出來的響應頭部保存在 ngx_http_request_t 結構體中的 headers_out 成員;
- 檢查 ngx_http_request_t 結構體的 subrequest_in_memory 成員決定是否需要轉發響應給下游服務器;
- 若 subrequest_in_memory 為 0,表示需要轉發響應給下游服務器,則調用 ngx_http_upstream_send_response 方法開始轉發響應給下游服務器,并 return 從當前函數返回;
- 若 subrequest_in_memory 為 1,表示不需要將響應轉發給下游,此時檢查 HTTP 模塊是否定義了 ngx_http_upstream_t 結構體中的 input_filter 方法處理響應包體;
- 若沒有定義 input_filter 方法,則使用 upstream 機制默認方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
- 若定義了自己的 input_filter 方法,則首先調用 input_filter_init 方法為處理響應包體做初始化工作;
- 檢查接收緩沖區 buffer 在解析完響應頭部之后剩余的字符流,若有剩余的字符流,則表示已經預接收了響應包體,此時調用 input_filter 方法處理響應包體;
- 設置 upstream 機制讀事件 read_event_handler 的回調方法為 ngx_http_upstream_process_body_in_memory,并調用該方法開始接收并解析響應包體;
~~~
/* 接收并解析響應頭部 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process header");
c->log->action = "reading response header from upstream";
/* 檢查當前連接上的讀事件是否超時 */
if (c->read->timedout) {
/*
* 若標志位timedout為1,表示讀事件超時;
* 則把超時錯誤傳遞給ngx_http_upstream_next方法,
* 該方法根據允許的錯誤進行重連接策略;
* 并return從當前函數返回;
*/
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
/*
* 若標志位request_sent為0,表示還未發送請求;
* 且ngx_http_upstream_test_connect方法返回非NGX_OK,標志當前還未與上游服務器成功建立連接;
* 則需要調用ngx_http_upstream_next方法嘗試與下一個上游服務器建立連接;
* 并return從當前函數返回;
*/
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/*
* 檢查ngx_http_upstream_t結構體中接收響應頭部的buffer緩沖區;
* 若接收緩沖區buffer未分配內存,則調用ngx_palloce方法分配內存,
* 該內存的大小buffer_size由ngx_http_upstream_conf_t配置結構的buffer_size指定;
*/
if (u->buffer.start == NULL) {
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* 調整接收緩沖區buffer,準備接收響應頭部 */
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
u->buffer.end = u->buffer.start + u->conf->buffer_size;
/* 表示該緩沖區內存可被復用、數據可被改變 */
u->buffer.temporary = 1;
u->buffer.tag = u->output.tag;
/* 初始化headers_in的成員headers鏈表 */
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
#if (NGX_HTTP_CACHE)
if (r->cache) {
u->buffer.pos += r->cache->header_start;
u->buffer.last = u->buffer.pos;
}
#endif
}
for ( ;; ) {
/* 調用recv方法從當前連接上讀取響應頭部數據 */
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
/* 下面根據 recv 方法不同返回值 n 進行判斷 */
/*
* 若返回值 n = NGX_AGAIN,表示讀事件未準備就緒,
* 需等待下次讀事件被觸發時繼續接收響應頭部,
* 即將讀事件注冊到epoll事件機制中,等待可讀事件發生;
* 并return從當前函數返回;
*/
if (n == NGX_AGAIN) {
#if 0
ngx_add_timer(rev, u->read_timeout);
#endif
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
if (n == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream prematurely closed connection");
}
/*
* 若返回值 n = NGX_ERROR 或 n = 0,則表示上游服務器已經主動關閉連接;
* 此時,調用ngx_http_upstream_next方法決定是否重新發起連接;
* 并return從當前函數返回;
*/
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* 若返回值 n 大于 0,表示已經接收到響應頭部 */
u->buffer.last += n;
#if 0
u->valid_header_in = 0;
u->peer.cached = 0;
#endif
/*
* 調用ngx_http_upstream_t結構體中process_header方法開始解析響應頭部;
* 并根據該方法返回值進行不同的判斷;
*/
rc = u->process_header(r);
/*
* 若返回值 rc = NGX_AGAIN,表示接收到的響應頭部不完整,
* 需等待下次讀事件被觸發時繼續接收響應頭部;
* continue繼續接收響應;
*/
if (rc == NGX_AGAIN) {
if (u->buffer.last == u->buffer.end) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream sent too big header");
ngx_http_upstream_next(r, u,
NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
continue;
}
break;
}
/*
* 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
* 則表示接收到的響應頭部是非法的,
* 調用ngx_http_upstream_next方法決定是否重新發起連接;
* 并return從當前函數返回;
*/
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
/*
* 若返回值 rc = NGX_ERROR,表示出錯,
* 則調用ngx_http_upstream_finalize_request方法結束該請求;
* 并return從當前函數返回;
*/
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* rc == NGX_OK */
/*
* 若返回值 rc = NGX_OK,表示成功解析到完整的響應頭部;*/
if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {
if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
return;
}
if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
return;
}
}
/* 調用ngx_http_upstream_process_headers方法處理已解析處理的響應頭部 */
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
/*
* 檢查ngx_http_request_t 結構體的subrequest_in_memory成員決定是否轉發響應給下游服務器;
* 若該標志位為0,則需調用ngx_http_upstream_send_response方法轉發響應給下游服務器;
* 并return從當前函數返回;
*/
if (!r->subrequest_in_memory) {
ngx_http_upstream_send_response(r, u);
return;
}
/* 若不需要轉發響應,則調用ngx_http_upstream_t中的input_filter方法處理響應包體 */
/* subrequest content in memory */
/*
* 若HTTP模塊沒有定義ngx_http_upstream_t中的input_filter處理方法;
* 則使用upstream機制默認方法ngx_http_upstream_non_buffered_filter;
*
* 若HTTP模塊實現了input_filter方法,則不使用upstream默認的方法;
*/
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
/*
* 調用input_filter_init方法為處理包體做初始化工作;
*/
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 檢查接收緩沖區是否有剩余的響應數據;
* 因為響應頭部已經解析完畢,若接收緩沖區還有未被解析的剩余數據,
* 則該數據就是響應包體;
*/
n = u->buffer.last - u->buffer.pos;
/*
* 若接收緩沖區有剩余的響應包體,調用input_filter方法開始處理已接收到響應包體;
*/
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
/* 調用input_filter方法處理響應包體 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
/* 設置upstream機制的讀事件回調方法read_event_handler為ngx_http_upstream_process_body_in_memory */
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
/* 調用ngx_http_upstream_process_body_in_memory方法開始處理響應包體 */
ngx_http_upstream_process_body_in_memory(r, u);
}
~~~
### 接收響應包體
接收并解析響應包體由 ngx_http_upstream_process_body_in_memory 方法實現;
ngx_http_upstream_process_body_in_memory 方法的執行流程如下所示:
- 檢查上游連接上讀事件是否超時,若標志位 timedout 為 1,則表示已經超時,此時調用 ngx_http_upstream_finalize_request 方法結束請求,并 return 從當前函數返回;
- 檢查接收緩沖區 buffer 是否還有剩余的內存空間,若沒有剩余的內存空間,則調用 ngx_http_upstream_finalize_request 方法結束請求,并 return 從當前函數返回;若有剩余的內存空間則調用 recv 方法開始接收響應包體;
- 若返回值 n = NGX_AGAIN,表示等待下一次觸發讀事件再接收響應包體,調用 ngx_handle_read_event 方法將讀事件注冊到 epoll 事件機制中,同時將讀事件添加到定時器機制中;
- 若返回值 n = 0 或 n = NGX_ERROR,則調用 ngx_http_upstream_finalize_request 方法結束請求,并 return 從當前函數返回;
- 若返回值 n 大于 0,則表示成功接收到響應包體,調用 input_filter 方法開始處理響應包體,檢查讀事件的 ready 標志位;
- 若標志位 ready 為 1,表示仍有可讀的響應包體數據,因此回到步驟 2 繼續調用 recv 方法讀取響應包體,直到讀取完畢;
- 若標志位 ready 為 0,則調用 ngx_handle_read_event 方法將讀事件注冊到epoll事件機制中,同時調用 ngx_add_timer 方法將讀事件添加到定時器機制中;
~~~
/* 接收并解析響應包體 */
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
size_t size;
ssize_t n;
ngx_buf_t *b;
ngx_event_t *rev;
ngx_connection_t *c;
c = u->peer.connection;
rev = c->read;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process body on memory");
/*
* 檢查讀事件標志位timedout是否超時,若該標志位為1,表示響應已經超時;
* 則調用ngx_http_upstream_finalize_request方法結束請求;
* 并return從當前函數返回;
*/
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
b = &u->buffer;
for ( ;; ) {
/* 檢查當前接收緩沖區是否剩余的內存空間 */
size = b->end - b->last;
/*
* 若接收緩沖區不存在空閑的內存空間,
* 則調用ngx_http_upstream_finalize_request方法結束請求;
* 并return從當前函數返回;
*/
if (size == 0) {
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
"upstream buffer is too small to read response");
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 若接收緩沖區有可用的內存空間,
* 則調用recv方法開始接收響應包體;
*/
n = c->recv(c, b->last, size);
/*
* 若返回值 n = NGX_AGAIN,表示等待下一次觸發讀事件再接收響應包體;
*/
if (n == NGX_AGAIN) {
break;
}
/*
* 若返回值n = 0(表示上游服務器主動關閉連接),或n = NGX_ERROR(表示出錯);
* 則調用ngx_http_upstream_finalize_request方法結束請求;
* 并return從當前函數返回;
*/
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
/* 若返回值 n 大于0,表示成功讀取到響應包體 */
u->state->response_length += n;
/* 調用input_filter方法處理本次接收到的響應包體 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 檢查讀事件的ready標志位,若為1,繼續讀取響應包體 */
if (!rev->ready) {
break;
}
}
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
/*
* 若讀事件的ready標志位為0,表示讀事件未準備就緒,
* 則將讀事件注冊到epoll事件機制中,添加到定時器機制中;
* 讀事件的回調方法不改變,即依舊為ngx_http_upstream_process_body_in_memory;
*/
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
~~~
### 轉發響應
下面看下?upstream?處理上游響應包體的三種方式:
1. 當請求結構體?ngx_http_request_t?中的成員subrequest_in_memory?標志位為 1 時,upstream?不轉發響應包體到下游,并由HTTP?模塊實現的input_filter()?方法處理包體;
1. 當請求結構體?ngx_http_request_t?中的成員subrequest_in_memory?標志位為 0 時,且ngx_http_upstream_conf_t?配置結構體中的成員buffering?標志位為 1 時,upstream?將開啟更多的內存和磁盤文件用于緩存上游的響應包體(此時,上游網速更快),并轉發響應包體;
1. 當請求結構體?ngx_http_request_t?中的成員subrequest_in_memory?標志位為 0 時,且ngx_http_upstream_conf_t?配置結構體中的成員buffering?標志位為 0 時,upstream?將使用固定大小的緩沖區來轉發響應包體;
轉發響應由函數 ngx_http_upstream_send_response 實現,該函數的執行流程如下:
- 調用 ngx_http_send_header 方法轉發響應頭部,并將 ngx_http_upstream_t 結構體中的 header_sent 標志位設置為 1,表示已經轉發響應頭部;
- 若臨時文件還保存著請求包體,則需調用 ngx_pool_run_cleanup_filter 方法清理臨時文件;
- 檢查標志位 buffering,若該標志位為 1,表示需要開啟文件緩存,若該標志位為 0,則不需要開啟文件緩存,只需要以固定的內存塊大小轉發響應包體即可;
- 若標志位 buffering 為0;
- 則檢查 HTTP 模塊是否實現了自己的 input_filter 方法,若沒有則使用 upstream 機制默認的方法 ngx_http_upstream_non_buffered_filter;
- 設置 ngx_http_upstream_t 結構體中讀事件 read_event_handler 的回調方法為 ngx_http_upstream_process_non_buffered_upstream,當接收上游響應時,會通過 ngx_http_upstream_handler 方法最終調用 ngx_http_upstream_process_non_buffered_uptream 來接收響應;
- 設置 ngx_http_upstream_t 結構體中寫事件 write_event_handler 的回調方法為 ngx_http_upstream_process_non_buffered_downstream,當向下游發送數據時,會通過 ngx_http_handler 方法最終調用 ngx_http_upstream_process_non_buffered_downstream 方法來發送響應包體;
- 調用 input_filter_init 方法為 input_filter 方法處理響應包體做初始化工作;
- 檢查接收緩沖區 buffer 在解析完響應頭部之后,是否還有剩余的響應數據,若有表示預接收了響應包體:
- 若在解析響應頭部區間,預接收了響應包體,則調用 input_filter 方法處理該部分預接收的響應包體,并調用 ngx_http_upstream_process_non_buffered_downstream 方法轉發本次接收到的響應包體給下游服務器;
- 若在解析響應頭部區間,沒有接收響應包體,則首先清空接收緩沖區 buffer 以便復用來接收響應包體,檢查上游連接上讀事件是否準備就緒,若標志位 ready 為1,表示準備就緒,則調用 ngx_http_upstream_process_non_buffered_upstream 方法接收上游響應包體;若標志位 ready 為 0,則 return 從當前函數返回;
- 若標志位 buffering 為1;
- 初始化 ngx_http_upstream_t 結構體中的 ngx_event_pipe_t pipe 成員;
- 調用 input_filter_init 方法為 input_filter 方法處理響應包體做初始化工作;
- 設置上游連接上的讀事件 read_event_handler 的回調方法為 ngx_http_upstream_process_upstream;
- 設置上游連接上的寫事件 write_event_handler 的回調方法為 ngx_http_upstream_process_downstream;
- 調用 ngx_http_upstream_proess_upstream 方法處理由上游服務器發來的響應包體;
~~~
/* 轉發響應包體 */
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
int tcp_nodelay;
ssize_t n;
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
/* 調用ngx_http_send_hander方法向下游發送響應頭部 */
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
/* 將標志位header_sent設置為1 */
u->header_sent = 1;
if (u->upgrade) {
ngx_http_upstream_upgrade(r, u);
return;
}
/* 獲取Nginx與下游之間的TCP連接 */
c = r->connection;
if (r->header_only) {
if (u->cacheable || u->store) {
if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
ngx_connection_error(c, ngx_socket_errno,
ngx_shutdown_socket_n " failed");
}
r->read_event_handler = ngx_http_request_empty_handler;
r->write_event_handler = ngx_http_request_empty_handler;
c->error = 1;
} else {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
}
/* 若臨時文件保存著請求包體,則調用ngx_pool_run_cleanup_file方法清理臨時文件的請求包體 */
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/*
* 若標志位buffering為0,轉發響應時以下游服務器網速優先;
* 即只需分配固定的內存塊大小來接收來自上游服務器的響應并轉發,
* 當該內存塊已滿,則暫停接收來自上游服務器的響應數據,
* 等待把內存塊的響應數據轉發給下游服務器后有剩余內存空間再繼續接收響應;
*/
if (!u->buffering) {
/*
* 若HTTP模塊沒有實現input_filter方法,
* 則采用upstream機制默認的方法ngx_http_upstream_non_buffered_filter;
*/
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
/*
* 設置ngx_http_upstream_t結構體中讀事件的回調方法為ngx_http_upstream_non_buffered_upstream,(即讀取上游響應的方法);
* 設置當前請求ngx_http_request_t結構體中寫事件的回調方法為ngx_http_upstream_process_non_buffered_downstream,(即轉發響應到下游的方法);
*/
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
/* 調用input_filter_init為input_filter方法處理響應包體做初始化工作 */
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
/* 檢查解析完響應頭部后接收緩沖區buffer是否已接收了響應包體 */
n = u->buffer.last - u->buffer.pos;
/* 若接收緩沖區已經接收了響應包體 */
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
/* 調用input_filter方法開始處理響應包體 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 調用該方法把本次接收到的響應包體轉發給下游服務器 */
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
/* 若接收緩沖區中沒有響應包體,則將其清空,即復用這個緩沖區 */
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 若當前連接上讀事件已準備就緒,
* 則調用ngx_http_upstream_process_non_buffered_upstream方法接收響應包體并處理;
*/
if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
return;
}
/*
* 若ngx_http_upstream_t結構體的buffering標志位為1,則轉發響應包體時以上游網速優先;
* 即分配更多的內存和緩存,即一直接收來自上游服務器的響應,把來自上游服務器的響應保存的內存或緩存中;
*/
/* TODO: preallocate event_pipe bufs, look "Content-Length" */
#if (NGX_HTTP_CACHE)
...
...
#endif
/* 初始化ngx_event_pipe_t結構體 p */
p = u->pipe;
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection;
p->downstream = c;
p->pool = r->pool;
p->log = c->log;
p->cacheable = u->cacheable || u->store;
p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path;
p->temp_file->pool = r->pool;
if (p->cacheable) {
p->temp_file->persistent = 1;
} else {
p->temp_file->log_level = NGX_LOG_WARN;
p->temp_file->warn = "an upstream response is buffered "
"to a temporary file";
}
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;
/* 初始化預讀鏈表緩沖區preread_bufs */
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
p->preread_size = u->buffer.last - u->buffer.pos;
if (u->cacheable) {
p->buf_to_file = ngx_calloc_buf(r->pool);
if (p->buf_to_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->buf_to_file->start = u->buffer.start;
p->buf_to_file->pos = u->buffer.start;
p->buf_to_file->last = u->buffer.pos;
p->buf_to_file->temporary = 1;
}
if (ngx_event_flags & NGX_USE_AIO_EVENT) {
/* the posted aio operation may corrupt a shadow buffer */
p->single_buf = 1;
}
/* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
p->free_bufs = 1;
/*
* event_pipe would do u->buffer.last += p->preread_size
* as though these bytes were read
*/
u->buffer.last = u->buffer.pos;
if (u->conf->cyclic_temp_file) {
/*
* we need to disable the use of sendfile() if we use cyclic temp file
* because the writing a new data may interfere with sendfile()
* that uses the same kernel file pages (at least on FreeBSD)
*/
p->cyclic_temp_file = 1;
c->sendfile = 0;
} else {
p->cyclic_temp_file = 0;
}
p->read_timeout = u->conf->read_timeout;
p->send_timeout = clcf->send_timeout;
p->send_lowat = clcf->send_lowat;
p->length = -1;
/* 調用input_filter_init方法進行初始化工作 */
if (u->input_filter_init
&& u->input_filter_init(p->input_ctx) != NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 設置上游讀事件的方法 */
u->read_event_handler = ngx_http_upstream_process_upstream;
/* 設置下游寫事件的方法 */
r->write_event_handler = ngx_http_upstream_process_downstream;
/* 處理上游響應包體 */
ngx_http_upstream_process_upstream(r, u);
}
~~~
當以下游網速優先轉發響應包體給下游時,由函數 ngx_http_upstream_process_non_buffered_downstrean 實現,該函數的執行流程如下所示:
- 檢查下游連接上寫事件是否超時,若標志位 timedout 為1,則表示超時,此時調用 ngx_http_upstream_finalize_request 方法接收請求,并 return 從當前函數返回;
- 調用 ngx_http_upstream_process_non_bufferd_request 方法向下游服務器發送響應包體,此時第二個參數為 1;
~~~
/* buffering 標志位為0時,轉發響應包體給下游服務器 */
static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{
ngx_event_t *wev;
ngx_connection_t *c;
ngx_http_upstream_t *u;
/* 獲取Nginx與下游服務器之間的TCP連接 */
c = r->connection;
/* 獲取ngx_http_upstream_t結構體 */
u = r->upstream;
/* 獲取當前連接的寫事件 */
wev = c->write;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered downstream");
c->log->action = "sending to client";
/* 檢查寫事件是否超時,若超時則結束請求 */
if (wev->timedout) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
/* 若不超時,以固定內存塊方式轉發響應包體給下游服務器 */
ngx_http_upstream_process_non_buffered_request(r, 1);
}
~~~
? ? ? 由于 buffering 標志位為0時,沒有開啟文件緩存,只有固定大小的內存塊作為接收響應緩沖區,當上游的響應包體比較大時,此時,接收緩沖區內存并不能夠滿足一次性接收完所有響應包體, 因此,在接收緩沖區已滿時,會阻塞接收響應包體,并先把已經收到的響應包體轉發給下游服務器。所有在轉發響應包體時,有可能會接收上游響應包體。此過程由 ngx_http_upstream_process_non_buffered_upstream 方法實現;
ngx_http_upstream_process_non_buffered_upstream 方法執行流程如下:
- 檢查上游連接上的讀事件是否超時,若標志位 timedout 為 1,表示已經超時,此時調用 ngx_http_upstream_finalize_request 方法結束請求,并 return 從當前函數返回;
- 調用 ngx_http_upstream_process_non_buffered_request 方法接收上游響應包體,此時第二個參數為 0;
~~~
/* 接收上游響應包體(buffering為0的情況) */
static void
ngx_http_upstream_process_non_buffered_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;
/* 獲取Nginx與上游服務器之間的TCP連接 */
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process non buffered upstream");
c->log->action = "reading upstream";
/* 判斷讀事件是否超時,若超時則結束當前請求 */
if (c->read->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
/*
* 若不超時,則以固定內存塊方式轉發響應包體給下游服務器,
* 注意:轉發的過程中,會接收來自上游服務器的響應包體;
*/
ngx_http_upstream_process_non_buffered_request(r, 0);
}
~~~
? ? ? 在上面函數中向下游服務器轉發響應包體過程中,最終會調用 ngx_http_upstream_process_non_buffered_request 方法來實現,而且轉發響應包體給下游服務器時,同時會接收來自上游的響應包體,接收上游響應包體最終也會調用該函數,只是調用的時候第二個參數指定不同的值;
ngx_http_upstream_process_non_buffered_request 方法執行流程如下所示:
- *步驟1*:若 do_write 參數的值為 0,表示需要接收來自上游服務器的響應包體,則直接跳到*步驟3*開始執行;
- *步驟2*:若 do_write 參數的值為 1,則開始向下游轉發響應包體;
- 檢查 ngx_http_upstream_t 結構體中的 out_bufs 鏈表 或 busy_bufs 鏈表是否有數據:
- 若 out_bufs 或 busy_bufs 鏈表緩沖區中有響應包體,則調用 ngx_http_output_filter 方法向下游發送響應包體,并調用 ngx_chain_update_chains 方法更新 ngx_http_upstream_t 結構體中的 free_bufs、busy_bufs、out_bufs 鏈表緩沖區;
- 若 out_bufs 和 busy_bufs 鏈表緩沖區中都沒有數據,則清空接收緩沖區 buffer 以便再次接收來自上游服務器的響應包體;
- *步驟3*:計算接收緩沖區 buffer 剩余的內存空間 size,若有剩余的內存空間,且此時上游連接上有可讀的響應包體(即讀事件的 ready 標志位為 1),則調用 recv 方法讀取上游響應包體,并返回 n;
- 若返回值 n 大于 0,表示已經接收到上游響應包體,則調用 input_filter 方法處理響應包體,并設置 do_write 標志位為 1,表示已經接收到響應包體,此時可轉發給下游服務器,又回到*步驟2*繼續執行;
- 若返回值 n = NGX_AGAIN,表示需要等待下一次讀事件的發生以便繼續接收上游響應包體,則直接跳至*步驟5*開始執行;
- *步驟4*:若接收緩沖區 buffer 沒有剩余內存空間 或 上游連接上讀事件未準備就緒,則從*步驟5*開始執行;
- *步驟5*:調用 ngx_add_timer 方法將下游連接上寫事件添加到定時器機制中,調用 ngx_handle_write_event 方法將下游連接上寫事件注冊到 epoll 事件機制中;
- *步驟6*:調用 ngx_handle_read_event 方法將上游連接上讀事件注冊到 epoll 事件機制中,調用 ngx_add_timer 方法將上游連接上讀事件添加到定時器機制中;
~~~
/* 以固定內存塊方式轉發響應包體給下游服務器 */
/*
* 第二個參數表示本次是否需要向下游發送響應;若為0時,需要接收來自上游服務器的響應,也需要轉發響應給下游;
* 若為1,只負責轉發響應給下游服務器;
*/
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r,
ngx_uint_t do_write)
{
size_t size;
ssize_t n;
ngx_buf_t *b;
ngx_int_t rc;
ngx_connection_t *downstream, *upstream;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
/* 獲取ngx_http_upstream_t結構體 */
u = r->upstream;
/* 獲取Nginx與下游服務器之間的TCP連接 */
downstream = r->connection;
/* 獲取Nginx與上游服務器之間的TCP連接 */
upstream = u->peer.connection;
/* 獲取ngx_hhtp_upstream_t結構體的接收緩沖區buffer */
b = &u->buffer;
/*
* 獲取do_write的值,該值決定是否還要接收來自上游服務器的響應;
* 其中length表示還需要接收的上游響應包體長度;
*/
do_write = do_write || u->length == 0;
for ( ;; ) {
if (do_write) {/* 若do_write為1,則開始向下游服務器轉發響應包體 */
/*
* 檢查是否有響應包體需要轉發給下游服務器;
* 其中out_bufs表示本次需要轉發給下游服務器的響應包體;
* busy_bufs表示上一次向下游服務器轉發響應包體時沒有轉發完的響應包體內存;
* 即若一次性轉發不完所有的響應包體,則會保存在busy_bufs鏈表緩沖區中,
* 這里的保存只是將busy_bufs指向未發送完畢的響應數據;
*/
if (u->out_bufs || u->busy_bufs) {
/* 調用ngx_http_output_filter方法將響應包體發送給下游服務器 */
rc = ngx_http_output_filter(r, u->out_bufs);
/* 若返回值 rc = NGX_ERROR,則結束請求 */
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 調用ngx_chain_update_chains方法更新free_bufs、busy_bufs、out_bufs鏈表;
* 即清空out_bufs鏈表,把out_bufs鏈表中已發送完的ngx_buf_t緩沖區清空,并將其添加到free_bufs鏈表中;
* 把out_bufs鏈表中未發送完的ngx_buf_t緩沖區添加到busy_bufs鏈表中;
*/
ngx_chain_update_chains(r->pool, &u->free_bufs, &u->busy_bufs,
&u->out_bufs, u->output.tag);
}
/*
* busy_bufs為空,表示所有響應包體已經轉發到下游服務器,
* 此時清空接收緩沖區buffer以便再次接收來自上游服務器的響應包體;
*/
if (u->busy_bufs == NULL) {
if (u->length == 0
|| (upstream->read->eof && u->length == -1))
{
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (upstream->read->eof) {
ngx_log_error(NGX_LOG_ERR, upstream->log, 0,
"upstream prematurely closed connection");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_BAD_GATEWAY);
return;
}
if (upstream->read->error) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_BAD_GATEWAY);
return;
}
b->pos = b->start;
b->last = b->start;
}
}
/* 計算接收緩沖區buffer剩余可用的內存空間 */
size = b->end - b->last;
/*
* 若接收緩沖區buffer有剩余的可用空間,
* 且此時讀事件可讀,即可讀取來自上游服務器的響應包體;
* 則調用recv方法開始接收來自上游服務器的響應包體,并保存在接收緩沖區buffer中;
*/
if (size && upstream->read->ready) {
n = upstream->recv(upstream, b->last, size);
/* 若返回值 n = NGX_AGAIN,則等待下一次可讀事件發生繼續接收響應 */
if (n == NGX_AGAIN) {
break;
}
/*
* 若返回值 n 大于0,表示接收到響應包體,
* 則調用input_filter方法處理響應包體;
* 并把do_write設置為1;
*/
if (n > 0) {
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
do_write = 1;
continue;
}
break;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/* 調用ngx_handle_write_event方法將Nginx與下游之間的連接上的寫事件注冊的epoll事件機制中 */
if (downstream->data == r) {
if (ngx_handle_write_event(downstream->write, clcf->send_lowat)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
/* 調用ngx_add_timer方法將Nginx與下游之間的連接上的寫事件添加到定時器事件機制中 */
if (downstream->write->active && !downstream->write->ready) {
ngx_add_timer(downstream->write, clcf->send_timeout);
} else if (downstream->write->timer_set) {
ngx_del_timer(downstream->write);
}
/* 調用ngx_handle_read_event方法將Nginx與上游之間的連接上的讀事件注冊的epoll事件機制中 */
if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 調用ngx_add_timer方法將Nginx與上游之間的連接上的讀事件添加到定時器事件機制中 */
if (upstream->read->active && !upstream->read->ready) {
ngx_add_timer(upstream->read, u->conf->read_timeout);
} else if (upstream->read->timer_set) {
ngx_del_timer(upstream->read);
}
}
~~~
~~~
/* upstream機制默認的input_filter方法 */
static ngx_int_t
ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
/*
* data參數是ngx_http_upstream_t結構體中的input_filter_ctx,
* 當HTTP模塊沒有實現input_filter方法時,
* input_filter_ctx指向ngx_http_request_t結構體;
*/
ngx_http_request_t *r = data;
ngx_buf_t *b;
ngx_chain_t *cl, **ll;
ngx_http_upstream_t *u;
u = r->upstream;
/* 找到out_bufs鏈表的最后一個緩沖區,并由ll指向該緩沖區 */
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
}
/* 從free_bufs空閑鏈表緩沖區中獲取一個ngx_buf_t結構體給cl */
cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}
/* 將新分配的ngx_buf_t緩沖區添加到out_bufs鏈表的尾端 */
*ll = cl;
cl->buf->flush = 1;
cl->buf->memory = 1;
/* buffer是保存來自上游服務器的響應包體 */
b = &u->buffer;
/* 將響應包體數據保存在cl緩沖區中 */
cl->buf->pos = b->last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
if (u->length == -1) {
return NGX_OK;
}
/* 更新length長度,表示需要接收的包體長度減少bytes字節 */
u->length -= bytes;
return NGX_OK;
}
~~~
當 buffering 標志位為 1 轉發響應包體給下游時,由函數 ngx_http_upstream_process_downstream 實現。
ngx_http_upstream_process_downstream 方法的執行流程如下所示:
- 若下游連接上寫事件的 timedout 標志位為 1,表示寫事件已經超時;
- 若下游連接上寫事件的 delayed 標志位為 1;
- 若下游連接上寫事件 ready 標志位為 1,表示寫事件已經準備就緒,則調用 ngx_event_pipe 方法轉發響應包體;
- 若下游連接上寫事件 ready 標志位為 0,表示寫事件未準備就緒,則調用 ngx_add_timer 方法將寫事件添加到定時器機制中,調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中,并 return 從當前函數返回;
- 若下游連接上寫事件的 delayed 標志位為 0,則該連接已經出錯,設置 ngx_event_pipe_t 結構體中 downstream_error標志位為 1,設置 ngx_connection_t 結構體中 timedout 標志位為 1,并調用 ngx_connection_error 方法;
- 若下游連接上寫事件的 timedout 標志位為 0,表示寫事件不超時;
- 若下游連接上寫事件的 delayed 標志位為 1,則調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中,并 return 從當前函數返回;
- 若下游連接上寫事件的 delayed 標志位為 0,則調用 ngx_event_pipe 方法轉發響應包體;
- 最終調用 ngx_http_upstream_process_request 方法;
~~~
static void
ngx_http_upstream_process_downstream(ngx_http_request_t *r)
{
ngx_event_t *wev;
ngx_connection_t *c;
ngx_event_pipe_t *p;
ngx_http_upstream_t *u;
/* 獲取 Nginx 與下游服務器之間的連接 */
c = r->connection;
/* 獲取 Nginx 與上游服務器之間的連接 */
u = r->upstream;
/* 獲取 ngx_event_pipe_t 結構體 */
p = u->pipe;
/* 獲取下游連接上的寫事件 */
wev = c->write;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process downstream");
c->log->action = "sending to client";
/* 檢查下游連接上寫事件是否超時,若標志位 timedout 為 1,表示超時 */
if (wev->timedout) {
/* 若下游連接上寫事件的delayed 標志位為 1 */
if (wev->delayed) {
wev->timedout = 0;
wev->delayed = 0;
/*
* 檢查寫事件是否準備就緒,若 ready 標志位為 0,
* 表示未準備就緒,則調用 ngx_add_timer 方法將寫事件添加到定時器機制中;
* 調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中;
* 并 return 從當前函數返回;
*/
if (!wev->ready) {
ngx_add_timer(wev, p->send_timeout);
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}
return;
}
/*
* 若寫事件已經準備就緒,即ready 標志位為 1;
* 則調用 ngx_event_pipe 方法將響應包體轉發給下游服務器;
* 并 return 從當前函數返回;
*/
if (ngx_event_pipe(p, wev->write) == NGX_ABORT) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
} else {
/* 若寫事件的delayed標志位為 0,則設置downstream_error標志位為 1,,表示連接出錯 */
p->downstream_error = 1;
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
}
} else {/* 若下游連接上寫事件不超時,即timedout 標志位為 0 */
/*
* 檢查寫事件 delayed 標志位,若該標志位為 1;
* 則調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中;
* 并 return 從當前函數返回;
*/
if (wev->delayed) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http downstream delayed");
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}
return;
}
/* 若寫事件的delayed 標志位為 0,則調用 ngx_event_pipe 方法轉發響應 */
if (ngx_event_pipe(p, 1) == NGX_ABORT) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
/* 最終調用該函數 */
ngx_http_upstream_process_request(r);
}
~~~
ngx_http_upstream_process_upstream 方法執行流程如下所示:
- 若上游連接上讀事件的 timedout 標志位為 1,表示讀事件已經超時,則設置 upstream_error 為 1,調用 ngx_connection_error 方法接收當前函數;
- 若上游連接上讀事件的 timedout 標志位為 0,則調用 ngx_event_pipe 方法接收上游響應包體;
- 最終調用 ngx_http_upstream_process_request 方法;
~~~
static void
ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upstream");
c->log->action = "reading upstream";
if (c->read->timedout) {
u->pipe->upstream_error = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
} else {
if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
ngx_http_upstream_process_request(r);
}
~~~
ngx_event_pipe 方法的執行流程如下所示:
- *步驟1*:若參數 do_write 為 1,表示向下游轉發響應;
- 調用 ngx_event_pipe_write_to_downstream 方法向下游轉發響應,并返回值為 rc;
- 若返回值 rc = NGX_ABORT,則 return NGX_ABORT 從當前函數返回;
- 若返回值 rc = NGX_BUSY,表示不需要往下執行,則 return NGX_OK 從當前函數返回;
- 若返回值 rc = NGX_OK,則直接跳至*步驟3*執行;
- *步驟2*:若參數 do_write 為 0,表示需要接收上游響應,直接跳至*步驟3*執行;
- *步驟3*:設置 ngx_event_pipe_t 結構體中的 read 標志位為 0,upstream_blocked 標志位為 0;
- *步驟4*:調用 ngx_event_pipe_read_upstream 方法讀取上游響應;
- *步驟5*:檢查 read 和 upstream_blocked 標志位,若 read 和 upstream_blocked 標志位都為 0,則跳至*步驟7*執行;
- *步驟6*:若 read 或 upstream_blocked 標志位為 1,表示需要向下游發送剛剛讀取到的響應,則設置 do_write 標志為 1,跳至*步驟1*繼續執行;
- *步驟7*:調用 ngx_add_timer 方法將上游連接上的讀事件添加到定時器機制中,調用 ngx_handle_read_event 方法將讀事件注冊到 epoll 事件機制中;
- *步驟8*:調用 ngx_add_timer 方法將下游連接上的寫事件添加到定時器機制中,調用 ngx_handle_write_event 方法將寫事件注冊到 epoll 事件機制中;
- *步驟9*:return NGX_OK 從當前函數返回;
~~~
/* 轉發響應的ngx_event_pipe_t結構體 */
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
u_int flags;
ngx_int_t rc;
ngx_event_t *rev, *wev;
for ( ;; ) {
if (do_write) {/* 若 do_write標志位為1,表示向下游轉發響應 */
p->log->action = "sending to client";
/* 調用ngx_event_pipe_write_to_downstream方法向下游轉發響應 */
rc = ngx_event_pipe_write_to_downstream(p);
if (rc == NGX_ABORT) {
return NGX_ABORT;
}
if (rc == NGX_BUSY) {
return NGX_OK;
}
}
/* 若do_write標志位為0,則接收上游響應 */
p->read = 0;
p->upstream_blocked = 0;
p->log->action = "reading upstream";
/* 調用ngx_event_pipe_read_upstream方法讀取上游響應 */
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
/*
* 若標志位read和upstream_blocked為0,
* 則沒有可讀的響應數據,break退出for循環;
*/
if (!p->read && !p->upstream_blocked) {
break;
}
/* 否則,設置do_write標志位為1繼續進行for循環操作 */
do_write = 1;
}
/*
* 將上游讀事件添加到定時器機制中,注冊到epoll事件機制中;
*/
if (p->upstream->fd != (ngx_socket_t) -1) {
rev = p->upstream->read;
flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
if (ngx_handle_read_event(rev, flags) != NGX_OK) {
return NGX_ABORT;
}
if (rev->active && !rev->ready) {
ngx_add_timer(rev, p->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
/*
* 將下游寫事件添加到定時器機制中,注冊到epoll事件機制中;
*/
if (p->downstream->fd != (ngx_socket_t) -1
&& p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
return NGX_ABORT;
}
if (!wev->delayed) {
if (wev->active && !wev->ready) {
ngx_add_timer(wev, p->send_timeout);
} else if (wev->timer_set) {
ngx_del_timer(wev);
}
}
}
return NGX_OK;
}
~~~
ngx_event_pipe_read_upstream 方法的執行流程如下所示:
- *步驟1*:檢查 ngx_event_pipe_t 結構體中的 upstream_eof(若為 1 表示上游連接通信已經結束)、upstream_error(若為 1,表示上游連接出錯)、upstream_done(若為 1,表示上游連接已經關閉) 標志位,若其中一個標志位為 1,表示上游連接關閉,則 return NGX_OK 從當前函數返回;
- *步驟2*:進入 for 循環,再次檢查以上三個標志位,若其中有一個為 1,則 break 退出for 循環,跳至*步驟8*執行;
- *步驟3*:若 preread_bufs 鏈表緩沖區為空,表示接收響應頭部區間,沒有預接收響應包體,且此時上游連接上讀事件未準備就緒,即ready標志位為0,則 break 退出for 循環,跳至*步驟8*執行;
- *步驟4*:若 preread_bufs 鏈表緩沖區不為空,表示預接收了響應包體,則將 preread_bufs 鏈表緩沖區掛載到 chain 鏈表中,并計算預接收到響應包體的長度 n,若 n 大于 0,則設置 read 標志位為 1;
- *步驟5*:若 preread_bufs 鏈表緩沖區為空:
- 若 free_raw_bufs 不為空,則將該鏈表緩沖區掛載到chain鏈表中;
- 若 free_raw_bufs 為空:
- 若 allocated 小于 bufs.num,則調用 ngx_create_temp_buf 方法分配一個新的緩沖區 b,并將新分配的緩沖區掛載到 chain 鏈表中;
- 若 allocated 大于 bufs.num:
- 若 cacheable 標志位為 0,且下游連接上寫事件已準備就緒,即寫事件的 ready 標志位為 1,表示可以向下游發送響應包體,此時,設置upstream_blocked 標志位為 1,表示阻塞讀取上游響應包體,因為沒有緩沖區來接收上游響應包體,并break 退出for循環,跳至*步驟8*執行;
- 若 cacheable 標志位為 1,即開啟了文件緩存,且此時臨時文件長度未達到最大長度,則調用 ngx_event_pipe_write_chain_to_temp_file 方法將上游響應寫入到臨時文件中,以便使 free_raw_bufs 有空余緩沖區來繼續接收上游響應,并將此 free_raw_bufs 鏈表緩沖區掛載到 chain 鏈表中;
- 若以上條件都不滿足,則break 退出 for 循環,跳至 *步驟8*執行;
- *步驟6*:調用 recv_chain 方法接收上游響應包體,返回值為 n,并把接收到的上游響應包體緩沖區添加到 free_raw_bufs 鏈表的尾端;
- 若返回值 n = NGX_ERROR,表示上游連接出錯,return NGX_ERROR 從當前函數返回;
- 若返回值 n = NGX_AGAIN,表示沒有讀取到上游響應包體,則 break 退出 for 循環,跳至*步驟8*執行;
- 若返回值 n = 0,表示上游服務器主動關閉連接,則 break 退出 for 循環,跳至*步驟8*執行;
- 若返回值 n 大于 0,則設置 read 標志位為 1,表示已經接收到上游響應包體;
- *步驟7*:開始處理已接收到的上游響應包體,遍歷待處理緩沖區鏈表 chain 中的每一個 ngx_buf_t 緩沖區:
- 調用 ngx_event_pipe_remove_shadow_links 方法釋放當前緩沖區 ngx_buf_t 中的 shadow 域;
- 計算當前緩沖區剩余的內存空間大小為 size:
- 若本次接收到的上游響應包體 n 不小于 size,表示當前緩沖區已滿,調用 input_filter 方法處理當前緩沖區的響應包體,把其掛載到 in 鏈表中;
- 若本次接收到的上游響應包體 n 小于 size 值,則表示當前緩沖區還有剩余空間繼續接收上游響應包體,先把本次接收到的響應包體緩沖區添加到 free_raw_bufs 鏈表尾端;
- *步驟8*:由于上面步驟接收到的上游響應包體最終會方法 free_raw_bufs 鏈表緩沖區中,再次檢查 free_raw_bufs 鏈表緩沖區,若該緩沖區不為空,則調用 input_filter 方法處理該緩沖區的響應包體,同時調用 ngx_free_chain 方法是否 chain 緩沖區數據;
- *步驟9*:若 upstream_eof 或 upstream_error 標志位為 1,且 free_raw_bufs 不為空,再次調用 input_filter 方法處理 free_raw_bufs 緩沖區數據,若 free_bufs 標志位為 1,則調用 ngx_free 釋放 shadow 域為空的緩沖區;
~~~
/* 讀取上游響應 */
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
ssize_t n, size;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *chain, *cl, *ln;
/*
* 若Nginx與上游之間的通信已經結束、
* 或Nginx與上游之間的連接出錯、
* 或Nginx與上游之間的連接已經關閉;
* 則直接return NGX_OK 從當前函數返回;
*/
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);
/* 開始接收上游響應包體,并調用input_filter方法進行處理 */
for ( ;; ) {
/*
* 若Nginx與上游之間的通信已經結束、
* 或Nginx與上游之間的連接出錯、
* 或Nginx與上游之間的連接已經關閉;
* 則直接break 從當前for循環退出;
*/
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
break;
}
/*
* 若preread_bufs鏈表緩沖區為空(表示在接收響應頭部區間,未預接收響應包體),
* 且上游讀事件未準備就緒,即沒有可讀的響應包體;
* break從for循環退出;
*/
if (p->preread_bufs == NULL && !p->upstream->read->ready) {
break;
}
/*
* 若preread_bufs鏈表緩沖區有未處理的數據,需要把它掛載到chain鏈表中
* 即該數據是接收響應頭部區間,預接收的響應包體;
*/
if (p->preread_bufs) {/* the preread_bufs is not empty */
/* use the pre-read bufs if they exist */
chain = p->preread_bufs;/* 將預接收響應包體緩沖區添加到chain鏈表尾端 */
p->preread_bufs = NULL;/* 使該緩沖區指向NULL,表示沒有響應包體 */
n = p->preread_size;/* 計算預接收響應包體的長度 n */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe preread: %z", n);
/*
* 若preread_bufs鏈表緩沖區不為空,
* 則設置read標志位為1,表示當前已經接收了響應包體;
*/
if (n) {
p->read = 1;
}
} else {/* the preread_bufs is NULL */
/*
* 若preread_bufs鏈表緩沖區沒有未處理的響應包體,
* 則需要有緩沖區來接收上游響應包體;
*/
#if (NGX_HAVE_KQUEUE)
...
...
#endif
/*
* 若 free_raw_bufs不為空,則使用該鏈表緩沖區接收上游響應包體;
* free_raw_bufs鏈表緩沖區用來保存調用一次ngx_event_pipe_read_upstream方法所接收到的上游響應包體;
*/
if (p->free_raw_bufs) {
/* use the free bufs if they exist */
chain = p->free_raw_bufs;/* 將接收響應包體緩沖區添加到chain鏈表中 */
if (p->single_buf) {/* 表示每一次只能接收一個ngx_buf_t緩沖區的響應包體 */
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else {
p->free_raw_bufs = NULL;
}
} else if (p->allocated < p->bufs.num) {
/*
* 若 free_raw_bufs為空,且已分配的緩沖區數目allocated小于緩沖區數目bufs.num;
* 則需要分配一個新的緩沖區來接收上游響應包體;
*/
/* allocate a new buf if it's still allowed */
/* 分配新的緩沖區來接收上游響應 */
b = ngx_create_temp_buf(p->pool, p->bufs.size);
if (b == NULL) {
return NGX_ABORT;
}
p->allocated++;
/* 分配一個ngx_chain_t 鏈表緩沖區 */
chain = ngx_alloc_chain_link(p->pool);
if (chain == NULL) {
return NGX_ABORT;
}
/* 把新分配接收響應包體的緩沖區添加到chain鏈表中 */
chain->buf = b;
chain->next = NULL;
} else if (!p->cacheable
&& p->downstream->data == p->output_ctx
&& p->downstream->write->ready
&& !p->downstream->write->delayed)
{
/* 若free_raw_bufs為空,且allocated大于bufs.num,若cacheable標志位為0,即不啟用文件緩存,
* 檢查Nginx與下游之間連接,并檢查該連接上寫事件是否準備就緒,
* 若已準備就緒,即表示可以向下游發送響應包體;
*/
/*
* if the bufs are not needed to be saved in a cache and
* a downstream is ready then write the bufs to a downstream
*/
/*
* 設置upstream_blocked標志位為1,表示阻塞讀取上游響應,
* 因為沒有緩沖區或文件緩存來接收響應包體,則應該阻塞讀取上游響應包體;
* 并break退出for循環,此時會向下游轉發響應,釋放緩沖區,以便再次接收上游響應包體;
*/
p->upstream_blocked = 1;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe downstream ready");
break;/* 退出for循環 */
} else if (p->cacheable
|| p->temp_file->offset < p->max_temp_file_size)
{/* 若cacheable標志位為1,即開啟了文件緩存,則檢查臨時文件是否達到最大長度,若未達到最大長度 */
/*
* if it is allowed, then save some bufs from p->in
* to a temporary file, and add them to a p->out chain
*/
/* 將上游響應寫入到臨時文件中,此時free_raw_bufs有緩沖區空間來接收上游 響應包體 */
rc = ngx_event_pipe_write_chain_to_temp_file(p);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe temp offset: %O", p->temp_file->offset);
if (rc == NGX_BUSY) {
break;
}
if (rc == NGX_AGAIN) {
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
&& p->upstream->read->active
&& p->upstream->read->ready)
{
if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
== NGX_ERROR)
{
return NGX_ABORT;
}
}
}
if (rc != NGX_OK) {
return rc;
}
chain = p->free_raw_bufs;
if (p->single_buf) {
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else {
p->free_raw_bufs = NULL;
}
} else {
/* 若沒有緩沖區或文件緩存接收上游響應包體,則暫時不收受上游響應包體,break退出循環 */
/* there are no bufs to read in */
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"no pipe bufs to read in");
break;
}
/* end of check the free_raw_bufs */
/*
* 若有緩沖區接收上游響應包體,則調用recv_chain方法接收上游響應包體;
* 把接收到的上游響應包體緩沖區添加到free_raw_bufs鏈表的尾端;
*/
n = p->upstream->recv_chain(p->upstream, chain);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe recv chain: %z", n);
/* 將保存接收到上游響應包體的緩沖區添加到free_raw_bufs鏈表尾端 */
if (p->free_raw_bufs) {
chain->next = p->free_raw_bufs;
}
p->free_raw_bufs = chain;
/* 下面根據所接收上游響應包體的返回值n來進行判斷 */
/*
* n = NGX_ERROR,表示發生錯誤,
* 則設置upstream_error標志位為1,
* 并return NGX_ERROR從當前函數返回;
*/
if (n == NGX_ERROR) {
p->upstream_error = 1;
return NGX_ERROR;
}
/*
* n = NGX_AGAIN,表示沒有讀取到上游響應包體,
* 則break跳出for循環;
*/
if (n == NGX_AGAIN) {
if (p->single_buf) {
ngx_event_pipe_remove_shadow_links(chain->buf);
}
break;
}
/*
* n 大于0,表示已經接收到上游響應包體,
* 則設置read標志位為1;
*/
p->read = 1;
/*
* n = 0,表示上游服務器主動關閉連接,
* 則設置upstream_eof標志位為1,表示已關閉連接;
* 并break退出for循環;
*/
if (n == 0) {
p->upstream_eof = 1;
break;
}
}
/* checking the preread_bufs is end */
/* 下面開始處理已接收到的上游響應包體數據 */
p->read_length += n;
cl = chain;
p->free_raw_bufs = NULL;
/* 遍歷待處理緩沖區鏈表chain中的ngx_buf_t緩沖區 */
while (cl && n > 0) {
/* 調用該方法將當前ngx_buf_t緩沖區中的shadow域釋放 */
ngx_event_pipe_remove_shadow_links(cl->buf);
/* 計算當前緩沖區剩余的空間大小 */
size = cl->buf->end - cl->buf->last;
/* 若本次接收到上游響應包體的長度大于緩沖區剩余的空間,表示當前緩沖區已滿 */
if (n >= size) {
cl->buf->last = cl->buf->end;
/* STUB */ cl->buf->num = p->num++;
/* 調用input_filter方法處理當前緩沖區響應包體,把其掛載到in鏈表中 */
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}
n -= size;
ln = cl;
cl = cl->next;
ngx_free_chain(p->pool, ln);
} else {
/*
* 若本次接收到上游響應包體的長度小于緩沖區剩余的空間,
* 表示當前緩沖區還有剩余空間接收上游響應包體;
* 則先把本次接收到的響應包體緩沖區添加到free_raw_bufs鏈表尾端;
*/
cl->buf->last += n;
n = 0;
}
}
if (cl) {
for (ln = cl; ln->next; ln = ln->next) { /* void */ }
ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}
}
/* end of the For cycle */
#if (NGX_DEBUG)
...
...
#endif
/* 若free_raw_bufs不為空 */
if (p->free_raw_bufs && p->length != -1) {
cl = p->free_raw_bufs;
if (cl->buf->last - cl->buf->pos >= p->length) {
p->free_raw_bufs = cl->next;
/* STUB */ cl->buf->num = p->num++;
/* 調用input_filter方法處理free_raw_bufs緩沖區 */
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}
/* 釋放已被處理的chain緩沖區數據 */
ngx_free_chain(p->pool, cl);
}
}
if (p->length == 0) {
p->upstream_done = 1;
p->read = 1;
}
/*
* 檢查upstream_eof或upstream_error標志位是否為1,若其中一個為1,表示連接已經關閉,
* 若連接已經關閉,且free_raw_bufs緩沖區不為空;
*/
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
/* STUB */ p->free_raw_bufs->buf->num = p->num++;
/* 再次調用input_filter方法處理free_raw_bufs緩沖區的響應包體數據 */
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
return NGX_ABORT;
}
p->free_raw_bufs = p->free_raw_bufs->next;
/* 檢查free_bufs標志位,若為1,則釋放shadow域為空的緩沖區 */
if (p->free_bufs && p->buf_to_file == NULL) {
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
if (cl->buf->shadow == NULL) {
ngx_pfree(p->pool, cl->buf->start);
}
}
}
}
if (p->cacheable && p->in) {
if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
return NGX_ABORT;
}
}
/* 返回NGX_OK,結束當前函數 */
return NGX_OK;
}
~~~
? ? ? ngx_event_pipe_write_to_downstream 方法將 in 鏈表和 out 鏈表中管理的緩沖區發送到下游服務器,由于 out 鏈表中緩沖區的內容在響應中的位置比 in 鏈表靠前,因此優先發送 out 鏈表內容給下游服務器。
ngx_event_pipe_write_to_downstream 方法的執行流程如下:
- *步驟1*:檢查上游連接是否結束,即標志位 upstream_eof、upstream_error、upstream_done 有一個為 1,則表示不需要再接收上游響應包體,跳至*步驟2*執行,否則跳至*步驟5*執行;
- *步驟2*:調用 output_filter 方法將 out 鏈表緩沖區中的響應包體發送給下游服務器;
- *步驟3*:調用 output_filter 方法將 in 鏈表緩沖區中的響應包體發送給下游服務器;
- *步驟4*:設置 downstream_done 標志位為 1,結束當前函數;
- *步驟5*:計算 busy 鏈表緩沖區中待發送的響應包體長度 bsize,若 bsize 大于配置項規定值 busy_size,則跳至*步驟7*執行,否則繼續向下游準備發送 out 或 in 鏈表緩沖區中的響應包體;
- *步驟6*:檢查 out 鏈表是否為空:
- 若 out 鏈表不為空,取出 out 鏈表首個緩沖區 ngx_buf_t 作為發送響應包體,跳至*步驟7*執行;
- 若 out 鏈表為空,檢查 in 鏈表是否為空:
- 若 in 鏈表為空,則說明本次沒有需要發送的響應包體,則返回 NGX_OK,結束當前函數;
- 若 in 鏈表不為空,取出 in 鏈表首部的第一個緩沖區作為待發送響應包體緩沖區,跳至*步驟7*執行;
- *步驟7*:檢查以前待發送響應包體長度加上本次本次需要發送的響應包體長度是否大于 busy_size,若大于 busy_size,跳至*步驟8*執行;否則跳至*步驟5*執行;
- *步驟8*:調用 output_filter 方法向下游服務器發送存儲響應包體的 out 緩沖區鏈表;
- *步驟9*:調用 ngx_chain_update_chain 方法更新 free、busy、out 緩沖區;
- *步驟10*:遍歷 free 鏈表,釋放緩沖區中的 shadow 域;
~~~
/* 向下游服務器轉發響應包體 */
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
u_char *prev;
size_t bsize;
ngx_int_t rc;
ngx_uint_t flush, flushed, prev_last_shadow;
ngx_chain_t *out, **ll, *cl;
ngx_connection_t *downstream;
/* 獲取Nginx與下游服務器之間的連接 */
downstream = p->downstream;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
flushed = 0;
for ( ;; ) {
/* downstream_error標志位為1,表示與下游之間的連接出現錯誤 */
if (p->downstream_error) {
return ngx_event_pipe_drain_chains(p);
}
/* 檢查與上游之間的連接是否關閉,若已關閉 */
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
/* pass the p->out and p->in chains to the output filter */
for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
/* 調用output_filter方法將out鏈表中的緩沖區響應包體轉發給下游 */
if (p->out) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush out");
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->out = NULL;
}
/* 調用output_filter方法將in鏈表中的緩沖區響應包體轉發給下游 */
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->in = NULL;
}
if (p->cacheable && p->buf_to_file) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");
if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
return NGX_ABORT;
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream done");
/* TODO: free unused bufs */
p->downstream_done = 1;
break;
}
/*
* 若上游連接沒有關閉,則檢查下游連接上的寫事件是否準備就緒;
* 若準備就緒,則表示可以向下游轉發響應包體;
* 若未準備就緒,則break退出for循環,return NGX_OK從當前函數返回;
*/
if (downstream->data != p->output_ctx
|| !downstream->write->ready
|| downstream->write->delayed)
{
break;
}
/* bsize is the size of the busy recycled bufs */
prev = NULL;
bsize = 0;
/* 計算busy鏈表緩沖區中待發送響應包體的長度bsize */
for (cl = p->busy; cl; cl = cl->next) {
if (cl->buf->recycled) {
if (prev == cl->buf->start) {
continue;
}
bsize += cl->buf->end - cl->buf->start;
prev = cl->buf->start;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write busy: %uz", bsize);
out = NULL;
/* 檢查bsize是否大于busy_size配置項 */
if (bsize >= (size_t) p->busy_size) {
flush = 1;
goto flush;
}
/* 若bsize小于busy_size配置項 */
flush = 0;
ll = NULL;
prev_last_shadow = 1;
/*
* 檢查in、out鏈表緩沖區是否為空,若不為空;
* 將out、in鏈表首個緩沖區作為發送內容;
*/
for ( ;; ) {
if (p->out) {/* out鏈表不為空,則取出首個緩沖區作為發送響應內容 */
cl = p->out;
if (cl->buf->recycled) {
ngx_log_error(NGX_LOG_ALERT, p->log, 0,
"recycled buffer in pipe out chain");
}
p->out = p->out->next;
} else if (!p->cacheable && p->in) {
/* 若out為空,檢查in是否為空,若in不為空,則首個緩沖區作為發送內容 */
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write buf ls:%d %p %z",
cl->buf->last_shadow,
cl->buf->pos,
cl->buf->last - cl->buf->pos);
if (cl->buf->recycled && prev_last_shadow) {
/* 判斷待發送響應包體長度加上本次緩沖區的長度是否大于busy_size */
if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
flush = 1;
break;
}
bsize += cl->buf->end - cl->buf->start;
}
prev_last_shadow = cl->buf->last_shadow;
p->in = p->in->next;
} else {
break;
}
cl->next = NULL;
if (out) {
*ll = cl;
} else {
out = cl;
}
ll = &cl->next;
}
flush:
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write: out:%p, f:%d", out, flush);
if (out == NULL) {
if (!flush) {
break;
}
/* a workaround for AIO */
if (flushed++ > 10) {
return NGX_BUSY;
}
}
/* 調用output_filter方法發送out鏈表緩沖區 */
rc = p->output_filter(p->output_ctx, out);
/* 更新free、busy、out鏈表緩沖區 */
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
/* 遍歷free鏈表,釋放緩沖區中的shadow域 */
for (cl = p->free; cl; cl = cl->next) {
if (cl->buf->temp_file) {
if (p->cacheable || !p->cyclic_temp_file) {
continue;
}
/* reset p->temp_offset if all bufs had been sent */
if (cl->buf->file_last == p->temp_file->offset) {
p->temp_file->offset = 0;
}
}
/* TODO: free buf if p->free_bufs && upstream done */
/* add the free shadow raw buf to p->free_raw_bufs */
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
}
}
return NGX_OK;
}
~~~
~~~
static void
ngx_http_upstream_process_request(ngx_http_request_t *r)
{
ngx_temp_file_t *tf;
ngx_event_pipe_t *p;
ngx_http_upstream_t *u;
u = r->upstream;
p = u->pipe;
if (u->peer.connection) {
if (u->store) {
if (p->upstream_eof || p->upstream_done) {
tf = p->temp_file;
if (u->headers_in.status_n == NGX_HTTP_OK
&& (p->upstream_done || p->length == -1)
&& (u->headers_in.content_length_n == -1
|| u->headers_in.content_length_n == tf->offset))
{
ngx_http_upstream_store(r, u);
u->store = 0;
}
}
}
#if (NGX_HTTP_CACHE)
...
...
#endif
if (p->upstream_done || p->upstream_eof || p->upstream_error) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream exit: %p", p->out);
if (p->upstream_done
|| (p->upstream_eof && p->length == -1))
{
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
if (p->upstream_eof) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"upstream prematurely closed connection");
}
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}
}
if (p->downstream_error) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream downstream error");
if (!u->cacheable && !u->store && u->peer.connection) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}
}
}
~~~
### 結束 upstream 請求
結束 upstream 請求由函數 ngx_http_upstream_finalize_request 實現,該函數最終會調用 HTTP 框架的 ngx_http_finalize_request 方法來結束請求。
~~~
static void
ngx_http_upstream_finalize_request(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_int_t rc)
{
ngx_uint_t flush;
ngx_time_t *tp;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"finalize http upstream request: %i", rc);
/* 將 cleanup 指向的清理資源回調方法設置為 NULL */
if (u->cleanup) {
*u->cleanup = NULL;
u->cleanup = NULL;
}
/* 釋放解析主機域名時分配的資源 */
if (u->resolved && u->resolved->ctx) {
ngx_resolve_name_done(u->resolved->ctx);
u->resolved->ctx = NULL;
}
/* 設置當前時間為 HTTP 響應結束的時間 */
if (u->state && u->state->response_sec) {
tp = ngx_timeofday();
u->state->response_sec = tp->sec - u->state->response_sec;
u->state->response_msec = tp->msec - u->state->response_msec;
if (u->pipe && u->pipe->read_length) {
u->state->response_length = u->pipe->read_length;
}
}
/* 調用該方法執行一些操作 */
u->finalize_request(r, rc);
/* 調用 free 方法釋放連接資源 */
if (u->peer.free && u->peer.sockaddr) {
u->peer.free(&u->peer, u->peer.data, 0);
u->peer.sockaddr = NULL;
}
/* 若上游連接還未關閉,則調用 ngx_close_connection 方法關閉該連接 */
if (u->peer.connection) {
#if (NGX_HTTP_SSL)
...
...
#endif
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"close http upstream connection: %d",
u->peer.connection->fd);
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
}
u->peer.connection = NULL;
if (u->pipe && u->pipe->temp_file) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream temp fd: %d",
u->pipe->temp_file->file.fd);
}
/* 若使用了文件緩存,則調用 ngx_delete_file 方法刪除用于緩存響應的臨時文件 */
if (u->store && u->pipe && u->pipe->temp_file
&& u->pipe->temp_file->file.fd != NGX_INVALID_FILE)
{
if (ngx_delete_file(u->pipe->temp_file->file.name.data)
== NGX_FILE_ERROR)
{
ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,
ngx_delete_file_n " \"%s\" failed",
u->pipe->temp_file->file.name.data);
}
}
#if (NGX_HTTP_CACHE)
...
...
#endif
if (r->subrequest_in_memory
&& u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE)
{
u->buffer.last = u->buffer.pos;
}
if (rc == NGX_DECLINED) {
return;
}
r->connection->log->action = "sending to client";
if (!u->header_sent
|| rc == NGX_HTTP_REQUEST_TIME_OUT
|| rc == NGX_HTTP_CLIENT_CLOSED_REQUEST)
{
ngx_http_finalize_request(r, rc);
return;
}
flush = 0;
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
rc = NGX_ERROR;
flush = 1;
}
if (r->header_only) {
ngx_http_finalize_request(r, rc);
return;
}
if (rc == 0) {
rc = ngx_http_send_special(r, NGX_HTTP_LAST);
} else if (flush) {
r->keepalive = 0;
rc = ngx_http_send_special(r, NGX_HTTP_FLUSH);
}
/* 調用 HTTP 框架實現的 ngx_http_finalize_request 方法關閉請求 */
ngx_http_finalize_request(r, rc);
}
~~~
- 前言
- Nginx 配置文件
- Nginx 內存池管理
- Nginx 基本數據結構
- Nginx 數組結構 ngx_array_t
- Nginx 鏈表結構 ngx_list_t
- Nginx 隊列雙向鏈表結構 ngx_queue_t
- Nginx 哈希表結構 ngx_hash_t
- Nginx 紅黑樹結構 ngx_rbtree_t
- Nginx 模塊開發
- Nginx 啟動初始化過程
- Nginx 配置解析
- Nginx 中的 upstream 與 subrequest 機制
- Nginx 源碼結構分析
- Nginx 事件模塊
- Nginx 的 epoll 事件驅動模塊
- Nginx 定時器事件
- Nginx 事件驅動模塊連接處理
- Nginx 中 HTTP 模塊初始化
- Nginx 中處理 HTTP 請求
- Nginx 中 upstream 機制的實現
- Nginx 中 upstream 機制的負載均衡