## 第三章 高級請求-應答模式
在第二章中我們通過開發一系列的小應用來熟悉?MQ的基本使用方法,每個應用會引入一些新的特性。本章會沿用這種方式,來探索更多建立在?MQ請求-應答模式之上的高級工作模式。
本章涉及的內容有:
* 在請求-應答模式中創建和使用消息信封
* 使用REQ、REP、DEALER和ROUTER套接字
* 使用標識來手工指定應答目標
* 使用自定義離散路由模式
* 使用自定義最近最少使用路由模式
* 構建高層消息封裝類
* 構建基本的請求應答代理
* 合理命名套接字
* 模擬client-worker集群
* 構建可擴展的請求-應答集群云
* 使用管道套接字監控線程
### Request-Reply Envelopes
在請求-應答模式中,信封里保存了應答目標的位置。這就是為什么?MQ網絡雖然是無狀態的,但仍能完成請求-應答的過程。
在一般使用過程中,你并不需要知道請求-應答信封的工作原理。使用REQ、REP時,?MQ會自動處理消息信封。下一章講到的裝置(device),使用時也只需保證讀取和寫入所有的信息即可。?MQ使用多段消息的方式來存儲信封,所以在復制消息時也會復制信封。
然而,在使用高級請求-應答模式之前是需要了解信封這一機制的,以下是信封機制在ROUTER中的工作原理:
* 從ROUTER中讀取一條消息時,?MQ會包上一層信封,上面注明了消息的來源。
* 向ROUTER寫入一條消息時(包含信封),?MQ會將信封拆開,并將消息遞送給相應的對象。
如果將從ROUTER A中獲取的消息(包含信封)寫入ROUTER B(即將消息發送給一個DEALER,該DEALER連接到了ROUTER),那么在從ROUTER B中獲取該消息時就會包含兩層信封。
信封機制的根本作用是讓ROUTER知道如何將消息遞送給正確的應答目標,你需要做的就是在程序中保留好該信封。回顧一下REP套接字,它會將收到消息的信封逐個拆開,將消息本身傳送給應用程序。而在發送時,又會在消息外層包裹該信封,發送給ROUTER,從而傳遞給正確的應答目標。
我們可以使用上述原理建立起一個ROUTER-DEALER裝置:
```
[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
...etc.
```
當你用REQ套接字去連接ROUTER套接字,并發送一條請求消息,你會從ROUTER中獲得一條如下所示的消息:

* 第三幀是應用程序發送給REQ套接字的消息;
* 第二幀的空信息是REQ套接字在發送消息給ROUTER之前添加的;
* 第一幀即信封,是由ROUTER套接字添加的,記錄了消息的來源。
如果我們在一條裝置鏈路上傳遞該消息,最終會得到包含多層信封的消息。最新的信封會在消息的頂部。

以下將詳述我們在請求-應答模式中使用到的四種套接字類型:
* DEALER是一種負載均衡,它會將消息分發給已連接的節點,并使用公平隊列的機制處理接受到的消息。DEALER的作用就像是PUSH和PULL的結合。
* REQ發送消息時會在消息頂部插入一個空幀,接受時會將空幀移去。其實REQ是建立在DEALER之上的,但REQ只有當消息發送并接受到回應后才能繼續運行。
* ROUTER在收到消息時會在頂部添加一個信封,標記消息來源。發送時會通過該信封決定哪個節點可以獲取到該條消息。
* REP在收到消息時會將第一個空幀之前的所有信息保存起來,將原始信息傳送給應用程序。在發送消息時,REP會用剛才保存的信息包裹應答消息。REP其實是建立在ROUTER之上的,但和REQ一樣,必須完成接受和發送這兩個動作后才能繼續。
REP要求消息中的信封由一個空幀結束,所以如果你沒有用REQ發送消息,則需要自己在消息中添加這個空幀。
你肯定會問,ROUTER是怎么標識消息的來源的?答案當然是套接字的標識。我們之前講過,一個套接字可能是瞬時的,它所連接的套接字(如ROUTER)則會給它生成一個標識,與之相關聯。一個套接字也可以顯式地給自己定義一個標識,這樣其他套接字就可以直接使用了。
這是一個瞬時的套接字,ROUTER會自動生成一個UUID來標識消息的來源。

這是一個持久的套接字,標識由消息來源自己指定。

下面讓我們在實例中觀察上述兩種操作。下列程序會打印出ROUTER從兩個REP套接字中獲得的消息,其中一個沒有指定標識,另一個指定了“Hello”作為標識。
**identity.c**
```c
//
// 以下程序演示了如何在請求-應答模式中使用套接字標識。
// 需要注意的是s_開頭的函數是由zhelpers.h提供的。
// 我們沒有必要重復編寫那些代碼。
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
void *sink = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (sink, "inproc://example");
// 第一個套接字由0MQ自動設置標識
void *anonymous = zmq_socket (context, ZMQ_REQ);
zmq_connect (anonymous, "inproc://example");
s_send (anonymous, "ROUTER uses a generated UUID");
s_dump (sink);
// 第二個由自己設置
void *identified = zmq_socket (context, ZMQ_REQ);
zmq_setsockopt (identified, ZMQ_IDENTITY, "Hello", 5);
zmq_connect (identified, "inproc://example");
s_send (identified, "ROUTER socket uses REQ's socket identity");
s_dump (sink);
zmq_close (sink);
zmq_close (anonymous);
zmq_close (identified);
zmq_term (context);
return 0;
}
```
運行結果:
```
----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER uses a generated UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER socket uses REQ's socket identity
```
### 自定義請求-應答路由
我們已經看到ROUTER套接字是如何使用信封將消息發送給正確的應答目標的,下面我們從一個角度來定義ROUTER:在發送消息時使用一定格式的信封提供正確的路由目標,ROUTER就能夠將該條消息異步地發送給對應的節點。
所以說ROUTER的行為是完全可控的。在深入理解這一特性之前,讓我們先近距離觀察一下REQ和REP套接字,賦予他們一些鮮活的角色:
* REQ是一個“媽媽”套接字,不會耐心聽別人說話,但會不斷地拋出問題尋求解答。REQ是嚴格同步的,它永遠位于消息鏈路的請求端;
* REP則是一個“爸爸”套接字,只會回答問題,不會主動和別人對話。REP也是嚴格同步的,并一直位于應答端。
關于“媽媽”套接字,正如我們小時候所經歷的,只能等她向你開口時你們才能對話。媽媽不像爸爸那么開明,也不會像DEALER套接字一樣接受模棱兩可的回答。所以,想和REQ套接字對話只有等它主動發出請求后才行,之后它就會一直等待你的回答,不管有多久。
“爸爸”套接字則給人一種強硬、冷漠的感覺,他只做一件事:無論你提出什么問題,都會給出一個精確的回答。不要期望一個REP套接字會主動和你對話或是將你倆的交談傳達給別人,它不會這么做的。
我們通常認為請求-應答模式一定是有來有往、有去有回的過程,但實際上這個過程是可以異步進行的。我們只需獲得相應節點的地址,即可通過ROUTER套接字來異步地發送消息。ROUTER是ZMQ中唯一一個可以定位消息來源的套接字。
我們對請求-應答模式下的路由做一個小結:
* 對于瞬時的套接字,ROUTER會動態生成一個UUID來標識它,因此從ROUTER中獲取到的消息里會包含這個標識;
* 對于持久的套接字,可以自定義標識,ROUTER會如直接將該標識放入消息之中;
* 具有顯式聲明標識的節點可以連接到其他類型的套接字;
* 節點可以通過配置文件等機制提前獲知對方節點的標識,作出相應的處理。
我們至少有三種模式來實現和ROUTER的連接:
* ROUTER-DEALER
* ROUTER-REQ
* ROUTER-REP
每種模式下我們都可以完全掌控消息的路由方式,但不同的模式會有不一樣的應用場景和消息流,下一節開始我們會逐一解釋。
自定義路由也有一些注意事項:
* 自定義路由讓節點能夠控制消息的去向,這一點有悖?MQ的規則。使用自定義路由的唯一理由是?MQ缺乏更多的路由算法供我們選擇;
* 未來的?MQ版本可能包含一些我們自定義的路由方式,這意味著我們現在設計的代碼可能無法在新版本的?MQ中運行,或者成為一種多余;
* 內置的路由機制是可擴展的,且對裝置友好,但自定義路由就需要自己解決這些問題。
所以說自定義路由的成本是比較高的,更多情況下應當交由?MQ來完成。不過既然我們已經講到這兒了,就繼續深入下去吧!
### ROUTER-DEALER路由
ROUTER-DEALDER是一種最簡單的路由方式。將ROUTER和多個DEALER相連接,用一種合適的算法來決定如何分發消息給DEALER。DEALER可以是一個黑洞(只負責處理消息,不給任何返回)、代理(將消息轉發給其他節點)或是服務(會發送返回信息)。
如果你要求DEALER能夠進行回復,那就要保證只有一個ROUTER連接到DEALER,因為DEALER并不知道哪個特定的節點在聯系它,如果有多個節點,它會做負載均衡,將消息分發出去。但如果DEALER是一個黑洞,那就可以連接任何數量的節點。
ROUTER-DEALER路由可以用來做什么呢?如果DEALER會將它完成任務的時間回復給ROUTER,那ROUTER就可以知道這個DEALER的處理速度有多快了。因為ROUTER和DEALER都是異步的套接字,所以我們要用zmq_poll()來處理這種情況。
下面例子中的兩個DEALER不會返回消息給ROUTER,我們的路由采用加權隨機算法:發送兩倍多的信息給其中的一個DEALER。

**rtdealer.c**
```c
//
// 自定義ROUTER-DEALER路由
//
// 這個實例是單個進程,這樣方便啟動。
// 每個線程都有自己的ZMQ上下文,所以可以認為是多個進程在運行。
//
#include "zhelpers.h"
#include <pthread.h>
// 這里定義了兩個worker,其代碼是一樣的。
//
static void *
worker_task_a (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 我們只接受到消息的第二部分
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("A received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
static void *
worker_task_b (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "B", 1);
zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 我們只接受到消息的第二部分
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("B received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
pthread_t worker;
pthread_create (&worker, NULL, worker_task_a, NULL);
pthread_create (&worker, NULL, worker_task_b, NULL);
// 等待線程連接至套接字,否則我們發送的消息將不能被正確路由
sleep (1);
// 發送10個任務,給A兩倍多的量
int task_nbr;
srandom ((unsigned) time (NULL));
for (task_nbr = 0; task_nbr < 10; task_nbr++) {
// 發送消息的兩個部分:第一部分是目標地址
if (randof (3) > 0)
s_sendmore (client, "A");
else
s_sendmore (client, "B");
// 然后是任務
s_send (client, "This is the workload");
}
s_sendmore (client, "A");
s_send (client, "END");
s_sendmore (client, "B");
s_send (client, "END");
zmq_close (client);
zmq_term (context);
return 0;
}
```
對上述代碼的兩點說明:
* ROUTER并不知道DEALER何時會準備好,我們可以用信號機制來解決,但為了不讓這個例子太過復雜,我們就用sleep(1)的方式來處理。如果沒有這句話,那ROUTER一開始發出的消息將無法被路由,?MQ會丟棄這些消息。
* 需要注意的是,除了ROUTER會丟棄無法路由的消息外,PUB套接字當沒有SUB連接它時也會丟棄發送出去的消息。其他套接字則會將無法發送的消息存儲起來,直到有節點來處理它們。
在將消息路由給DEALER時,我們手工建立了這樣一個信封:

ROUTER套接字會移除第一幀,只將第二幀的內容傳遞給相應的DEALER。當DEALER發送消息給ROUTER時,只會發送一幀,ROUTER會在外層包裹一個信封(添加第一幀),返回給我們。
如果你定義了一個非法的信封地址,ROUTER會直接丟棄該消息,不作任何提示。對于這一點我們也無能為力,因為出現這種情況只有兩種可能,一是要送達的目標節點不復存在了,或是程序中錯誤地指定了目標地址。如何才能知道消息會被正確地路由?唯一的方法是讓路由目標發送一些反饋消息給我們。后面幾章會講述這一點。
DEALER的工作方式就像是PUSH和PULL的結合。但是,我們不能用PULL或PUSH去構建請求-應答模式。
### 最近最少使用算法路由(LRU模式)
我們之前講過REQ套接字永遠是對話的發起方,然后等待對方回答。這一特性可以讓我們能夠保持多個REQ套接字等待調配。換句話說,REQ套接字會告訴我們它已經準備好了。
你可以將ROUTER和多個REQ相連,請求-應答的過程如下:
* REQ發送消息給ROUTER
* ROUTER返回消息給REQ
* REQ發送消息給ROUTER
* ROUTER返回消息給REQ
* ...
和DEALER相同,REQ只能和一個ROUTER連接,除非你想做類似多路冗余路由這樣的事(我甚至不想在這里解釋),其復雜度會超過你的想象并迫使你放棄的。

ROUTER-REQ模式可以用來做什么?最常用的做法就是最近最少使用算法(LRU)路由了,ROUTER發出的請求會讓等待最久的REQ來處理。請看示例:
```c
//
// 自定義ROUTER-REQ路由
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10
static void *
worker_task(void *args) {
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_REQ);
// s_set_id()函數會根據套接字生成一個可打印的字符串,
// 并以此作為該套接字的標識。
s_set_id(worker);
zmq_connect(worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// 告訴ROUTER我已經準備好了
s_send(worker, "ready");
// 從ROUTER中獲取工作,直到收到結束的信息
char *workload = s_recv(worker);
int finished = (strcmp(workload, "END") == 0);
free(workload);
if (finished) {
printf("Processed: %d tasks\n", total);
break;
}
total++;
// 隨機等待一段時間
s_sleep(randof(1000) + 1);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void) {
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(client, "ipc://routing.ipc");
srandom((unsigned) time(NULL));
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
int task_nbr;
for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
// 最近最少使用的worker就在消息隊列中
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "This is the workload");
free(address);
}
// 通知所有REQ套接字結束工作
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "END");
free(address);
}
zmq_close(client);
zmq_term(context);
return 0;
}
```
在這個示例中,實現LRU算法并沒有用到特別的數據結構,因為?MQ的消息隊列機制已經提供了等價的實現。一個更為實際的LRU算法應該將已準備好的worker收集起來,保存在一個隊列中進行分配。以后我們會講到這個例子。
程序的運行結果會將每個worker的執行次數打印出來。由于REQ套接字會隨機等待一段時間,而我們也沒有做負載均衡,所以我們希望看到的是每個worker執行相近的工作量。這也是程序執行的結果。
```
Processed: 8 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 7 tasks
Processed: 9 tasks
Processed: 11 tasks
Processed: 14 tasks
Processed: 11 tasks
Processed: 11 tasks
Processed: 10 tasks
```
關于以上代碼的幾點說明:
* 我們不需要像前一個例子一樣等待一段時間,因為REQ套接字會明確告訴ROUTER它已經準備好了。
* 我們使用了zhelpers.h提供的s_set_id()函數來為套接字生成一個可打印的字符串標識,這是為了讓例子簡單一些。在現實環境中,REQ套接字都是匿名的,你需要直接調用zmq_recv()和zmq_send()來處理消息,因為s_recv()和s_send()只能處理字符串標識的套接字。
* 更糟的是,我們使用了隨機的標識,不要在現實環境中使用隨機標識的持久套接字,這樣做會將節點消耗殆盡。
* 如果你只是將上面的代碼拷貝過來,沒有充分理解,那你就像是看到蜘蛛人從屋頂上飛下來,你也照著做了,后果自負吧。
在將消息路由給REQ套接字時,需要注意一定的格式,即地址-空幀-消息:

### 使用地址進行路由
在經典的請求-應答模式中,ROUTER一般不會和REP套接字通信,而是由DEALER去和REP通信。DEALER會將消息隨機分發給多個REP,并獲得結果。ROUTER更適合和REQ套接字通信。
我們應該記住,?MQ的經典模型往往是運行得最好的,畢竟人走得多的路往往是條好路,如果不按常理出牌,那很有可能會跌入無敵深潭。下面我們就將ROUTER和REP進行連接,看看會發生什么。
REP套接字有兩個特點:
* 它需要完成完整的請求-應答周期;
* 它可以接受任意大小的信封,并能完整地返回該信封。
在一般的請求-應答模式中,REP是匿名的,可以隨時替換。因為我們這里在將自定義路由,就要做到將一條消息發送給REP A,而不是REP B。這樣才能保證網絡的一端是你,另一端是特定的REP。
?MQ的核心理念之一是周邊的節點應該盡可能的智能,且數量眾多,而中間件則是固定和簡單的。這就意味著周邊節點可以向其他特定的節點發送消息,比如可以連接到一個特定的REP。這里我們先不討論如何在多個節點之間進行路由,只看最后一步中ROUTER如何和特定的REP通信的。

這張圖描述了以下事件:
* client有一條消息,將來會通過另一個ROUTER將該消息發送回去。這條信息包含了兩個地址、一個空幀、以及消息內容;
* client將該條消息發送給了ROUTER,并指定了REP的地址;
* ROUTER將該地址移去,并以此決定其下哪個REP可以獲得該消息;
* REP收到該條包含地址、空幀、以及內容的消息;
* REP將空幀之前的所有內容移去,交給worker去處理消息;
* worker處理完成后將回復交給REP;
* REP將之前保存好的信封包裹住該條回復,并發送給ROUTER;
* ROUTER在該條回復上又添加了一個注明REP的地址的幀。
這個過程看起來很復雜,但還是有必要取了解清楚的。只要記住,REP套接字會原封不動地將信封返回回去。
**rtpapa.c**
```c
//
// 自定義ROUTER-REP路由
//
#include "zhelpers.h"
// 這里使用一個進程來強調事件發生的順序性
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
void *worker = zmq_socket (context, ZMQ_REP);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
// 等待worker連接
sleep (1);
// 發送REP的標識、地址、空幀、以及消息內容
s_sendmore (client, "A");
s_sendmore (client, "address 3");
s_sendmore (client, "address 2");
s_sendmore (client, "address 1");
s_sendmore (client, "");
s_send (client, "This is the workload");
// worker只會得到消息內容
s_dump (worker);
// worker不需要處理信封
s_send (worker, "This is the reply");
// 看看ROUTER里收到了什么
s_dump (client);
zmq_close (client);
zmq_close (worker);
zmq_term (context);
return 0;
}
```
運行結果
```
----------------------------------------
[020] This is the workload
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] This is the reply
```
關于以上代碼的幾點說明:
* 在現實環境中,ROUTER和REP套接字處于不同的節點。本例沒有啟用多進程,為的是讓事件的發生順序更為清楚。
* zmq_connect()并不是瞬間完成的,REP和ROUTER連接的時候是會花費一些時間的。在現實環境中,ROUTER無從得知REP是否已經連接成功了,除非得到REP的某些回應。本例中使用sleep(1)來處理這一問題,如果不這樣做,那REP將無法獲得消息(自己嘗試一下吧)。
* 我們使用REP的套接字標識來進行路由,如果你不信,可以將消息發送給B,看看A能不能收到。
* 本例中的s_dump()等函數來自于zhelpers.h文件,可以看到在進行套接字連接時代碼都是一樣的,所以我們才能在?MQ API的基礎上搭建上層的API。等今后我們討論到復雜應用程序的時候再詳細說明。
要將消息路由給REP,我們需要創建它能辨別的信封:

### 請求-應答模式下的消息代理
這一節我們將對如何使用?MQ消息信封做一個回顧,并嘗試編寫一個通用的消息代理裝置。我們會建立一個隊列裝置來連接多個client和worker,裝置的路由算法可以由我們自己決定。這里我們選擇最近最少使用算法,因為這和負載均衡一樣比較實用。
首先讓我們回顧一下經典的請求-應答模型,嘗試用它建立一個不斷增長的巨型服務網絡。最基本的請求-應答模型是:

這個模型支持多個REP套接字,但如果我們想支持多個REQ套接字,就需要增加一個中間件,它通常是ROUTER和DEALER的結合體,簡單將兩個套接字之間的信息進行搬運,因此可以用現成的ZMQ_QUEUE裝置來實現:
```
+--------+ +--------+ +--------+
| Client | | Client | | Client |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+---+----+
| ROUTER |
+--------+
| Device |
+--------+
| DEALER |
+---+----+
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Worker | | Worker | | Worker |
+--------+ +--------+ +--------+
Figure # - Stretched request-reply
```
這種結構的關鍵在于,ROUTER會將消息來自哪個REQ記錄下來,生成一個信封。DEALER和REP套接字在傳輸消息的過程中不會丟棄或更改信封的內容,這樣當消息返回給ROUTER時,它就知道應該發送給哪個REQ了。這個模型中的REP套接字是匿名的,并沒有特定的地址,所以只能提供同一種服務。
上述結構中,對REP的路由我們使用了DEADER自帶的負載均衡算法。但是,我們想用LRU算法來進行路由,這就要用到ROUTER-REP模式:

這個ROUTER-ROUTER的LRU隊列不能簡單地在兩個套接字間搬運消息,以下代碼會比較復雜,不過在請求-應答模式中復用性很高。
**lruqueue.c**
```c
//
// 使用LRU算法的裝置
// client和worker處于不同的線程中
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
// 出隊操作,使用一個可存儲任何類型的數組實現
#define DEQUEUE(q) memmove (&(q)[0], &(q)[1], sizeof (q) - sizeof (q [0]))
// 使用REQ套接字實現基本的請求-應答模式
// 由于s_send()和s_recv()不能處理0MQ的二進制套接字標識,
// 所以這里會生成一個可打印的字符串標識。
//
static void *
client_task (void *args)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_REQ);
s_set_id (client); // 設置可打印的標識
zmq_connect (client, "ipc://frontend.ipc");
// 發送請求并獲取應答信息
s_send (client, "HELLO");
char *reply = s_recv (client);
printf ("Client: %s\n", reply);
free (reply);
zmq_close (client);
zmq_term (context);
return NULL;
}
// worker使用REQ套接字實現LRU算法
//
static void *
worker_task (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_REQ);
s_set_id (worker); // 設置可打印的標識
zmq_connect (worker, "ipc://backend.ipc");
// 告訴代理worker已經準備好
s_send (worker, "READY");
while (1) {
// 將消息中空幀之前的所有內容(信封)保存起來,
// 本例中空幀之前只有一幀,但可以有更多。
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
// 獲取請求,并發送回應
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);
s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
int main (void)
{
// 準備0MQ上下文和套接字
void *context = zmq_init (1);
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "ipc://frontend.ipc");
zmq_bind (backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
pthread_t client;
pthread_create (&client, NULL, client_task, NULL);
}
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
// LRU邏輯
// - 一直從backend中獲取消息;當有超過一個worker空閑時才從frontend獲取消息。
// - 當woker回應時,會將該worker標記為已準備好,并轉發woker的回應給client
// - 如果client發送了請求,就將該請求轉發給下一個worker
// 存放可用worker的隊列
int available_workers = 0;
char *worker_queue [10];
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll (items, available_workers? 2: 1, -1);
// 處理backend中worker的隊列
if (items [0].revents & ZMQ_POLLIN) {
// 將worker的地址入隊
char *worker_addr = s_recv (backend);
assert (available_workers < NBR_WORKERS);
worker_queue [available_workers++] = worker_addr;
// 跳過空幀
char *empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
// 第三幀是“READY”或是一個client的地址
char *client_addr = s_recv (backend);
// 如果是一個應答消息,則轉發給client
if (strcmp (client_addr, "READY") != 0) {
empty = s_recv (backend);
assert (empty [0] == 0);
free (empty);
char *reply = s_recv (backend);
s_sendmore (frontend, client_addr);
s_sendmore (frontend, "");
s_send (frontend, reply);
free (reply);
if (--client_nbr == 0)
break; // 處理N條消息后退出
}
free (client_addr);
}
if (items [1].revents & ZMQ_POLLIN) {
// 獲取下一個client的請求,交給空閑的worker處理
// client請求的消息格式是:[client地址][空幀][請求內容]
char *client_addr = s_recv (frontend);
char *empty = s_recv (frontend);
assert (empty [0] == 0);
free (empty);
char *request = s_recv (frontend);
s_sendmore (backend, worker_queue [0]);
s_sendmore (backend, "");
s_sendmore (backend, client_addr);
s_sendmore (backend, "");
s_send (backend, request);
free (client_addr);
free (request);
// 將該worker的地址出隊
free (worker_queue [0]);
DEQUEUE (worker_queue);
available_workers--;
}
}
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```
這段程序有兩個關鍵點:1、各個套接字是如何處理信封的;2、LRU算法。我們先來看信封的格式。
我們知道REQ套接字在發送消息時會向頭部添加一個空幀,接收時又會自動移除。我們要做的就是在傳輸消息時滿足REQ的要求,處理好空幀。另外還要注意,ROUTER會在所有收到的消息前添加消息來源的地址。
現在我們就將完整的請求-應答流程走一遍,我們將client套接字的標識設為“CLIENT”,worker的設為“WORKER”。以下是client發送的消息:

代理從ROUTER中獲取到的消息格式如下:

代理會從LRU隊列中獲取一個空閑woker的地址,作為信封附加在消息之上,傳送給ROUTER。注意要添加一個空幀。

REQ(worker)收到消息時,會將信封和空幀移去:

可以看到,worker收到的消息和client端ROUTER收到的消息是一致的。worker需要將該消息中的信封保存起來,只對消息內容做操作。
在返回的過程中:
* worker通過REQ傳輸給device消息\[client地址\]\[空幀\]\[應答內容\];
* device從worker端的ROUTER中獲取到\[worker地址\]\[空幀\]\[client地址\]\[空幀\]\[應答內容\];
* device將worker地址保存起來,并發送\[client地址\]\[空幀\]\[應答內容\]給client端的ROUTER;
* client從REQ中獲得到\[應答內容\]。
然后再看看LRU算法,它要求client和worker都使用REQ套接字,并正確的存儲和返回消息信封,具體如下:
* 創建一組poll,不斷地從backend(worker端的ROUTER)獲取消息;只有當有空閑的worker時才從frontend(client端的ROUTER)獲取消息;
* 循環執行poll
* 如果backend有消息,只有兩種情況:1)READY消息(該worker已準備好,等待分配);2)應答消息(需要轉發給client)。兩種情況下我們都會保存worker的地址,放入LRU隊列中,如果有應答內容,則轉發給相應的client。
* 如果frontend有消息,我們從LRU隊列中取出下一個worker,將該請求發送給它。這就需要發送[worker地址][空幀][client地址][空幀][請求內容]到worker端的ROUTER。
我們可以對該算法進行擴展,如在worker啟動時做一個自我測試,計算出自身的處理速度,并隨READY消息發送給代理,這樣代理在分配工作時就可以做相應的安排。
### ?MQ上層API的封裝
使用?MQ提供的API操作多段消息時是很麻煩的,如以下代碼:
```c
while (1) {
// 將消息中空幀之前的所有內容(信封)保存起來,
// 本例中空幀之前只有一幀,但可以有更多。
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
// 獲取請求,并發送回應
char *request = s_recv (worker);
printf ("Worker: %s\n", request);
free (request);
s_sendmore (worker, address);
s_sendmore (worker, "");
s_send (worker, "OK");
free (address);
}
```
這段代碼不滿足重用的需求,因為它只能處理一個幀的信封。事實上,以上代碼已經做了一些封裝了,如果調用?MQ底層的API的話,代碼就會更加冗長:
```c
while (1) {
// 將消息中空幀之前的所有內容(信封)保存起來,
// 本例中空幀之前只有一幀,但可以有更多。
zmq_msg_t address;
zmq_msg_init (&address);
zmq_recv (worker, &address, 0);
zmq_msg_t empty;
zmq_msg_init (&empty);
zmq_recv (worker, &empty, 0);
// 獲取請求,并發送回應
zmq_msg_t payload;
zmq_msg_init (&payload);
zmq_recv (worker, &payload, 0);
int char_nbr;
printf ("Worker: ");
for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
printf ("%c", *(char *) (zmq_msg_data (&payload) + char_nbr));
printf ("\n");
zmq_msg_init_size (&payload, 2);
memcpy (zmq_msg_data (&payload), "OK", 2);
zmq_send (worker, &address, ZMQ_SNDMORE);
zmq_close (&address);
zmq_send (worker, &empty, ZMQ_SNDMORE);
zmq_close (&empty);
zmq_send (worker, &payload, 0);
zmq_close (&payload);
}
```
我們理想中的API是可以一步接收和處理完整的消息,包括信封。?MQ底層的API并不是為此而涉及的,但我們可以在它上層做進一步的封裝,這也是學習?MQ的過程中很重要的內容。
想要編寫這樣一個API還是很有難度的,因為我們要避免過于頻繁地復制數據。此外,?MQ用“消息”來定義多段消息和多段消息中的一部分,同時,消息又可以是字符串消息或者二進制消息,這也給編寫API增加的難度。
解決方法之一是使用新的命名方式:字符串(s_send()和s_recv()中已經在用了)、幀(消息的一部分)、消息(一個或多個幀)。以下是用新的API重寫的worker:
```c
while (1) {
zmsg_t *zmsg = zmsg_recv (worker);
zframe_print (zmsg_last (zmsg), "Worker: ");
zframe_reset (zmsg_last (zmsg), "OK", 2);
zmsg_send (&zmsg, worker);
}
```
用4行代碼代替22行代碼是個不錯的選擇,而且更容易讀懂。我們可以用這種理念繼續編寫其他的API,希望可以實現以下功能:
* 自動處理套接字。每次都要手動關閉套接字是很麻煩的事,手動定義過期時間也不是太有必要,所以,如果能在關閉上下文時自動關閉套接字就太好了。
* 便捷的線程管理。基本上所有的?MQ應用都會用到多線程,但POSIX的多線程接口用起來并不是太方便,所以也可以封裝一下。
* 便捷的時鐘管理。想要獲取毫秒數、或是暫停運行幾毫秒都不太方便,我們的API應該提供這個接口。
* 一個能夠替代zmq_poll()的反應器。poll循環很簡單,但比較笨拙,會造成重復代碼:計算時間、處理套接字中的信息等。若有一個簡單的反應器來處理套接字的讀寫以及時間的控制,將會很方便。
* 恰當地處理Ctrl-C按鍵。我么已經看到如何處理中斷了,最好這一機制可以用到所有的程序里。
我們可以用czmq來實現以上的需求。這個擴展很早就有了,提供了很多?MQ的上層封裝,甚至是數據結構(哈希、鏈表等)。
以下是用czmq重寫的LRU代理:
**lruqueue2.c**
```c
//
// LRU消息隊列裝置,使用czmq庫實現
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // worker準備就緒的信息
// 使用REQ套接字實現基本的請求-應答模式
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
// 發送請求并接收應答
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// worker使用REQ套接字,實現LRU路由
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
// 告知代理worker已準備就緒
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// 接收消息并處理
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 終止
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "ipc://frontend.ipc");
zsocket_bind (backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
// LRU邏輯
// - 一直從backend中獲取消息;當有超過一個worker空閑時才從frontend獲取消息。
// - 當woker回應時,會將該worker標記為已準備好,并轉發woker的回應給client
// - 如果client發送了請求,就將該請求轉發給下一個worker
// 存放可用worker的隊列
zlist_t *workers = zlist_new ();
while (1) {
// 初始化poll
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// 當有可用的worker時,從frontend獲取消息
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // 中斷
// 對backend發來的消息進行處理
if (items [0].revents & ZMQ_POLLIN) {
// 使用worker的地址進行LRU路由
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // 中斷
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
// 如果不是READY消息,則轉發給client
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// 獲取client發來的請求,轉發給worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// 如果完成了,則進行一些清理工作
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```
czmq提供了一個簡單的中斷機制,當按下Ctrl-C時程序會終止?MQ的運行,并返回-1,errno設置為EINTR。程序中斷時,czmq的recv方法會返回NULL,所以你可以用下面的代碼來作判斷:
```c
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // 中斷
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
```
如果使用zmq_poll()函數,則可以這樣判斷:
```
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // 中斷
```
上例中還是使用了原生的zmq_poll()方法,也可以使用czmq提供的zloop反應器來實現,它可以做到:
* 從任意套接字上獲取消息,也就是說只要套接字有消息就可以觸發函數;
* 停止讀取套接字上的消息;
* 設置一個時鐘,定時地讀取消息。
zloop內部當然是使用zmq_poll()實現的,但它可以做到動態地增減套接字上的監聽器,重構poll池,并根據poll的超時時間來計算下一個時鐘觸發事件。
使用這種反應器模式后,我們的代碼就更簡潔了:
```c
zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);
```
對消息的實際處理放在了程序的其他部分,并不是所有人都會喜歡這種風格,但zloop的確是將定時器和套接字的行為融合在了一起。在以后的例子中,我們會用zmq_poll()來處理簡單的示例,使用zloop來處理復雜的。
下面我們用zloop來重寫LRU隊列裝置
**lruqueue3.c**
```c
//
// LRU隊列裝置,使用czmq及其反應器模式實現
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // woker已準備就緒的消息
// 使用REQ實現基本的請求-應答模式
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
// 發送請求并接收應答
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// worker使用REQ套接字來實現路由
//
static void *
worker_task (void *arg_ptr)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
// 告訴代理worker已經準備就緒
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// 獲取消息并處理
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 中斷
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
// LRU隊列處理器結構,將要傳給反應器
typedef struct {
void *frontend; // 監聽client
void *backend; // 監聽worker
zlist_t *workers; // 可用的worker列表
} lruqueue_t;
// 處理frontend端的消息
int s_handle_frontend (zloop_t *loop, void *socket, void *arg)
{
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
zmsg_send (&msg, self->backend);
// 如果沒有可用的worker,則停止監聽frontend
if (zlist_size (self->workers) == 0)
zloop_cancel (loop, self->frontend);
}
return 0;
}
// 處理backend端的消息
int s_handle_backend (zloop_t *loop, void *socket, void *arg)
{
// 使用worker的地址進行LRU路由
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->backend);
if (msg) {
zframe_t *address = zmsg_unwrap (msg);
zlist_append (self->workers, address);
// 當有可用worker時增加frontend端的監聽
if (zlist_size (self->workers) == 1)
zloop_reader (loop, self->frontend, s_handle_frontend, self);
// 如果是worker發送來的應答,則轉發給client
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, self->frontend);
}
return 0;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
lruqueue_t *self = (lruqueue_t *) zmalloc (sizeof (lruqueue_t));
self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
self->backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (self->frontend, "ipc://frontend.ipc");
zsocket_bind (self->backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
// 可用worker的列表
self->workers = zlist_new ();
// 準備并啟動反應器
zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);
// 結束之后的清理工作
while (zlist_size (self->workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
zframe_destroy (&frame);
}
zlist_destroy (&self->workers);
zctx_destroy (&ctx);
free (self);
return 0;
}
```
要正確處理Ctrl-C還是有點困難的,如果你使用zctx類,那它會自動進行處理,不過也需要代碼的配合。若zmq_poll()返回了-1,或者recv方法(zstr_recv, zframe_recv, zmsg_recv)返回了NULL,就必須退出所有的循環。另外,在最外層循環中增加!zctx_interrupted的判斷也很有用。
### 異步C/S結構
在之前的ROUTER-DEALER模型中,我們看到了client是如何異步地和多個worker進行通信的。我們可以將這個結構倒置過來,實現多個client異步地和單個server進行通信:

* client連接至server并發送請求;
* 每一次收到請求,server會發送0至N個應答;
* client可以同時發送多個請求而不需要等待應答;
* server可以同時發送多個應答二不需要新的請求。
**asyncsrd.c**
```c
//
// 異步C/S模型(DEALER-ROUTER)
//
#include "czmq.h"
// ---------------------------------------------------------------------
// 這是client端任務,它會連接至server,每秒發送一次請求,同時收集和打印應答消息。
// 我們會運行多個client端任務,使用隨機的標識。
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_DEALER);
// 設置隨機標識,方便跟蹤
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zsockopt_set_identity (client, identity);
zsocket_connect (client, "tcp://localhost:5570");
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
while (1) {
// 從poll中獲取消息,每秒一次
int centitick;
for (centitick = 0; centitick < 100; centitick++) {
zmq_poll (items, 1, 10 * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (client);
zframe_print (zmsg_last (msg), identity);
zmsg_destroy (&msg);
}
}
zstr_sendf (client, "request #%d", ++request_nbr);
}
zctx_destroy (&ctx);
return NULL;
}
// ---------------------------------------------------------------------
// 這是server端任務,它使用多線程機制將請求分發給多個worker,并正確返回應答信息。
// 一個worker只能處理一次請求,但client可以同時發送多個請求。
static void server_worker (void *args, zctx_t *ctx, void *pipe);
void *server_task (void *args)
{
zctx_t *ctx = zctx_new ();
// frontend套接字使用TCP和client通信
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5570");
// backend套接字使用inproc和worker通信
void *backend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (backend, "inproc://backend");
// 啟動一個worker線程池,數量任意
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++)
zthread_fork (ctx, server_worker, NULL);
// 使用隊列裝置連接backend和frontend,我們本來可以這樣做:
// zmq_device (ZMQ_QUEUE, frontend, backend);
// 但這里我們會自己完成這個任務,這樣可以方便調試。
// 在frontend和backend間搬運消息
while (1) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (frontend);
//puts ("Request from client:");
//zmsg_dump (msg);
zmsg_send (&msg, backend);
}
if (items [1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (backend);
//puts ("Reply from worker:");
//zmsg_dump (msg);
zmsg_send (&msg, frontend);
}
}
zctx_destroy (&ctx);
return NULL;
}
// 接收一個請求,隨機返回多條相同的文字,并在應答之間做隨機的延遲。
//
static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "inproc://backend");
while (1) {
// DEALER套接字將信封和消息內容一起返回給我們
zmsg_t *msg = zmsg_recv (worker);
zframe_t *address = zmsg_pop (msg);
zframe_t *content = zmsg_pop (msg);
assert (content);
zmsg_destroy (&msg);
// 隨機返回0至4條應答
int reply, replies = randof (5);
for (reply = 0; reply < replies; reply++) {
// 暫停一段時間
zclock_sleep (randof (1000) + 1);
zframe_send (&address, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send (&content, worker, ZFRAME_REUSE);
}
zframe_destroy (&address);
zframe_destroy (&content);
}
}
// 主程序用來啟動多個client和一個server
//
int main (void)
{
zctx_t *ctx = zctx_new ();
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, server_task, NULL);
// 運行5秒后退出
zclock_sleep (5 * 1000);
zctx_destroy (&ctx);
return 0;
}
```
運行上面的代碼,可以看到三個客戶端有各自的隨機標識,每次請求會獲得零到多條回復。
* client每秒會發送一次請求,并獲得零到多條應答。這要通過zmq_poll()來實現,但我們不能只每秒poll一次,這樣將不能及時處理應答。程序中我們每秒取100次,這樣一來server端也可以以此作為一種心跳(heartbeat),用來檢測client是否還在線。
* server使用了一個worker池,每一個worker同步處理一條請求。我們可以使用內置的隊列來搬運消息,但為了方便調試,在程序中我們自己實現了這一過程。你可以將注釋的幾行去掉,看看輸出結果。
這段代碼的整體架構如下圖所示:

可以看到,client和server之間的連接我們使用的是DEALER-ROUTER,而server和worker的連接則用了DEALER-DEALER。如果worker是一個同步的線程,我們可以用REP。但是本例中worker需要能夠發送多個應答,所以就需要使用DEALER這樣的異步套接字。這里我們不需要對應答進行路由,因為所有的worker都是連接到一個server上的。
讓我們看看路由用的信封,client發送了一條信息,server獲取的信息中包含了client的地址,這樣一來我們有兩種可行的server-worker通信方案:
* worker收到未經標識的信息。我們使用顯式聲明的標識,配合ROUTER套接字來連接worker和server。這種設計需要worker提前告知ROUTER它的存在,這種LRU算法正是我們之前所講述的。
* worker收到含有標識的信息,并返回含有標識的應答。這就要求worker能夠處理好信封。
第二種涉及較為簡單:
```
client server frontend worker
[ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
1 part 2 parts 2 parts
```
當我們需要在client和server之間維持一個對話時,就會碰到一個經典的問題:client是不固定的,如果給每個client都保存一些消息,那系統資源很快就會耗盡。即使是和同一個client保持連接,因為使用的是瞬時的套接字(沒有顯式聲明標識),那每次連接也相當于是一個新的連接。
想要在異步的請求中保存好client的信息,有以下幾點需要注意:
* client需要發送心跳給server。本例中client每秒都會發送一個請求給server,這就是一種很可靠的心跳機制。
* 使用client的套接字標識來存儲信息,這對瞬時和持久的套接字都有效;
* 檢測停止心跳的client,如兩秒內沒有收到某個client的心跳,就將保存的狀態丟棄。
### 實戰:跨代理路由
讓我們把目前所學到的知識綜合起來,應用到實戰中去。我們的大客戶今天打來一個緊急電話,說是要構建一個大型的云計算設施。它要求這個云架構可以跨越多個數據中心,每個數據中心包含一組client和worker,且能共同協作。
我們堅信實踐高于理論,所以就提議使用ZMQ搭建這樣一個系統。我們的客戶同意了,可能是因為他的確也想降低開發的成本,或是在推特上看到了太多ZMQ的好處。
#### 細節詳述
喝完幾杯特濃咖啡,我們準備著手干了,但腦中有個理智的聲音提醒我們應該在事前將問題分析清楚,然后再開始思考解決方案。云到底要做什么?我們如是問,客戶這樣回答:
* worker在不同的硬件上運作,但可以處理所有類型的任務。每個集群都有成百個worker,再乘以集群的個數,因此數量眾多。
* client向worker指派任務,每個任務都是獨立的,每個client都希望能找到對應的worker來處理任務,越快越好。client是不固定的,來去頻繁。
* 真正的難點在于,這個架構需要能夠自如地添加和刪除集群,附帶著集群中的client和worker。
* 如果集群中沒有可用的worker,它便會將任務分派給其他集群中可以用的worker。
* client每次發送一個請求,并等待應答。如果X秒后他們沒有獲得應答,他們會重新發送請求。這一點我們不需要多做考慮,client端的API已經寫好了。
* worker每次處理一個請求,他們的行為非常簡單。如果worker崩潰了,會有另外的腳本啟動他們。
聽了以上的回答,我們又進一步追問:
* 集群之間會有一個更上層的網絡來連接他們對嗎?客戶說是的。
* 我們需要處理多大的吞吐量?客戶說,每個集群約有一千個client,單個client每秒會發送10次請求。請求包含的內容很少,應答也很少,每個不超過1KB。
我們進行了簡單的計算,2500個client x 10次/秒 x 1000字節 x 雙向 = 50MB/秒,或400Mb/秒,這對1Gb網絡來說不成問題,可以使用TCP協議。
這樣需求就很清晰了,不需要額外的硬件或協議來完成這件事,只要提供一個高效的路由算法,設計得縝密一些。我們首先從一個集群(數據中心)開始,然后思考如何來連接他們。
#### 單個集群的架構
worker和client是同步的,我們使用LRU算法來給worker分配任務。每個worker都是等價的,所以我們不需要考慮服務的問題。worker是匿名的,client不會和某個特定的worker進行通信,因而我們不需要保證消息的送達以及失敗后的重試等。
鑒于上文提過的原因,client和worker是不會直接通信的,這樣一來就無法動態地添加和刪除節點了。所以,我們的基礎模型會使用一個請求-應答模式中使用過的代理結構。

#### 多個集群的架構
下面我們將集群擴充到多個,每個集群有自己的一組client和worker,并使用代理相連接:

問題在于:我們如何讓一個集群的client和另一個集群的worker進行通信呢?有這樣幾種解決方案,我們來看看他們的優劣:
* client直接和多個代理相連接。優點在于我們可以不對代理和worker做改動,但client會變得復雜,并需要知悉整個架構的情況。如果我們想要添加第三或第四個集群,所有的client都會需要修改。我們相當于是將路由和容錯功能寫進client了,這并不是個好主意。
* worker直接和多個代理相連接。可是REQ類型的worker不能做到這一點,它只能應答給某一個代理。如果改用REP套接字,這樣就不能使用LRU算法的隊列代理了。這點肯定不行,在我們的結構中必須用LRU算法來管理worker。還有個方法是使用ROUTER套接字,讓我們暫且稱之為方案1。
* 代理之間可以互相連接,這看上去不錯,因為不需要增加過多的額外連接。雖然我們不能隨意地添加代理,但這個問題可以暫不考慮。這種情況下,集群中的worker和client不必理會整體架構,當代理有剩余的工作能力時便會和其他代理通信。這是方案2。
我們首先看看方案1,worker同時和多個代理進行通信:

這看上去很靈活,但卻沒有提供我們所需要的特性:client只有當集群中的worker不可用時才會去請求異地的worker。此外,worker的“已就緒”信號會同時發送給兩個代理,這樣就有可能同時獲得兩份任務。這個方案的失敗還有一個原因:我們又將路由邏輯放在了邊緣地帶。
那來看看方案2,我們為各個代理建立連接,不修改worker和client:

這種設計的優勢在于,我們只需要在一個地方解決問題就可以了,其他地方不需要修改。這就好像代理之間會秘密通信:伙計,我這兒有一些剩余的勞動力,如果你那兒忙不過來就跟我說,價錢好商量。
事實上,我們只不過是需要設計一種更為復雜的路由算法罷了:代理成為了其他代理的分包商。這種設計還有其他一些好處:
* 在普通情況下(如只存在一個集群),這種設計的處理方式和原來沒有區別,當有多個集群時再進行其他動作。
* 對于不同的工作我們可以使用不同的消息流模式,如使用不同的網絡鏈接。
* 架構的擴充看起來也比較容易,如有必要我們還可以添加一個超級代理來完成調度工作。
現在我們就開始編寫代碼。我們會將完整的集群寫入一個進程,這樣便于演示,而且稍作修改就能投入實際使用。這也是ZMQ的優美之處,你可以使用最小的開發模塊來進行實驗,最后方便地遷移到實際工程中。線程變成進程,消息模式和邏輯不需要改變。我們每個“集群”進程都包含client線程、worker線程、以及代理線程。
我們對基礎模型應該已經很熟悉了:
* client線程使用REQ套接字,將請求發送給代理線程(ROUTER套接字);
* worker線程使用REQ套接字,處理并應答從代理線程(ROUTER套接字)收到的請求;
* 代理會使用LRU隊列和路由機制來管理請求。
#### 聯邦模式和同伴模式
連接代理的方式有很多,我們需要斟酌一番。我們需要的功能是告訴其他代理“我這里還有空閑的worker”,然后開始接收并處理一些任務;我們還需要能夠告訴其他代理“夠了夠了,我這邊的工作量也滿了”。這個過程不一定要十分完美,有時我們確實會接收超過承受能力的工作量,但仍能逐步地完成。
最簡單的方式稱為聯邦,即代理充當其他代理的client和worker。我們可以將代理的前端套接字連接至其他代理的后端套接字,反之亦然。提示一下,ZMQ中是可以將一個套接字綁定到一個端點,同時又連接至另一個端點的。

這種架構的邏輯會比較簡單:當代理沒有client時,它會告訴其他代理自己準備好了,并接收一個任務進行處理。但問題在于這種機制太簡單了,聯邦模式下的代理一次只能處理一個請求。如果client和worker是嚴格同步的,那么代理中的其他空閑worker將分配不到任務。我們需要的代理應該具備完全異步的特性。
但是,聯邦模式對某些應用來說是非常好的,比如面向服務架構(SOA)。所以,先不要急著否定聯邦模式,它只是不適用于LRU算法和集群負載均衡而已。
我們還有一種方式來連接代理:同伴模式。代理之間知道彼此的存在,并使用一個特殊的信道進行通信。我們逐步進行分析,假設有N個代理需要連接,每個代理則有N-1個同伴,所有代理都使用相同格式的消息進行通信。關于消息在代理之間的流通有兩點需要注意:
* 每個代理需要告知所有同伴自己有多少空閑的worker,這是一則簡單的消息,只是一個不斷更新的數字,很顯然我們會使用PUB-SUB套接字。這樣一來,每個代理都會打開一個PUB套接字,不斷告知外界自身的信息;同時又會打開一個SUB套接字,獲取其他代理的信息。
* 每個代理需要以某種方式將工作任務交給其他代理,并能獲取應答,這個過程需要是異步的。我們會使用ROUTER-ROUTER套接字來實現,沒有其他選擇。每個代理會使用兩個這樣的ROUTER套接字,一個用于接收任務,另一個用于分發任務。如果不使用兩個套接字,那就需要額外的邏輯來判別收到的是請求還是應答,這就需要在消息中加入更多的信息。
另外還需要考慮的是代理和本地client和worker之間的通信。
#### The Naming Ceremony
代理中有三個消息流,每個消息流使用兩個套接字,因此一共需要使用六個套接字。為這些套接字取一組好名字很重要,這樣我們就不會在來回切換的時候找不著北。套接字是有一定任務的,他們的所完成的工作可以是命名的一部分。這樣,當我們日后再重新閱讀這些代碼時,就不會顯得太過陌生了。
以下是我們使用的三個消息流:
* 本地(local)的請求-應答消息流,實現代理和client、代理和worker之間的通信;
* 云端(cloud)的請求-應答消息流,實現代理和其同伴的通信;
* 狀態(state)流,由代理和其同伴互相傳遞。
能夠找到一些有意義的、且長度相同的名字,會讓我們的代碼對得比較整齊。可能他們并沒有太多關聯,但久了自然會習慣。
每個消息流會有兩個套接字,我們之前一直稱為“前端(frontend)”和“后端(backend)”。這兩個名字我們已經使用很多次了:前端會負責接受信息或任務;后端會發送信息或任務給同伴。從概念上說,消息流都是從前往后的,應答則是從后往前。
因此,我們決定使用以下的命名方式:
* localfe / localbe
* cloudfe / cloudbe
* statefe / statebe
通信協議方面,我們全部使用ipc。使用這種協議的好處是,它能像tcp協議那樣作為一種脫機通信協議來工作,而又不需要使用IP地址或DNS服務。對于ipc協議的端點,我們會命名為xxx-localfe/be、xxx-cloud、xxx-state,其中xxx代表集群的名稱。
也許你會覺得這種命名方式太長了,還不如簡單的叫s1、s2、s3……事實上,你的大腦并不是機器,閱讀代碼的時候不能立刻反應出變量的含義。而用上面這種“三個消息流,兩個方向”的方式記憶,要比純粹記憶“六個不同的套接字”來的方便。
以下是代理程序的套接字分布圖:

請注意,我們會將cloudbe連接至其他代理的cloudfe,也會將statebe連接至其他代理的statefe。
#### 狀態流原型
由于每個消息流都有其巧妙之處,所以我們不會直接把所有的代碼都寫出來,而是分段編寫和測試。當每個消息流都能正常工作了,我們再將其拼裝成一個完整的應用程序。我們首先從狀態流開始:

代碼如下:
**peering1: Prototype state flow in C**
```c
//
// 代理同伴模擬(第一部分)
// 狀態流原型
//
#include "czmq.h"
int main (int argc, char *argv [])
{
// 第一個參數是代理的名稱
// 其他參數是各個同伴的名稱
//
if (argc < 2) {
printf ("syntax: peering1 me {you}...\n");
exit (EXIT_FAILURE);
}
char *self = argv [1];
printf ("I: 正在準備代理程序 %s...\n", self);
srandom ((unsigned) time (NULL));
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *statebe = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (statebe, "ipc://%s-state.ipc", self);
// 連接statefe套接字至所有同伴
void *statefe = zsocket_new (ctx, ZMQ_SUB);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: 正在連接至同伴代理 '%s' 的狀態流后端\n", peer);
zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
}
// 發送并接受狀態消息
// zmq_poll()函數使用的超時時間即心跳時間
//
while (1) {
// 初始化poll對象列表
zmq_pollitem_t items [] = {
{ statefe, 0, ZMQ_POLLIN, 0 }
};
// 輪詢套接字活動,超時時間為1秒
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// 處理接收到的狀態消息
if (items [0].revents & ZMQ_POLLIN) {
char *peer_name = zstr_recv (statefe);
char *available = zstr_recv (statefe);
printf ("同伴代理 %s 有 %s 個worker空閑\n", peer_name, available);
free (peer_name);
free (available);
}
else {
// 發送隨機數表示空閑的worker數
zstr_sendm (statebe, self);
zstr_sendf (statebe, "%d", randof (10));
}
}
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}
```
幾點說明:
* 每個代理都需要有各自的標識,用以生成相應的ipc端點名稱。真實環境中,代理需要使用TCP協議連接,這就需要一個更為完備的配置機制,我們會在以后的章節中談到。
* 程序的核心是一個zmq_poll()循環,它會處理接收到消息,并發送自身的狀態。只有當zmq_poll()因無法獲得同伴消息而超時時我們才會發送自身狀態,如果我們每次收到消息都去發送自身狀態,那消息就會過量了。
* 發送的狀態消息包含兩幀,第一幀是代理自身的地址,第二幀是空閑的worker數。我們必須要告知同伴代理自身的地址,這樣才能接收到請求,唯一的方法就是在消息中顯示注明。
* 我們沒有在SUB套接字上設置標識,否則就會在連接到同伴代理時獲得過期的狀態信息。
* 我們沒有在PUB套接字上設置閾值(HWM),因為訂閱者是瞬時的。我們也可以將閾值設置為1,但其實是沒有必要的。
讓我們編譯這段程序,用它模擬三個集群,DC1、DC2、DC3。我們在不同的窗口中運行以下命令:
```
peering1 DC1 DC2 DC3 # Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3 # Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2 # Start DC3 and connect to DC1 and DC2
```
每個集群都會報告同伴代理的狀態,之后每隔一秒都會打印出自己的狀態。
在現實編程中,我們不會通過定時的方式來發送自身狀態,而是在狀態發生改變時就發送。這看起來會很占用帶寬,但其實狀態消息的內容很少,而且集群間的連接是非常快速的。
如果我們想要以較為精確的周期來發送狀態信息,可以新建一個線程,將statebe套接字打開,然后由主線程將不規則的狀態信息發送給子線程,再由子線程定時發布這些消息。不過這種機制就需要額外的編程了。
#### 本地流和云端流原型
下面讓我們建立本地流和云端流的原型。這段代碼會從client獲取請求,并隨機地分派給集群內的worker或其他集群。

在編寫代碼之前,讓我們先描繪一下核心的路由邏輯,整理出一份簡單而健壯的設計。
我們需要兩個隊列,一個隊列用于存放從本地集群client收到的請求,另一個存放其他集群發送來的請求。一種方法是從本地和云端的前端套接字中獲取消息,分別存入兩個隊列。但是這么做似乎是沒有必要的,因為ZMQ套接字本身就是隊列。所以,我們直接使用ZMQ套接字提供的緩存來作為隊列使用。
這項技術我們在LRU隊列裝置中使用過,且工作得很好。做法是,當代理下有空閑的worker或能接收請求的其他集群時,才從套接字中獲取請求。我們可以不斷地從后端獲取應答,然后路由回去。如果后端沒有任何響應,那也就沒有必要去接收前端的請求了。
所以,我們的主循環會做以下幾件事:
* 輪詢后端套接字,會從worker處獲得“已就緒”的消息或是一個應答。如果是應答消息,則將其路由回集群client,或是其他集群。
* worker應答后即可標記為可用,放入隊列并計數;
* 如果有可用的worker,就獲取一個請求,該請求可能來自集群內的client,也可能是其他集群。隨后將請求轉發給集群內的worker,或是隨機轉發給其他集群。
這里我們只是隨機地將請求發送給其他集群,而不是在代理中模擬出一個worker,進行集群間的任務分發。這看起來挺愚蠢的,不過目前尚可使用。
我們使用代理的標識來進行代理之前的消息路由。每個代理都有自己的名字,是在命令行中指定的。只要這些指定的名字和ZMQ為client自動生成的UUID不重復,那么我們就可以知道應答是要返回給client,還是返回給另一個集群。
下面是代碼,有趣的部分已在程序中標注:
**peering2: Prototype local and cloud flow in C**
```c
//
// 代理同伴模擬(第二部分)
// 請求-應答消息流原型
//
// 示例程序使用了一個進程,這樣可以讓程序變得簡單,
// 每個線程都有自己的上下文對象,所以可以認為他們是多個進程。
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // 消息:worker已就緒
// 代理名稱;現實中,這個名稱應該由某種配置完成
static char *self;
// 請求-應答客戶端使用REQ套接字
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
while (1) {
// 發送請求,接收應答
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // 中斷
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// worker使用REQ套接字,并進行LRU路由
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// 告知代理worker已就緒
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// 處理消息
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 中斷
zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (int argc, char *argv [])
{
// 第一個參數是代理的名稱
// 其他參數是同伴代理的名稱
//
if (argc < 2) {
printf ("syntax: peering2 me {you}...\n");
exit (EXIT_FAILURE);
}
self = argv [1];
printf ("I: 正在準備代理程序 %s...\n", self);
srandom ((unsigned) time (NULL));
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
char endpoint [256];
// 將cloudfe綁定至端點
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);
// 將cloudbe連接至同伴代理的端點
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudfe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: 正在連接至同伴代理 '%s' 的cloudfe端點\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// 準備本地前端和后端
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);
// 讓用戶告訴我們何時開始
printf ("請確認所有代理已經啟動,按任意鍵繼續: ");
getchar ();
// 啟動本地worker
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
// 啟動本地client
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
// 有趣的部分
// -------------------------------------------------------------
// 請求-應答消息流
// - 若本地有可用worker,則輪詢獲取本地或云端的請求;
// - 將請求路由給本地worker或其他集群。
// 可用worker隊列
int capacity = 0;
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// 如果沒有可用worker,則繼續等待
int rc = zmq_poll (backends, 2,
capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // 中斷
// 處理本地worker的應答
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // 中斷
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
capacity++;
// 如果是“已就緒”的信號,則不再進行路由
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
}
// 處理來自同伴代理的應答
else
if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // 中斷
// 我們不需要使用同伴代理的地址
zframe_t *address = zmsg_unwrap (msg);
zframe_destroy (&address);
}
// 如果應答消息中的地址是同伴代理的,則發送給它
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// 將應答路由給本地client
if (msg)
zmsg_send (&msg, localfe);
// 開始處理客戶端請求
//
while (capacity) {
zmq_pollitem_t frontends [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll (frontends, 2, 0);
assert (rc >= 0);
int reroutable = 0;
// 優先處理同伴代理的請求,避免資源耗盡
if (frontends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudfe);
reroutable = 0;
}
else
if (frontends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localfe);
reroutable = 1;
}
else
break; // 沒有請求
// 將20%的請求發送給其他集群
//
if (reroutable && argc > 2 && randof (5) == 0) {
// 隨地地路由給同伴代理
int random_peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
zmsg_send (&msg, cloudbe);
}
else {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
capacity--;
}
}
}
// 程序結束后的清理工作
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}
```
在兩個窗口中運行以上代碼:
```
peering2 me you
peering2 you me
```
幾點說明:
* zmsg類庫讓程序變得簡單多了,這類程序顯然應該成為我們ZMQ程序員必備的工具;
由于我們沒有在程序中實現獲取同伴代理狀態的功能,所以先暫且認為他們都是有空閑worker的。現實中,我們不會將請求發送個一個不存在的同伴代理。
* 你可以讓這段程序長時間地運行下去,看看會不會出現路由錯誤的消息,因為一旦錯誤,client就會阻塞。你可以試著將一個代理關閉,就能看到代理無法將請求路由給云端中的其他代理,client逐個阻塞,程序也停止打印跟蹤信息。
#### 組裝
讓我們將所有這些放到一段代碼里。和之前一樣,我們會在一個進程中完成所有工作。我們會將上文中的兩個示例程序結合起來,編寫出一個可以模擬任意多個集群的程序。
代碼共有270行,非常適合用來模擬一組完整的集群程序,包括client、worker、代理、以及云端任務分發機制。
**peering3: Full cluster simulation in C**
```c
//
// 同伴代理模擬(第三部分)
// 狀態和任務消息流原型
//
// 示例程序使用了一個進程,這樣可以讓程序變得簡單,
// 每個線程都有自己的上下文對象,所以可以認為他們是多個進程。
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define LRU_READY "\001" // 消息:worker已就緒
// 代理名稱;現實中,這個名稱應該由某種配置完成
static char *self;
// 請求-應答客戶端使用REQ套接字
// 為模擬壓力測試,客戶端會一次性發送大量請求
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
void *monitor = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);
while (1) {
sleep (randof (5));
int burst = randof (15);
while (burst--) {
char task_id [5];
sprintf (task_id, "%04X", randof (0x10000));
// 使用隨機的十六進制ID來標注任務
zstr_send (client, task_id);
// 最多等待10秒
zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
if (pollset [0].revents & ZMQ_POLLIN) {
char *reply = zstr_recv (client);
if (!reply)
break; // 中斷
// worker的應答中應包含任務ID
puts (reply);
assert (streq (reply, task_id));
free (reply);
}
else {
zstr_sendf (monitor,
"E: 客戶端退出,丟失的任務為: %s", task_id);
return NULL;
}
}
}
zctx_destroy (&ctx);
return NULL;
}
// worker使用REQ套接字,并進行LRU路由
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// 告知代理worker已就緒
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
while (1) {
// worker會隨機延遲幾秒
zmsg_t *msg = zmsg_recv (worker);
sleep (randof (2));
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (int argc, char *argv [])
{
// 第一個參數是代理的名稱
// 其他參數是同伴代理的名稱
//
if (argc < 2) {
printf ("syntax: peering3 me {you}...\n");
exit (EXIT_FAILURE);
}
self = argv [1];
printf ("I: 正在準備代理程序 %s...\n", self);
srandom ((unsigned) time (NULL));
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
char endpoint [256];
// 將cloudfe綁定至端點
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);
// 將statebe綁定至端點
void *statebe = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (statebe, "ipc://%s-state.ipc", self);
// 將cloudbe連接至同伴代理的端點
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: 正在連接至同伴代理 '%s' 的cloudfe端點\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// 將statefe連接至同伴代理的端點
void *statefe = zsocket_new (ctx, ZMQ_SUB);
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: 正在連接至同伴代理 '%s' 的statebe端點\n", peer);
zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
}
// 準備本地前端和后端
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);
// 準備監控套接字
void *monitor = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (monitor, "ipc://%s-monitor.ipc", self);
// 啟動本地worker
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
// 啟動本地client
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
// 有趣的部分
// -------------------------------------------------------------
// 發布-訂閱消息流
// - 輪詢同伴代理的狀態信息;
// - 當自身狀態改變時,對外廣播消息。
// 請求-應答消息流
// - 若本地有可用worker,則輪詢獲取本地或云端的請求;
// - 將請求路由給本地worker或其他集群。
// 可用worker隊列
int local_capacity = 0;
int cloud_capacity = 0;
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t primary [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 },
{ statefe, 0, ZMQ_POLLIN, 0 },
{ monitor, 0, ZMQ_POLLIN, 0 }
};
// 如果沒有可用的worker,則一直等待
int rc = zmq_poll (primary, 4,
local_capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // 中斷
// 跟蹤自身狀態信息是否改變
int previous = local_capacity;
// 處理本地worker的應答
zmsg_t *msg = NULL;
if (primary [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // 中斷
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
local_capacity++;
// 如果是“已就緒”的信號,則不再進行路由
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
}
// 處理來自同伴代理的應答
else
if (primary [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Interrupted
// 我們不需要使用同伴代理的地址
zframe_t *address = zmsg_unwrap (msg);
zframe_destroy (&address);
}
// 如果應答消息中的地址是同伴代理的,則發送給它
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// 將應答路由給本地client
if (msg)
zmsg_send (&msg, localfe);
// 處理同伴代理的狀態更新
if (primary [2].revents & ZMQ_POLLIN) {
char *status = zstr_recv (statefe);
cloud_capacity = atoi (status);
free (status);
}
// 處理監控消息
if (primary [3].revents & ZMQ_POLLIN) {
char *status = zstr_recv (monitor);
printf ("%s\n", status);
free (status);
}
// 開始處理客戶端請求
// - 如果本地有空閑worker,則總本地client和云端接收請求;
// - 如果我們只有空閑的同伴代理,則只輪詢本地client的請求;
// - 將請求路由給本地worker,或者同伴代理。
//
while (local_capacity + cloud_capacity) {
zmq_pollitem_t secondary [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
if (local_capacity)
rc = zmq_poll (secondary, 2, 0);
else
rc = zmq_poll (secondary, 1, 0);
assert (rc >= 0);
if (secondary [0].revents & ZMQ_POLLIN)
msg = zmsg_recv (localfe);
else
if (secondary [1].revents & ZMQ_POLLIN)
msg = zmsg_recv (cloudfe);
else
break; // 沒有任務
if (local_capacity) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
local_capacity--;
}
else {
// 隨機路由給同伴代理
int random_peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
zmsg_send (&msg, cloudbe);
}
}
if (local_capacity != previous) {
// 將自身代理的地址附加到消息中
zstr_sendm (statebe, self);
// 廣播新的狀態信息
zstr_sendf (statebe, "%d", local_capacity);
}
}
// 程序結束后的清理工作
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}
```
這段代碼并不長,但花費了大約一天的時間去調通。以下是一些說明:
* client線程會檢測并報告失敗的請求,它們會輪詢代理套接字,查看是否有應答,超時時間為10秒。
* client線程不會自己打印信息,而是將消息PUSH給一個監控線程,由它打印消息。這是我們第一次使用ZMQ進行監控和記錄日志,我們以后會見得更多。
* clinet會模擬多種負載情況,讓集群在不同的壓力下工作,因此請求可能會在本地處理,也有可能會發送至云端。集群中的client和worker數量、其他集群的數量,以及延遲時間,會左右這個結果。你可以設置不同的參數來測試它們。
* 主循環中有兩組輪詢集合,事實上我們可以使用三個:信息流、后端、前端。因為在前面的例子中,如果后端沒有空閑的worker,就沒有必要輪詢前端請求了。
以下是幾個在編寫過程中遇到的問題:
* 如果請求或應答在某處丟失,client會因此阻塞。回憶以下,ROUTER-ROUTER套接字會在消息如法路由的情況下直接丟棄。這里的一個策略就是改變client線程,檢測并報告這種錯誤。此外,我還在每次recv()之后以及send()之前使用zmsg_dump()來打印套接字內容,用來更快地定位消息。
* 主循環會錯誤地從多個已就緒的套接字中獲取消息,造成第一條消息的丟失。解決方法是只從第一個已就緒的套接字中獲取消息。
* zmsg類庫沒有很好地將UUID編碼為C語言字符串,導致包含字節0的UUID會崩潰。解決方法是將UUID轉換成可打印的十六進制字符串。
這段模擬程序沒有檢測同伴代理是否存在。如果你開啟了某個代理,它已向其他代理發送過狀態信息,然后關閉了,那其他代理仍會向它發送請求。這樣一來,其他代理的client就會報告很多錯誤。解決時有兩點:一、為狀態信息設置有效期,當同伴代理消失一段時間后就不再發送請求;二、提高請求-應答的可靠性,這在下一章中會講到。