## 可靠的請求-應答模式
第三章中我們使用實例介紹了高級請求-應答模式,本章我們會講述請求-應答模式的可靠性問題,并使用ZMQ提供的套接字類型組建起可靠的請求-應答消息系統。
本章將介紹的內容有:
* 客戶端請求-應答
* 最近最少使用隊列
* 心跳機制
* 面向服務的隊列
* 基于磁盤(脫機)隊列
* 主從備份服務
* 無中間件的請求-應答
### 什么是可靠性?
要給可靠性下定義,我們可以先界定它的相反面——故障。如果我們可以處理某些類型的故障,那么我們的模型對于這些故障就是可靠的。下面我們就來列舉分布式ZMQ應用程序中可能發生的問題,從可能性高的故障開始:
* 應用程序代碼是最大的故障來源。程序會崩潰或中止,停止對數據來源的響應,或是響應得太慢,耗盡內存等。
* 系統代碼,如使用ZMQ編寫的中間件,也會意外中止。系統代碼應該要比應用程序代碼更為可靠,但畢竟也有可能崩潰。特別是當系統代碼與速度過慢的客戶端交互時,很容易耗盡內存。
* 消息隊列溢出,典型的情況是系統代碼中沒有對蠻客戶端做積極的處理,任由消息隊列溢出。
* 網絡臨時中斷,造成消息丟失。這類錯誤ZMQ應用程序是無法及時發現的,因為ZMQ會自動進行重連。
* 硬件系統崩潰,導致所有進程中止。
* 網絡會出現特殊情形的中斷,如交換機的某個端口發生故障,導致部分網絡無法訪問。
* 數據中心可能遭受雷擊、地震、火災、電壓過載、冷卻系統失效等。
想要讓軟件系統規避上述所有的風險,需要大量的人力物力,故不在本指南的討論范圍之內。
由于前五個故障類型涵蓋了99.9%的情形(這一數據源自我近期進行的一項研究),所以我們會深入探討。如果你的公司大到足以考慮最后兩種情形,那請及時聯系我,因為我正愁沒錢將我家后院的大坑建成游泳池。
### 可靠性設計
簡單地來說,可靠性就是當程序發生故障時也能順利地運行下去,這要比搭建一個消息系統來得困難得多。我們會根據ZMQ提供的每一種核心消息模式,來看看如何保障代碼的持續運行。
* 請求-應答模式:當服務端在處理請求是中斷,客戶端能夠得知這一信息,并停止接收消息,轉而選擇等待重試、請求另一服務端等操作。這里我們暫不討論客戶端發生問題的情形。
* 發布-訂閱模式:如果客戶端收到一些消息后意外中止,服務端是不知道這一情況的。發布-訂閱模式中的訂閱者不會返回任何消息給發布者。但是,訂閱者可以通過其他方式聯系服務端,如請求-應答模式,要求服務端重發消息。這里我們暫不討論服務端發生問題的情形。此外,訂閱者可以通過某些方式檢查自身是否運行得過慢,并采取相應措施(向操作者發出警告、中止等)。
* 管道模式:如果worker意外終止,任務分發器將無從得知。管道模式和發布-訂閱模式類似,只朝一個方向發送消息。但是,下游的結果收集器可以檢測哪項任務沒有完成,并告訴任務分發器重新分配該任務。如果任務分發器或結果收集器意外中止了,那客戶端發出的請求只能另作處理。所以說,系統代碼真的要減少出錯的幾率,因為這很難處理。
本章主要講解請求-應答模式中的可靠性設計,其他模式將在后續章節中講解。
最基本的請求應答模式是REQ客戶端發送一個同步的請求至REP服務端,這種模式的可靠性很低。如果服務端在處理請求時中止,那客戶端會永遠處于等待狀態。
相比TCP協議,ZMQ提供了自動重連機制、消息分發的負載均衡等。但是,在真實環境中這也是不夠的。唯一可以完全信任基本請求-應答模式的應用場景是同一進程的兩個線程之間進行通信,沒有網絡問題或服務器失效的情況。
但是,只要稍加修飾,這種基本的請求-應答模式就能很好地在現實環境中工作了。我喜歡將其稱為“海盜”模式。
粗略地講,客戶端連接服務端有三種方式,每種方式都需要不同的可靠性設計:
* 多個客戶端直接和單個服務端進行通信。使用場景:只有一個單點服務器,所有客戶端都需要和它通信。需處理的故障:服務器崩潰和重啟;網絡連接中斷。
* 多個客戶端和單個隊列裝置通信,該裝置將請求分發給多個服務端。使用場景:任務分發。需處理的故障:worker崩潰和重啟,死循環,過載;隊列裝置崩潰和重啟;網絡中斷。
* 多個客戶端直接和多個服務端通信,無中間件。使用場景:類似域名解析的分布式服務。需處理的故障:服務端崩潰和重啟,死循環,過載;網絡連接中斷。
以上每種設計都必須有所取舍,很多時候會混合使用。下面我們詳細說明。
### 客戶端的可靠性設計(懶惰海盜模式)
我們可以通過在客戶端進行簡單的設置,來實現可靠的請求-應答模式。我暫且稱之為“懶惰的海盜”(Lazy Pirate)模式。
在接收應答時,我們不進行同步等待,而是做以下操作:
* 對REQ套接字進行輪詢,當消息抵達時才進行接收;
* 請求超時后重發消息,循環多次;
* 若仍無消息,則結束當前事務。

使用REQ套接字時必須嚴格遵守發送-接收過程,因為它內部采用了一個有限狀態機來限定狀態,這一特性會讓我們應用“海盜”模式時遇上一些麻煩。最簡單的做法是將REQ套接字關閉重啟,從而打破這一限定。
**lpclient: Lazy Pirate client in C**
```c
//
// Lazy Pirate client
// 使用zmq_poll輪詢來實現安全的請求-應答
// 運行時可隨機關閉或重啟lpserver程序
//
#include "czmq.h"
#define REQUEST_TIMEOUT 2500 // 毫秒, (> 1000!)
#define REQUEST_RETRIES 3 // 嘗試次數
#define SERVER_ENDPOINT "tcp://localhost:5555"
int main (void)
{
zctx_t *ctx = zctx_new ();
printf ("I: 正在連接服務器...\n");
void *client = zsocket_new (ctx, ZMQ_REQ);
assert (client);
zsocket_connect (client, SERVER_ENDPOINT);
int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left && !zctx_interrupted) {
// 發送一個請求,并開始接收消息
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// 對套接字進行輪詢,并設置超時時間
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// 如果接收到回復則進行處理
if (items [0].revents & ZMQ_POLLIN) {
// 收到服務器應答,必須和請求時的序號一致
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
if (atoi (reply) == sequence) {
printf ("I: 服務器返回正常 (%s)\n", reply);
retries_left = REQUEST_RETRIES;
expect_reply = 0;
}
else
printf ("E: 服務器返回異常: %s\n",
reply);
free (reply);
}
else
if (--retries_left == 0) {
printf ("E: 服務器不可用,取消操作\n");
break;
}
else {
printf ("W: 服務器沒有響應,正在重試...\n");
// 關閉舊套接字,并建立新套接字
zsocket_destroy (ctx, client);
printf ("I: 服務器重連中...\n");
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, SERVER_ENDPOINT);
// 使用新套接字再次發送請求
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}
```
**lpserver: Lazy Pirate server in C**
```c
//
// Lazy Pirate server
// 將REQ套接字連接至 tcp://*:5555
// 和hwserver程序類似,除了以下兩點:
// - 直接輸出請求內容
// - 隨機地降慢運行速度,或中止程序,模擬崩潰
//
#include "zhelpers.h"
int main (void)
{
srandom ((unsigned) time (NULL));
void *context = zmq_init (1);
void *server = zmq_socket (context, ZMQ_REP);
zmq_bind (server, "tcp://*:5555");
int cycles = 0;
while (1) {
char *request = s_recv (server);
cycles++;
// 循環幾次后開始模擬各種故障
if (cycles > 3 && randof (3) == 0) {
printf ("I: 模擬程序崩潰\n");
break;
}
else
if (cycles > 3 && randof (3) == 0) {
printf ("I: 模擬CPU過載\n");
sleep (2);
}
printf ("I: 正常請求 (%s)\n", request);
sleep (1); // 耗時的處理過程
s_send (server, request);
free (request);
}
zmq_close (server);
zmq_term (context);
return 0;
}
```
運行這個測試用例時,可以打開兩個控制臺,服務端會隨機發生故障,你可以看看客戶端的反應。服務端的典型輸出如下:
```c
I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash
```
客戶端的輸出是:
```
I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning
```
客戶端為每次請求都加上了序列號,并檢查收到的應答是否和序列號一致,以保證沒有請求或應答丟失,同一個應答收到多次或亂序。多運行幾次實例,看看是否真的能夠解決問題。現實環境中你不需要使用到序列號,那只是為了證明這一方式是可行的。
客戶端使用REQ套接字進行請求,并在發生問題時打開一個新的套接字來,繞過REQ強制的發送/接收過程。可能你會想用DEALER套接字,但這并不是一個好主意。首先,DEALER并不會像REQ那樣處理信封(如果你不知道信封是什么,那更不能用DEALER了)。其次,你可能會獲得你并不想得到的結果。
這一方案的優劣是:
* 優點:簡單明了,容易實施;
* 優點:可以方便地應用到現有的客戶端和服務端程序中;
* 優點:ZMQ有自動重連機制;
* 缺點:單點服務發生故障時不能定位到新的可用服務。
### 基本的可靠隊列(簡單海盜模式)
在第二種模式中,我們使用一個隊列裝置來擴展上述的“懶惰的海盜”模式,使客戶端能夠透明地和多個服務端通信。這里的服務端可以定義為worker。我們可以從最基礎的模型開始,分階段實施這個方案。
在所有的海盜模式中,worker是無狀態的,或者說存在著一個我們所不知道的公共狀態,如共享數據庫。隊列裝置的存在意味著worker可以在client毫不知情的情況下隨意進出。一個worker死亡后,會有另一個worker接替它的工作。這種拓撲結果非常簡潔,但唯一的缺點是隊列裝置本身會難以維護,可能造成單點故障。
在第三章中,隊列裝置的基本算法是最近最少使用算法。那么,如果worker死亡或阻塞,我們需要做些什么?答案是很少很少。我們已經在client中加入了重試的機制,所以,使用基本的LRU隊列就可以運作得很好了。這種做法也符合ZMQ的邏輯,所以我們可以通過在點對點交互中插入一個簡單的隊列裝置來擴展它:

我們可以直接使用“懶惰的海盜”模式中的client,以下是隊列裝置的代碼:
**spqueue: Simple Pirate queue in C**
```c
//
// 簡單海盜隊列
//
// 這個裝置和LRU隊列完全一致,不存在任何可靠性機制,依靠client的重試來保證裝置的運行
//
#include "czmq.h"
#define LRU_READY "\001" // 消息:worker準備就緒
int main (void)
{
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // client端點
zsocket_bind (backend, "tcp://*:5556"); // worker端點
// 存放可用worker的隊列
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// 當有可用的woker時,輪詢前端端點
int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
if (rc == -1)
break; // 中斷
// 處理后端端點的worker消息
if (items [0].revents & ZMQ_POLLIN) {
// 使用worker的地址進行LRU排隊
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // 中斷
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
// 如果消息不是READY,則轉發給client
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// 獲取client請求,轉發給第一個可用的worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// 程序運行結束,進行清理
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```
以下是worker的代碼,用到了“懶惰的海盜”服務,并將其調整為LRU模式(使用REQ套接字傳遞“已就緒”信號):
**spworker: Simple Pirate worker in C**
```c
//
// 簡單海盜模式worker
//
// 使用REQ套接字連接tcp://*:5556,使用LRU算法實現worker
//
#include "czmq.h"
#define LRU_READY "\001" // 消息:worker已就緒
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
// 使用隨機符號來指定套接字標識,方便追蹤
srandom ((unsigned) time (NULL));
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
zsocket_connect (worker, "tcp://localhost:5556");
// 告訴代理worker已就緒
printf ("I: (%s) worker準備就緒\n", identity);
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
int cycles = 0;
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 中斷
// 經過幾輪循環后,模擬各種問題
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) 模擬崩潰\n", identity);
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) 模擬CPU過載\n", identity);
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: (%s) 正常應答\n", identity);
sleep (1); // 進行某些處理
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return 0;
}
```
運行上述事例,啟動多個worker,一個client,以及一個隊列裝置,順序隨意。你可以看到worker最終都會崩潰或死亡,client則多次重試并最終放棄。裝置從來不會停止,你可以任意重啟worker和client,這個模型可以和任意個worker、client交互。
### 健壯的可靠隊列(偏執海盜模式)
“簡單海盜隊列”模式工作得非常好,主要是因為它只是兩個現有模式的結合體。不過,它也有一些缺點:
* 該模式無法處理隊列的崩潰或重啟。client會進行重試,但worker不會重啟。雖然ZMQ會自動重連worker的套接字,但對于新啟動的隊列裝置來說,由于worker并沒有發送“已就緒”的消息,所以它相當于是不存在的。為了解決這一問題,我們需要從隊列發送心跳給worker,這樣worker就能知道隊列是否已經死亡。
* 隊列沒有檢測worker是否已經死亡,所以當worker在處于空閑狀態時死亡,隊列裝置只有在發送了某個請求之后才會將該worker從隊列中移除。這時,client什么都不能做,只能等待。這不是一個致命的問題,但是依然是不夠好的。所以,我們需要從worker發送心跳給隊列裝置,從而讓隊列得知worker什么時候消亡。
我們使用一個名為“偏執的海盜模式”來解決上述兩個問題。
之前我們使用REQ套接字作為worker的套接字類型,但在偏執海盜模式中我們會改用DEALER套接字,從而使我們能夠任意地發送和接受消息,而不是像REQ套接字那樣必須完成發送-接受循環。而DEALER的缺點是我們必須自己管理消息信封。如果你不知道信封是什么,那請閱讀第三章。

我們仍會使用懶惰海盜模式的client,以下是偏執海盜的隊列裝置代碼:
**ppqueue: Paranoid Pirate queue in C**
```c
//
// 偏執海盜隊列
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 心跳健康度,3-5是合理的
#define HEARTBEAT_INTERVAL 1000 // 單位:毫秒
// 偏執海盜協議的消息代碼
#define PPP_READY "\001" // worker已就緒
#define PPP_HEARTBEAT "\002" // worker心跳
// 使用以下結構表示worker隊列中的一個有效的worker
typedef struct {
zframe_t *address; // worker的地址
char *identity; // 可打印的套接字標識
int64_t expiry; // 過期時間
} worker_t;
// 創建新的worker
static worker_t *
s_worker_new (zframe_t *address)
{
worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
self->address = address;
self->identity = zframe_strdup (address);
self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}
// 銷毀worker結構,包括標識
static void
s_worker_destroy (worker_t **self_p)
{
assert (self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy (&self->address);
free (self->identity);
free (self);
*self_p = NULL;
}
}
// worker已就緒,將其移至列表末尾
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (streq (self->identity, worker->identity)) {
zlist_remove (workers, worker);
s_worker_destroy (&worker);
break;
}
worker = (worker_t *) zlist_next (workers);
}
zlist_append (workers, self);
}
// 返回下一個可用的worker地址
static zframe_t *
s_workers_next (zlist_t *workers)
{
worker_t *worker = zlist_pop (workers);
assert (worker);
zframe_t *frame = worker->address;
worker->address = NULL;
s_worker_destroy (&worker);
return frame;
}
// 尋找并銷毀已過期的worker。
// 由于列表中最舊的worker排在最前,所以當找到第一個未過期的worker時就停止。
static void
s_workers_purge (zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (zclock_time () < worker->expiry)
break; // worker未過期,停止掃描
zlist_remove (workers, worker);
s_worker_destroy (&worker);
worker = (worker_t *) zlist_first (workers);
}
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // client端點
zsocket_bind (backend, "tcp://*:5556"); // worker端點
// List of available workers
zlist_t *workers = zlist_new ();
// 規律地發送心跳
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// 當存在可用worker時輪詢前端端點
int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// 處理后端worker請求
if (items [0].revents & ZMQ_POLLIN) {
// 使用worker地址進行LRU路由
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // 中斷
// worker的任何信號均表示其仍然存活
zframe_t *address = zmsg_unwrap (msg);
worker_t *worker = s_worker_new (address);
s_worker_ready (worker, workers);
// 處理控制消息,或者將應答轉發給client
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_READY, 1)
&& memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
printf ("E: invalid message from worker");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// 獲取下一個client請求,交給下一個可用的worker
zmsg_t *msg = zmsg_recv (frontend);
if (!msg)
break; // 中斷
zmsg_push (msg, s_workers_next (workers));
zmsg_send (&msg, backend);
}
// 發送心跳給空閑的worker
if (zclock_time () >= heartbeat_at) {
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
zframe_send (&worker->address, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, backend, 0);
worker = (worker_t *) zlist_next (workers);
}
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
s_workers_purge (workers);
}
// 程序結束后進行清理
while (zlist_size (workers)) {
worker_t *worker = (worker_t *) zlist_pop (workers);
s_worker_destroy (&worker);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```
該隊列裝置使用心跳機制擴展了LRU模式,看起來很簡單,但要想出這個主意還挺難的。下文會更多地介紹心跳機制。
以下是偏執海盜的worker代碼:
**ppworker: Paranoid Pirate worker in C**
```c
//
// 偏執海盜worker
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 合理值:3-5
#define HEARTBEAT_INTERVAL 1000 // 單位:毫秒
#define INTERVAL_INIT 1000 // 重試間隔
#define INTERVAL_MAX 32000 // 回退算法最大值
// 偏執海盜規范的常量定義
#define PPP_READY "\001" // 消息:worker已就緒
#define PPP_HEARTBEAT "\002" // 消息:worker心跳
// 返回一個連接至偏執海盜隊列裝置的套接字
static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
// 告知隊列worker已準備就緒
printf ("I: worker已就緒\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);
return worker;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);
// 如果心跳健康度為零,則表示隊列裝置已死亡
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// 規律地發送心跳
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
srandom ((unsigned) time (NULL));
int cycles = 0;
while (1) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
if (items [0].revents & ZMQ_POLLIN) {
// 獲取消息
// - 3段消息,信封+內容,表示一個請求
// - 1段消息,表示心跳
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 中斷
if (zmsg_size (msg) == 3) {
// 若干詞循環后模擬各種問題
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: 模擬崩潰\n");
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: 模擬CPU過載\n");
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: 正常應答\n");
zmsg_send (&msg, worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // 做一些處理工作
if (zctx_interrupted)
break;
}
else
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf ("E: 非法消息\n");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else {
printf ("E: 非法消息\n");
zmsg_dump (msg);
}
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
printf ("W: 心跳失敗,無法連接隊列裝置\n");
printf ("W: %zd 毫秒后進行重連...\n", interval);
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
worker = s_worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
// 適時發送心跳給隊列
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
printf ("I: worker心跳\n");
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, worker, 0);
}
}
zctx_destroy (&ctx);
return 0;
}
```
幾點說明:
* 代碼中包含了幾處失敗模擬,和先前一樣。這會讓代碼極難維護,所以當投入使用時,應當移除這些模擬代碼。
* 偏執海盜模式中隊列的心跳有時會不正常,下文會講述這一點。
* worker使用了一種類似于懶惰海盜client的重試機制,但有兩點不同:1、回退算法設置;2、永不言棄。
嘗試運行以下代碼,跑通流程:
```
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
```
你會看到worker逐個崩潰,client在多次嘗試后放棄。你可以停止并重啟隊列裝置,client和worker會相繼重連,并正確地發送、處理和接收請求,順序不會混亂。所以說,整個通信過程只有兩種情形:交互成功,或client最終放棄。
### 心跳
當我在寫偏執海盜模式的示例時,大約花了五個小時的時間來協調隊列至worker的心跳,剩下的請求-應答鏈路只花了約10分鐘的時間。心跳機制在可靠性上帶來的益處有時還不及它所引發的問題。使用過程中很有可能會產生“虛假故障”的情況,即節點誤認為他們已失去連接,因為心跳沒有正確地發送。
在理解和實施心跳時,需要考慮以下幾點:
* 心跳不是一種請求-應答,它們異步地在節點之間傳遞,任一節點都可以通過它來判斷對方已經死亡,并中止通信。
* 如果某個節點使用持久套接字(即設定了套接字標識),意味著發送給它的心跳可能會堆砌,并在重連后一起收到。所以說,worker不應該使用持久套接字。示例代碼使用持久套接字是為了便于調試,而且代碼中使用了隨機的套接字標識,避免重用之前的標識。
* 使用過程中,應先讓心跳工作起來,再進行后面的消息處理。你需要保證啟動任一節點后,心跳都能正確地執行。停止并重啟他們,模擬凍結、崩潰等情況來進行測試。
* 當你的主循環使用了zmq_poll(),則應該使用另一個計時器來觸發心跳。不要使用主循環來控制心跳的發送,這回導致過量地發送心跳(阻塞網絡),或是發送得太少(導致節點斷開)。zhelpers包提供了s_clock()函數返回當前系統時間戳,單位是毫秒,可以用它來控制心跳的發送間隔。C代碼如下:
```c
// 規律地發送心跳
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
while (1) {
…
zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
…
// 無論zmq_poll的行為是什么,都使用以下邏輯判斷是否發送心跳
if (s_clock () > heartbeat_at) {
… 發送心跳給所有節點
// 設置下一次心跳的時間
heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
}
}
```
* 主循環應該使用心跳間隔作為超時時間。顯然不能使用無超時時間的設置,而短于心跳間隔也只是浪費循環次數而已。
* 使用簡單的追蹤方式來進行追蹤,如直接輸出至控制臺。這里有一些追蹤的竅門:使用zmsg()函數打印套接字內容;對消息進行編號,判斷是否會有間隔。
* 在真實的應用程序中,心跳必須是可以配置的,并能和節點共同商定。有些節點需要高頻心跳,如10毫秒,另一些節點則可能只需要30秒發送一次心跳即可。
* 如果你要對不同的節點發送不同頻率的心跳,那么poll的超時時間應設置為最短的心跳間隔。
* 也許你會想要用一個單獨的套接字來處理心跳,這看起來很棒,可以將同步的請求-應答和異步的心跳隔離開來。但是,這個主意并不好,原因有幾點:首先、發送數據時其實是不需要發送心跳的;其次、套接字可能會因為網絡問題而阻塞,你需要設法知道用于發送數據的套接字停止響應的原因是死亡了還是過于繁忙而已,這樣你就需要對這個套接字進行心跳。最后,處理兩個套接字要比處理一個復雜得多。
* 我們沒有設置client至隊列的心跳,因為這太過復雜了,而且沒有太大價值。
### 約定和協議
也許你已經注意到,由于心跳機制,偏執海盜模式和簡單海盜模式是不兼容的。
其實,這里我們需要寫一個協議。也許在試驗階段是不需要協議的,但這在真實的應用程序中是有必要。如果我們想用其他語言來寫worker怎么辦?我們是否需要通過源代碼來查看通信過程?如果我們想改變協議怎么辦?規范可能很簡單,但并不顯然。越是成功的協議,就會越為復雜。
一個缺乏約定的應用程序一定是不可復用的,所以讓我們來為這個協議寫一個規范,怎么做呢?
* 位于[rfc.zeromq.org](http://rfc.zeromq.org/)的wiki頁上,我們特地設置了一個用于存放ZMQ協議的頁面。
* 要創建一個新的協議,你需要注冊并按照指導進行。過程很直接,但并不一定所有人都能撰寫技術性文檔。
我大約花了15分鐘的時間草擬[海盜模式規范(PPP)](http://rfc.zeromq.org/spec:6),麻雀雖小,但五臟俱全。
要用PPP協議進行真實環境下的編程,你還需要:
* 在READY命令中加入版本號,這樣就能再日后安全地新增PPP版本號。
* 目前,READY和HEARTBEAT信號并沒有指定其來源于請求還是應答。要區分他們,需要新建一個消息結構,其中包含“消息類型”這一信息。
### 面向服務的可靠隊列(管家模式)
世上的事物往往瞬息萬變,正當我們期待有更好的協議來解決上一節的問題時,已經有人制定好了:
* http://rfc.zeromq.org/spec:7
這份協議只有一頁,它將PPP協議變得更為堅固。我們在設計復雜架構時應該這樣做:首先寫下約定,再用軟件去實現它。
管家模式協議(MDP)在擴展PPP協議時引入了一個有趣的特性:client發送的每一個請求都有一個“服務名稱”,而worker在像隊列裝置注冊時需要告知自己的服務類型。MDP的優勢在于它來源于現實編程,協議簡單,且容易提升。
引入“服務名稱”的機制,是對偏執海盜隊列的一個簡單補充,而結果是讓其成為一個面向服務的代理。

在實施管家模式之前,我們需要為client和worker編寫一個框架。如果程序員可以通過簡單的API來實現這種模式,那就沒有必要讓他們去了解管家模式的協議內容和實現方法了。
所以,我們第一個協議(即管家模式協議)定義了分布式架構中節點是如何互相交互的,第二個協議則要定義應用程序應該如何通過框架來使用這一協議。
管家模式有兩個端點,客戶端和服務端。因為我們要為client和worker都撰寫框架,所以就需要提供兩套API。以下是用簡單的面向對象方法設計的client端API雛形,使用的是C語言的[ZFL library](http://zfl.zeromq.org/page:read-the-manual )。
```c
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
```
就這么簡單。我們創建了一個會話來和代理通信,發送并接收一個請求,最后關閉連接。以下是worker端API的雛形。
```c
mdwrk_t *mdwrk_new (char *broker,char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
```
上面兩段代碼看起來差不多,但是worker端API略有不同。worker第一次執行recv()后會傳遞一個空的應答,之后才傳遞當前的應答,并獲得新的請求。
兩段的API都很容易開發,只需在偏執海盜模式代碼的基礎上修改即可。以下是client API:
**mdcliapi: Majordomo client API in C**
```c
/* =====================================================================
mdcliapi.c
Majordomo Protocol Client API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi.h"
// 類結構
// 我們會通過成員方法來訪問這些屬性
struct _mdcli_t {
zctx_t *ctx; // 上下文
char *broker;
void *client; // 連接至代理的套接字
int verbose; // 使用標準輸出打印當前活動
int timeout; // 請求超時時間
int retries; // 請求重試次數
};
// ---------------------------------------------------------------------
// 連接或重連代理
void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_REQ);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: 正在連接至代理 %s...", self->broker);
}
// ---------------------------------------------------------------------
// 構造函數
mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);
mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // 毫秒
self->retries = 3; // 嘗試次數
s_mdcli_connect_to_broker (self);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 設定請求超時時間
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// 設定請求重試次數
void
mdcli_set_retries (mdcli_t *self, int retries)
{
assert (self);
self->retries = retries;
}
// ---------------------------------------------------------------------
// 向代理發送請求,并嘗試獲取應答;
// 對消息保持所有權,發送后銷毀;
// 返回應答消息,或NULL。
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// 用協議前綴包裝消息
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: 服務名稱 (可打印字符串)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
if (self->verbose) {
zclock_log ("I: 發送請求給 '%s' 服務:", service);
zmsg_dump (request);
}
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);
while (TRUE) {
// 輪詢套接字以接收應答,有超時時間
zmq_pollitem_t items [] = {
{ self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// 收到應答后進行處理
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// 不要嘗試處理錯誤,直接報錯即可
assert (zmsg_size (msg) >= 3);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *reply_service = zmsg_pop (msg);
assert (zframe_streq (reply_service, service));
zframe_destroy (&reply_service);
zmsg_destroy (&request);
return msg; // 成功
}
else
if (--retries_left) {
if (self->verbose)
zclock_log ("W: no reply, reconnecting...");
// 重連并重發消息
s_mdcli_connect_to_broker (self);
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);
}
else {
if (self->verbose)
zclock_log ("W: 發生嚴重錯誤,放棄重試。");
break; // 放棄
}
}
}
if (zctx_interrupted)
printf ("W: 收到中斷消息,結束client進程...\n");
zmsg_destroy (&request);
return NULL;
}
```
以下測試程序會執行10萬次請求應答:
**mdclient: Majordomo client application in C**
```c
//
// 管家模式協議 - 客戶端示例
// 使用mdcli API隱藏管家模式協議的內部實現
//
// 讓我們直接編譯這段代碼,不生成類庫
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
zmsg_t *reply = mdcli_send (session, "echo", &request);
if (reply)
zmsg_destroy (&reply);
else
break; // 中斷或停止
}
printf ("已處理 %d 次請求-應答\n", count);
mdcli_destroy (&session);
return 0;
}
```
下面是worker的API:
**mdwrkapi: Majordomo worker API in C**
```c
/* =====================================================================
mdwrkapi.c
Majordomo Protocol Worker API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdwrkapi.h"
// 可靠性參數
#define HEARTBEAT_LIVENESS 3 // 合理值:3-5
// 類結構
// 使用成員函數訪問屬性
struct _mdwrk_t {
zctx_t *ctx; // 上下文
char *broker;
char *service;
void *worker; // 連接至代理的套接字
int verbose; // 使用標準輸出打印活動
// 心跳設置
uint64_t heartbeat_at; // 發送心跳的時間
size_t liveness; // 嘗試次數
int heartbeat; // 心跳延時,單位:毫秒
int reconnect; // 重連延時,單位:毫秒
// 內部狀態
int expect_reply; // 初始值為0
// 應答地址,如果存在的話
zframe_t *reply_to;
};
// ---------------------------------------------------------------------
// 發送消息給代理
// 如果沒有提供消息,則內部創建一個
static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();
// 將協議信封壓入消息頂部
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
zmsg_pushstr (msg, "");
if (self->verbose) {
zclock_log ("I: sending %s to broker",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->worker);
}
// ---------------------------------------------------------------------
// 連接或重連代理
void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
if (self->worker)
zsocket_destroy (self->ctx, self->worker);
self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->worker, self->broker);
if (self->verbose)
zclock_log ("I: 正在連接代理 %s...", self->broker);
// 向代理注冊服務類型
s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
// 當心跳健康度為零,表示代理已斷開連接
self->liveness = HEARTBEAT_LIVENESS;
self->heartbeat_at = zclock_time () + self->heartbeat;
}
// ---------------------------------------------------------------------
// 構造函數
mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
assert (broker);
assert (service);
mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->service = strdup (service);
self->verbose = verbose;
self->heartbeat = 2500; // 毫秒
self->reconnect = 2500; // 毫秒
s_mdwrk_connect_to_broker (self);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
mdwrk_destroy (mdwrk_t **self_p)
{
assert (self_p);
if (*self_p) {
mdwrk_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self->service);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 設置心跳延遲
void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
self->heartbeat = heartbeat;
}
// ---------------------------------------------------------------------
// 設置重連延遲
void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
self->reconnect = reconnect;
}
// ---------------------------------------------------------------------
// 若有應答則發送給代理,并等待新的請求
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
// 格式化并發送請求傳入的應答
assert (reply_p);
zmsg_t *reply = *reply_p;
assert (reply || !self->expect_reply);
if (reply) {
assert (self->reply_to);
zmsg_wrap (reply, self->reply_to);
s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
zmsg_destroy (reply_p);
}
self->expect_reply = 1;
while (TRUE) {
zmq_pollitem_t items [] = {
{ self->worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->worker);
if (!msg)
break; // 中斷
if (self->verbose) {
zclock_log ("I: 從代理處獲得消息:");
zmsg_dump (msg);
}
self->liveness = HEARTBEAT_LIVENESS;
// 不要處理錯誤,直接報錯即可
assert (zmsg_size (msg) >= 3);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPW_WORKER));
zframe_destroy (&header);
zframe_t *command = zmsg_pop (msg);
if (zframe_streq (command, MDPW_REQUEST)) {
// 這里需要將消息中空幀之前的所有地址都保存起來,
// 但在這里我們暫時只保存一個
self->reply_to = zmsg_unwrap (msg);
zframe_destroy (&command);
return msg; // 處理請求
}
else
if (zframe_streq (command, MDPW_HEARTBEAT))
; // 不對心跳做任何處理
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_mdwrk_connect_to_broker (self);
else {
zclock_log ("E: 消息不合法");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}
else
if (--self->liveness == 0) {
if (self->verbose)
zclock_log ("W: 失去與代理的連接 - 正在重試...");
zclock_sleep (self->reconnect);
s_mdwrk_connect_to_broker (self);
}
// 適時地發送消息
if (zclock_time () > self->heartbeat_at) {
s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
self->heartbeat_at = zclock_time () + self->heartbeat;
}
}
if (zctx_interrupted)
printf ("W: 收到中斷消息,中止worker...\n");
return NULL;
}
```
以下測試程序實現了名為echo的服務:
**mdworker: Majordomo worker application in C**
```c
//
// 管家模式協議 - worker示例
// 使用mdwrk API隱藏MDP協議的內部實現
//
// 讓我們直接編譯代碼,而不創建類庫
#include "mdwrkapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdwrk_t *session = mdwrk_new (
"tcp://localhost:5555", "echo", verbose);
zmsg_t *reply = NULL;
while (1) {
zmsg_t *request = mdwrk_recv (session, &reply);
if (request == NULL)
break; // worker被中止
reply = request; // echo服務……其實很復雜:)
}
mdwrk_destroy (&session);
return 0;
}
```
幾點說明:
* API是單線程的,所以說worker不會再后臺發送心跳,而這也是我們所期望的:如果worker應用程序停止了,心跳就會跟著中止,代理便會停止向該worker發送新的請求。
* wroker API沒有做回退算法的設置,因為這里不值得使用這一復雜的機制。
* API沒有提供任何報錯機制,如果出現問題,它會直接報斷言(或異常,依語言而定)。這一做法對實驗性的編程是有用的,這樣可以立刻看到執行結果。但在真實編程環境中,API應該足夠健壯,合適地處理非法消息。
也許你會問,worker API為什么要關閉它的套接字并新開一個呢?特別是ZMQ是有重連機制的,能夠在節點歸來后進行重連。我們可以回顧一下簡單海盜模式中的worker,以及偏執海盜模式中的worker來加以理解。ZMQ確實會進行自動重連,但如果代理死亡并重連,worker并不會重新進行注冊。這個問題有兩種解決方案:一是我們這里用到的較為簡便的方案,即當worker判斷代理已經死亡時,關閉它的套接字并重頭來過;另一個方案是當代理收到未知worker的心跳時要求該worker對其提供的服務類型進行注冊,這樣一來就需要在協議中說明這一規則。
下面讓我們設計管家模式的代理,它的核心代碼是一組隊列,每種服務對應一個隊列。我們會在worker出現時創建相應的隊列(worker消失時應該銷毀對應的隊列,不過我們這里暫時不考慮)。額外的,我們會為每種服務維護一個worker的隊列。
為了讓C語言代碼更為易讀易寫,我使用了[ZFL項目](http://zfl.zeromq.org)提供的哈希和鏈表容器,并命名為[zhash](https://github.com/imatix/zguide/blob/master/examples/C/zhash.h zhash)和[zlist](https://github.com/imatix/zguide/blob/master/examples/C/zlist.h)。如果使用現代語言編寫,那自然可以使用其內置的容器。
**mdbroker: Majordomo broker in C**
```c
//
// 管家模式協議 - 代理
// 協議 http://rfc.zeromq.org/spec:7 和 spec:8 的最簡實現
//
#include "czmq.h"
#include "mdp.h"
// 一般我們會從配置文件中獲取以下值
#define HEARTBEAT_LIVENESS 3 // 合理值:3-5
#define HEARTBEAT_INTERVAL 2500 // 單位:毫秒
#define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
// 定義一個代理
typedef struct {
zctx_t *ctx; // 上下文
void *socket; // 用于連接client和worker的套接字
int verbose; // 使用標準輸出打印活動信息
char *endpoint; // 代理綁定到的端點
zhash_t *services; // 已知服務的哈希表
zhash_t *workers; // 已知worker的哈希表
zlist_t *waiting; // 正在等待的worker隊列
uint64_t heartbeat_at; // 發送心跳的時間
} broker_t;
// 定義一個服務
typedef struct {
char *name; // 服務名稱
zlist_t *requests; // 客戶端請求隊列
zlist_t *waiting; // 正在等待的worker隊列
size_t workers; // 可用worker數
} service_t;
// 定義一個worker,狀態為空閑或占用
typedef struct {
char *identity; // worker的標識
zframe_t *address; // 地址幀
service_t *service; // 所屬服務
int64_t expiry; // 過期時間,從未收到心跳起計時
} worker_t;
// ---------------------------------------------------------------------
// 代理使用的函數
static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_purge_workers (broker_t *self);
// 服務使用的函數
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
// worker使用的函數
static worker_t *
s_worker_require (broker_t *self, zframe_t *address);
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
char *option, zmsg_t *msg);
static void
s_worker_waiting (broker_t *self, worker_t *worker);
// 客戶端使用的函數
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
// ---------------------------------------------------------------------
// 主程序
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
broker_t *self = s_broker_new (verbose);
s_broker_bind (self, "tcp://*:5555");
// 接受并處理消息,直至程序被中止
while (TRUE) {
zmq_pollitem_t items [] = {
{ self->socket, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// Process next input message, if any
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->socket);
if (!msg)
break; // 中斷
if (self->verbose) {
zclock_log ("I: 收到消息:");
zmsg_dump (msg);
}
zframe_t *sender = zmsg_pop (msg);
zframe_t *empty = zmsg_pop (msg);
zframe_t *header = zmsg_pop (msg);
if (zframe_streq (header, MDPC_CLIENT))
s_client_process (self, sender, msg);
else
if (zframe_streq (header, MDPW_WORKER))
s_worker_process (self, sender, msg);
else {
zclock_log ("E: 非法消息:");
zmsg_dump (msg);
zmsg_destroy (&msg);
}
zframe_destroy (&sender);
zframe_destroy (&empty);
zframe_destroy (&header);
}
// 斷開并刪除過期的worker
// 適時地發送心跳給worker
if (zclock_time () > self->heartbeat_at) {
s_broker_purge_workers (self);
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *) zlist_next (self->waiting);
}
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
}
if (zctx_interrupted)
printf ("W: 收到中斷消息,關閉中...\n");
s_broker_destroy (&self);
return 0;
}
// ---------------------------------------------------------------------
// 代理對象的構造函數
static broker_t *
s_broker_new (int verbose)
{
broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
// 初始化代理狀態
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new ();
self->workers = zhash_new ();
self->waiting = zlist_new ();
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
return self;
}
// ---------------------------------------------------------------------
// 代理對象的析構函數
static void
s_broker_destroy (broker_t **self_p)
{
assert (self_p);
if (*self_p) {
broker_t *self = *self_p;
zctx_destroy (&self->ctx);
zhash_destroy (&self->services);
zhash_destroy (&self->workers);
zlist_destroy (&self->waiting);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 將代理套接字綁定至端點,可以重復調用該函數
// 我們使用一個套接字來同時處理client和worker
void
s_broker_bind (broker_t *self, char *endpoint)
{
zsocket_bind (self->socket, endpoint);
zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}
// ---------------------------------------------------------------------
// 刪除空閑狀態中過期的worker
static void
s_broker_purge_workers (broker_t *self)
{
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
if (zclock_time () < worker->expiry)
continue; // 該worker未過期,停止搜索
if (self->verbose)
zclock_log ("I: 正在刪除過期的worker: %s",
worker->identity);
s_worker_delete (self, worker, 0);
worker = (worker_t *) zlist_first (self->waiting);
}
}
// ---------------------------------------------------------------------
// 定位或創建新的服務項
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
assert (service_frame);
char *name = zframe_strdup (service_frame);
service_t *service =
(service_t *) zhash_lookup (self->services, name);
if (service == NULL) {
service = (service_t *) zmalloc (sizeof (service_t));
service->name = name;
service->requests = zlist_new ();
service->waiting = zlist_new ();
zhash_insert (self->services, name, service);
zhash_freefn (self->services, name, s_service_destroy);
if (self->verbose)
zclock_log ("I: 收到消息:");
}
else
free (name);
return service;
}
// ---------------------------------------------------------------------
// 當服務從broker->services中移除時銷毀該服務對象
static void
s_service_destroy (void *argument)
{
service_t *service = (service_t *) argument;
// 銷毀請求隊列中的所有項目
while (zlist_size (service->requests)) {
zmsg_t *msg = zlist_pop (service->requests);
zmsg_destroy (&msg);
}
zlist_destroy (&service->requests);
zlist_destroy (&service->waiting);
free (service->name);
free (service);
}
// ---------------------------------------------------------------------
// 可能時,分發請求給等待中的worker
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
assert (service);
if (msg) // 將消息加入隊列
zlist_append (service->requests, msg);
s_broker_purge_workers (self);
while (zlist_size (service->waiting)
&& zlist_size (service->requests))
{
worker_t *worker = zlist_pop (service->waiting);
zlist_remove (self->waiting, worker);
zmsg_t *msg = zlist_pop (service->requests);
s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
zmsg_destroy (&msg);
}
}
// ---------------------------------------------------------------------
// 使用8/MMI協定處理內部服務
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
char *return_code;
if (zframe_streq (service_frame, "mmi.service")) {
char *name = zframe_strdup (zmsg_last (msg));
service_t *service =
(service_t *) zhash_lookup (self->services, name);
return_code = service && service->workers? "200": "404";
free (name);
}
else
return_code = "501";
zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
// 移除并保存返回給client的信封,插入協議頭信息和服務名稱,并重新包裝信封
zframe_t *client = zmsg_unwrap (msg);
zmsg_push (msg, zframe_dup (service_frame));
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
}
// ---------------------------------------------------------------------
// 按需創建worker
static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
assert (address);
// self->workers使用wroker的標識為鍵
char *identity = zframe_strhex (address);
worker_t *worker =
(worker_t *) zhash_lookup (self->workers, identity);
if (worker == NULL) {
worker = (worker_t *) zmalloc (sizeof (worker_t));
worker->identity = identity;
worker->address = zframe_dup (address);
zhash_insert (self->workers, identity, worker);
zhash_freefn (self->workers, identity, s_worker_destroy);
if (self->verbose)
zclock_log ("I: 正在注冊新的worker: %s", identity);
}
else
free (identity);
return worker;
}
// ---------------------------------------------------------------------
// 從所有數據結構中刪除wroker,并銷毀worker對象
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
assert (worker);
if (disconnect)
s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);
if (worker->service) {
zlist_remove (worker->service->waiting, worker);
worker->service->workers--;
}
zlist_remove (self->waiting, worker);
// 以下方法間接調用了s_worker_destroy()方法
zhash_delete (self->workers, worker->identity);
}
// ---------------------------------------------------------------------
// 當worker從broker->workers中移除時,銷毀worker對象
static void
s_worker_destroy (void *argument)
{
worker_t *worker = (worker_t *) argument;
zframe_destroy (&worker->address);
free (worker->identity);
free (worker);
}
// ---------------------------------------------------------------------
// 處理worker發送來的消息
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 1); // 消息中至少包含命令幀
zframe_t *command = zmsg_pop (msg);
char *identity = zframe_strhex (sender);
int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
free (identity);
worker_t *worker = s_worker_require (self, sender);
if (zframe_streq (command, MDPW_READY)) {
// 若worker隊列中已有該worker,但仍收到了它的“已就緒”消息,則刪除這個worker。
if (worker_ready)
s_worker_delete (self, worker, 1);
else
if (zframe_size (sender) >= 4 // 服務名稱為保留的服務
&& memcmp (zframe_data (sender), "mmi.", 4) == 0)
s_worker_delete (self, worker, 1);
else {
// 將worker對應到服務,并置為空閑狀態
zframe_t *service_frame = zmsg_pop (msg);
worker->service = s_service_require (self, service_frame);
worker->service->workers++;
s_worker_waiting (self, worker);
zframe_destroy (&service_frame);
}
}
else
if (zframe_streq (command, MDPW_REPLY)) {
if (worker_ready) {
// 移除并保存返回給client的信封,插入協議頭信息和服務名稱,并重新包裝信封
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
s_worker_waiting (self, worker);
}
else
s_worker_delete (self, worker, 1);
}
else
if (zframe_streq (command, MDPW_HEARTBEAT)) {
if (worker_ready)
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
else
s_worker_delete (self, worker, 1);
}
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_worker_delete (self, worker, 0);
else {
zclock_log ("E: 非法消息");
zmsg_dump (msg);
}
free (command);
zmsg_destroy (&msg);
}
// ---------------------------------------------------------------------
// 發送消息給worker
// 如果指針指向了一條消息,發送它,但不銷毀它,因為這是調用者的工作
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
char *option, zmsg_t *msg)
{
msg = msg? zmsg_dup (msg): zmsg_new ();
// 將協議信封壓入消息頂部
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
// 在消息頂部插入路由幀
zmsg_wrap (msg, zframe_dup (worker->address));
if (self->verbose) {
zclock_log ("I: 正在發送消息給worker %s",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->socket);
}
// ---------------------------------------------------------------------
// 正在等待的worker
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
// 將worker加入代理和服務的等待隊列
zlist_append (self->waiting, worker);
zlist_append (worker->service->waiting, worker);
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (self, worker->service, NULL);
}
// ---------------------------------------------------------------------
// 處理client發來的請求
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 2); // 服務名稱 + 請求內容
zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);
// 為應答內容設置請求方的地址
zmsg_wrap (msg, zframe_dup (sender));
if (zframe_size (service_frame) >= 4
&& memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
s_service_internal (self, service_frame, msg);
else
s_service_dispatch (self, service, msg);
zframe_destroy (&service_frame);
}
```
這個例子應該是我們見過最復雜的一個示例了,大約有500行代碼。編寫這段代碼并讓其變的健壯,大約花費了兩天的時間。但是,這也僅僅是一個完整的面向服務代理的一部分。
幾點說明:
* 管家模式協議要求我們在一個套接字中同時處理client和worker,這一點對部署和管理代理很有益處:它只會在一個ZMQ端點上收發請求,而不是兩個。
* 代理很好地實現了MDP/0.1協議中規范的內容,包括當代理發送非法命令和心跳時斷開的機制。
* 可以將這段代碼擴充為多線程,每個線程管理一個套接字、一組client和worker。這種做法在大型架構的拆分中顯得很有趣。C語言代碼已經是這樣的格式了,因此很容易實現。
* 還可以將這段代碼擴充為主備模式、雙在線模式,進一步提高可靠性。因為從本質上來說,代理是無狀態的,只是保存了服務的存在與否,因此client和worker可以自行選擇除此之外的代理來進行通信。
* 示例代碼中心跳的間隔為5秒,主要是為了減少調試時的輸出。現實中的值應該設得低一些,但是,重試的過程應該設置得稍長一些,讓服務有足夠的時間啟動,如10秒鐘。
### 異步管家模式
上文那種實現管家模式的方法比較簡單,client還是簡單海盜模式中的,僅僅是用API重寫了一下。我在測試機上運行了程序,處理10萬條請求大約需要14秒的時間,這和代碼也有一些關系,因為復制消息幀的時間浪費了CPU處理時間。但真正的問題在于,我們總是逐個循環進行處理(round-trip),即發送-接收-發送-接收……ZMQ內部禁用了TCP發包優化算法([Nagle's algorithm](http://en.wikipedia.org/wiki/Nagles_algorithm)),但逐個處理循環還是比較浪費。
理論歸理論,還是需要由實踐來檢驗。我們用一個簡單的測試程序來看看逐個處理循環是否真的耗時。這個測試程序會發送一組消息,第一次它發一條收一條,第二次則一起發送再一起接收。兩次結果應該是一樣的,但速度截然不同。
**tripping: Round-trip demonstrator in C**
```c
//
// Round-trip 模擬
//
// 本示例程序使用多線程的方式啟動client、worker、以及代理,
// 當client處理完畢時會發送信號給主程序。
//
#include "czmq.h"
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
zsocket_connect (client, "tcp://localhost:5555");
printf ("開始測試...\n");
zclock_sleep (100);
int requests;
int64_t start;
printf ("同步 round-trip 測試...\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d 次/秒\n",
(1000 * 10000) / (int) (zclock_time () - start));
printf ("異步 round-trip 測試...\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d 次/秒\n",
(1000 * 100000) / (int) (zclock_time () - start));
zstr_send (pipe, "完成");
}
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
zsocket_connect (worker, "tcp://localhost:5556");
while (1) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
static void *
broker_task (void *args)
{
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555");
zsocket_bind (backend, "tcp://*:5556");
// 初始化輪詢對象
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
while (1) {
int rc = zmq_poll (items, 2, -1);
if (rc == -1)
break; // 中斷
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (frontend);
zframe_t *address = zmsg_pop (msg);
zframe_destroy (&address);
zmsg_pushstr (msg, "W");
zmsg_send (&msg, backend);
}
if (items [1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (backend);
zframe_t *address = zmsg_pop (msg);
zframe_destroy (&address);
zmsg_pushstr (msg, "C");
zmsg_send (&msg, frontend);
}
}
zctx_destroy (&ctx);
return NULL;
}
int main (void)
{
// 創建線程
zctx_t *ctx = zctx_new ();
void *client = zthread_fork (ctx, client_task, NULL);
zthread_new (ctx, worker_task, NULL);
zthread_new (ctx, broker_task, NULL);
// 等待client端管道的信號
char *signal = zstr_recv (client);
free (signal);
zctx_destroy (&ctx);
return 0;
}
```
在我的開發環境中運行結果如下:
```
Setting up test...
Synchronous round-trip test...
9057 calls/second
Asynchronous round-trip test...
173010 calls/second
```
需要注意的是client在運行開始會暫停一段時間,這是因為在向ROUTER套接字發送消息時,若指定標識的套接字沒有連接,那么ROUTER會直接丟棄該消息。這個示例中我們沒有使用LRU算法,所以當worker連接速度稍慢時就有可能丟失數據,影響測試結果。
我們可以看到,逐個處理循環比異步處理要慢將近20倍,讓我們把它應用到管家模式中去。
首先,讓我們修改client的API,添加獨立的發送和接收方法:
```
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
```
然后花很短的時間就能將同步的client API改造成異步的API:
**mdcliapi2: Majordomo asynchronous client API in C**
```c
/* =====================================================================
mdcliapi2.c
Majordomo Protocol Client API (async version)
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi2.h"
// 類結構
// 使用成員函數訪問屬性
struct _mdcli_t {
zctx_t *ctx; // 上下文
char *broker;
void *client; // 連接至代理的套接字
int verbose; // 在標準輸出打印運行狀態
int timeout; // 請求超時時間
};
// ---------------------------------------------------------------------
// 連接或重連代理
void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: 正在連接代理 %s...", self->broker);
}
// ---------------------------------------------------------------------
// 構造函數
mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);
mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // 毫秒
s_mdcli_connect_to_broker (self);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 設置請求超時時間
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// 發送請求給代理
// 取得請求消息的所有權,發送后銷毀
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// 在消息頂部加入協議規定的幀
// Frame 0: empty (模擬REQ套接字的行為)
// Frame 1: "MDPCxy" (6個字節, MDP/Client x.y)
// Frame 2: Service name (看打印字符串)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
zmsg_pushstr (request, "");
if (self->verbose) {
zclock_log ("I: 發送請求給 '%s' 服務:", service);
zmsg_dump (request);
}
zmsg_send (&request, self->client);
return 0;
}
// ---------------------------------------------------------------------
// 獲取應答消息,若無則返回NULL;
// 該函數不會嘗試從代理的崩潰中恢復,
// 因為我們沒有記錄那些未收到應答的請求,所以也無法重發。
zmsg_t *
mdcli_recv (mdcli_t *self)
{
assert (self);
// 輪詢套接字以獲取應答
zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
return NULL; // 中斷
// 收到應答后進行處理
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// 不要處理錯誤,直接報出
assert (zmsg_size (msg) >= 4);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *service = zmsg_pop (msg);
zframe_destroy (&service);
return msg; // Success
}
if (zctx_interrupted)
printf ("W: 收到中斷消息,正在中止client...\n");
else
if (self->verbose)
zclock_log ("W: 嚴重錯誤,放棄請求");
return NULL;
}
```
下面是對應的測試代碼:
**mdclient2: Majordomo client application in C**
```
//
// 異步管家模式 - client示例程序
// 使用mdcli API隱藏MDP協議的具體實現
//
// 直接編譯源碼,而不創建類庫
#include "mdcliapi2.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new ();
zmsg_pushstr (request, "Hello world");
mdcli_send (session, "echo", &request);
}
for (count = 0; count < 100000; count++) {
zmsg_t *reply = mdcli_recv (session);
if (reply)
zmsg_destroy (&reply);
else
break; // 使用Ctrl-C中斷
}
printf ("收到 %d 個應答\n", count);
mdcli_destroy (&session);
return 0;
}
```
代理和worker的代碼沒有變,因為我們并沒有改變MDP協議。經過對client的改造,我們可以明顯看到速度的提升。如以下是同步狀況下處理10萬條請求的時間:
```
$ time mdclient
100000 requests/replies processed
real 0m14.088s
user 0m1.310s
sys 0m2.670s
```
以下是異步請求的情況:
```
$ time mdclient2
100000 replies received
real 0m8.730s
user 0m0.920s
sys 0m1.550s
```
讓我們建立10個worker,看看效果如何:
```
$ time mdclient2
100000 replies received
real 0m3.863s
user 0m0.730s
sys 0m0.470s
```
由于worker獲得消息需要通過LRU隊列機制,所以并不能做到完全的異步。但是,worker越多其效果也會越好。在我的測試機上,當worker的數量達到8個時,速度就不再提升了——四核處理器只能做這么多。但是,我們仍然獲得了近四倍的速度提升,而改造過程只有幾分鐘而已。此外,代理其實還沒有進行優化,它仍會復制消息,而沒有實現零拷貝。不過,我們已經做到每秒處理2.5萬次請求-應答,已經很不錯了。
當然,異步的管家模式也并不完美,有一個顯著的缺點:它無法從代理的崩潰中恢復。可以看到mdcliapi2的代碼中并沒有恢復連接的代碼,重新連接需要有以下幾點作為前提:
* 每個請求都做了編號,每次應答也含有相應的編號,這就需要修改協議,明確定義;
* client的API需要保留并跟蹤所有已發送、但仍未收到應答的請求;
* 如果代理發生崩潰,client會重發所有消息。
可以看到,高可靠性往往和復雜度成正比,值得在管家模式中應用這一機制嗎?這就要看應用場景了。如果是一個名稱查詢服務,每次會話會調用一次,那不需要應用這一機制;如果是一個位于前端的網頁服務,有數千個客戶端相連,那可能就需要了。
### 服務查詢
現在,我們已經有了一個面向服務的代理了,但是我們無法得知代理是否提供了某項特定服務。如果請求失敗,那當然就表示該項服務目前不可用,但具體原因是什么呢?所以,如果能夠詢問代理“echo服務正在運行嗎?”,那將會很有用處。最明顯的方法是在MDP/Client協議中添加一種命令,客戶端可以詢問代理某項服務是否可用。但是,MDP/Client最大的優點在于簡單,如果添加了服務查詢的功能就太過復雜了。
另一種方案是學電子郵件的處理方式,將失敗的請求重新返回。但是這同樣會增加復雜度,因為我們需要鑒別收到的消息是一個應答還是被退回的請求。
讓我們用之前的方式,在MDP的基礎上建立新的機制,而不是改變它。服務定位本身也是一項服務,我們還可以提供類似于“禁用某服務”、“提供服務數據”等其他服務。我們需要的是一個能夠擴展協議但又不會影響協議本身的機制。
這樣就誕生了一個小巧的RFC - MMI(管家接口)的應用層,建立在MDP協議之上:http://rfc.zeromq.org/spec:8 。我們在代理中其實已經加以實現了,不知你是否已經注意到。下面的代碼演示了如何使用這項服務查詢功能:
**mmiecho: Service discovery over Majordomo in C**
```c
//
// MMI echo 服務查詢示例程序
//
// 讓我們直接編譯,不生成類庫
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// 我們需要查詢的服務名稱
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
// 發送給“服務查詢”服務的消息
zmsg_t *reply = mdcli_send (session, "mmi.service", &request);
if (reply) {
char *reply_code = zframe_strdup (zmsg_first (reply));
printf ("查詢 echo 服務的結果: %s\n", reply_code);
free (reply_code);
zmsg_destroy (&reply);
}
else
printf ("E: 代理無響應,請確認它正在工作\n");
mdcli_destroy (&session);
return 0;
}
```
代理在運行時會檢查請求的服務名稱,自行處理那些mmi.開頭的服務,而不轉發給worker。你可以在不開啟worker的情況下運行以上代碼,可以看到程序是報告200還是404。MMI在示例程序代理中的實現是很簡單的,比如,當某個worker消亡時,該服務仍然標記為可用。實踐中,代理應該在一定間隔后清除那些沒有worker的服務。
### 冪等服務
冪等是指能夠安全地重復執行某項操作。如,看鐘是冪等的,但借錢給別人老婆就不是了。有些客戶端至服務端的通信是冪等的,但有些則不是。冪等的通信示例有:
* 無狀態的任務分配,即管道模式中服務端是無狀態的worker,它的處理結果是根據客戶端的請求狀態生成的,因此可以重復處理相同的請求;
* 命名服務中將邏輯地址轉化成實際綁定或連接的端點,可以重復查詢多次,因此也是冪等的。
非冪等的通信示例有:
* 日志服務,我們不會希望相同的日志內容被記錄多次;
* 任何會對下游節點有影響的服務,如該服務會向下游節點發送信息,若收到相同的請求,那下游節點收到的信息就是重復的;
* 當服務修改了某些共享的數據,且沒有進行冪等方面的設置。如某項服務對銀行賬戶進行了借操作(debit),這一定是非冪等的。
如果應用程序提供的服務是非冪等的,那就需要考慮它究竟是在哪個階段崩潰的。如果程序在空閑或處理請求的過程中崩潰,那不會有什么問題。我們可以使用數據庫中的事務機制來保證借貸操作是同時發生的。如果應用程序在發送請求的時候崩潰了,那就會有問題,因為對于該程序來說,它已經完成了工作。
如果在返回應答的過程中網絡阻塞了,客戶端會認為請求發送失敗,并進行重發,這樣服務端會再一次執行相同的請求。這不是我們想要的結果。
常用的解決方法是在服務端檢測并拒絕重復的請求,這就需要:
* 客戶端為每個請求加注唯一的標識,包括客戶端標識和消息標識;
* 服務端在發送應答時使用客戶端標識和消息標識作為鍵,保存應答內容;
* 當服務端發現收到的請求已在應答哈希表中存在,它會跳過該次請求,直接返回應答內容。
### 脫機可靠性(巨人模式)
當你意識到管家模式是一種非常可靠的消息代理時,你可能會想要使用磁盤做一下消息中轉,從而進一步提升可靠性。這種方式雖然在很多企業級消息系統中應用,但我還是有些反對的,原因有:
* 我們可以看到,懶惰海盜模式的client可以工作得非常好,能夠在多種架構中運行。唯一的問題是它會假設worker是無狀態的,且提供的服務是冪等的。但這個問題我們可以通過其他方式解決,而不是添加磁盤。
* 添加磁盤會帶來新的問題,需要額外的管理和維護費用。海盜模式的最大優點就是簡單明了,不會崩潰。如果你還是擔心硬件會出問題,可以改用點對點的通信模式,這會在本章最后一節講到。
雖然有以上原因,但還是有一個合理的場景可以用到磁盤中轉的——異步脫機網絡。海盜模式有一個問題,那就是client發送請求后會一直等待應答。如果client和worker并不是長連接(可以拿電子郵箱做個類比),我們就無法在client和worker之間建立一個無狀態的網絡,因此需要將這種狀態保存起來。
于是我們就有了巨人模式,該模式下會將消息寫到磁盤中,確保不會丟失。當我們進行服務查詢時,會轉向巨人這一層進行。巨人是建立在管家之上的,而不是改寫了MDP協議。這樣做的好處是我們可以在一個特定的worker中實現這種可靠性,而不用去增加代理的邏輯。
* 實現更為簡單;
* 代理用一種語言編寫,worker使用另一種語言編寫;
* 可以自由升級這種模式。
唯一的缺點是,代理和磁盤之間會有一層額外的聯系,不過這也是值得的。
我們有很多方法來實現一種持久化的請求-應答架構,而目標當然是越簡單越好。我能想到的最簡單的方式是提供一種成為“巨人”的代理服務,它不會影響現有worker的工作,若client想要立即得到應答,它可以和代理進行通信;如果它不是那么著急,那就可以和巨人通信:“嗨,巨人,麻煩幫我處理下這個請求,我去買些菜。”

這樣一來,巨人就既是worker又是client。client和巨人之間的對話一般是:
* Client: 請幫我處理這個請求。巨人:好的。
* Client: 有要給我的應答嗎?巨人:有的。(或者沒有)
* Client: OK,你可以釋放那個請求了,工作已經完成。巨人:好的。
巨人和代理之間的對話一般是:
* 巨人:嗨,代理程序,你這里有個叫echo的服務嗎?代理:恩,好像有。
* 巨人:嗨,echo服務,請幫我處理一下這個請求。Echo: 好了,這是應答。
* 巨人:謝謝!
你可以想象一些發生故障的情形,看看上述模式是否能解決?worker在處理請求的時候崩潰,巨人會不斷地重新發送請求;應答在傳輸過程中丟失了,巨人也會重試;如果請求已經處理,但client沒有得到應答,那它會再次詢問巨人;如果巨人在處理請求或進行應答的時候崩潰了,客戶端會進行重試;只要請求是被保存在磁盤上的,那它就不會丟失。
這個機制中,握手的過程是比較漫長的,但client可以使用異步的管家模式,一次發送多個請求,并一起等待應答。
我們需要一種方法,讓client會去請求應答內容。不同的client會訪問到相同的服務,且client是來去自由的,有著不同的標識。一個簡單、合理、安全的解決方案是:
* 當巨人收到請求時,它會為每個請求生成唯一的編號(UUID),并將這個編號返回給client;
* client在請求應答內容時需要提供這個編號。
這樣一來client就需要負責將UUID安全地保存起來,不過這就省去了驗證的過程。有其他方案嗎?我們可以使用持久化的套接字,即顯式聲明客戶端的套接字標識。然而,這會造成管理上的麻煩,而且萬一兩個client的套接字標識相同,那會引來無窮的麻煩。
在我們開始制定一個新的協議之前,我們先思考一下client如何和巨人通信。一種方案是提供一種服務,配合三個不同的命令;另一種方案則更為簡單,提供三種獨立的服務:
* **titanic.request** - 保存一個請求,并返回UUID
* **titanic.reply** - 根據UUID獲取應答內容
* **titanic.close** - 確認某個請求已被正確地處理
我們需要創建一個多線程的worker,正如我們之前用ZMQ進行多線程編程一樣,很簡單。但是,在我們開始編寫代碼之前,先講巨人模式的一些定義寫下來:http://rfc.zeromq.org/spec:9 。我們稱之為“巨人服務協議”,或TSP。
使用TSP協議自然會讓client多出額外的工作,下面是一個簡單但足夠健壯的client:
**ticlient: Titanic client example in C**
```c
//
// 巨人模式client示例
// 實現 http://rfc.zeromq.org/spec:9 協議中的client端
// 讓我們直接編譯,不創建類庫
#include "mdcliapi.c"
// 請求TSP協議下的服務
// 如果成功則返回應答(狀態碼:200),否則返回NULL
//
static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
zmsg_t *reply = mdcli_send (session, service, request_p);
if (reply) {
zframe_t *status = zmsg_pop (reply);
if (zframe_streq (status, "200")) {
zframe_destroy (&status);
return reply;
}
else
if (zframe_streq (status, "400")) {
printf ("E: 客戶端發生嚴重錯誤,取消請求\n");
exit (EXIT_FAILURE);
}
else
if (zframe_streq (status, "500")) {
printf ("E: 服務端發生嚴重錯誤,取消請求\n");
exit (EXIT_FAILURE);
}
}
else
exit (EXIT_SUCCESS); // 中斷或發生錯誤
zmsg_destroy (&reply);
return NULL; // 請求不成功,但不返回失敗原因
}
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// 1. 發送echo服務的請求給巨人
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
zmsg_addstr (request, "Hello world");
zmsg_t *reply = s_service_call (
session, "titanic.request", &request);
zframe_t *uuid = NULL;
if (reply) {
uuid = zmsg_pop (reply);
zmsg_destroy (&reply);
zframe_print (uuid, "I: request UUID ");
}
// 2. 等待應答
while (!zctx_interrupted) {
zclock_sleep (100);
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
zmsg_t *reply = s_service_call (
session, "titanic.reply", &request);
if (reply) {
char *reply_string = zframe_strdup (zmsg_last (reply));
printf ("Reply: %s\n", reply_string);
free (reply_string);
zmsg_destroy (&reply);
// 3. 關閉請求
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
reply = s_service_call (session, "titanic.close", &request);
zmsg_destroy (&reply);
break;
}
else {
printf ("I: 尚未收到應答,準備稍后重試...\n");
zclock_sleep (5000); // 5秒后重試
}
}
zframe_destroy (&uuid);
mdcli_destroy (&session);
return 0;
}
```
當然,上面的代碼可以整合到一個框架中,程序員不需要了解其中的細節。如果我有時間的話,我會嘗試寫一個這樣的API的,讓應用程序又變回短短的幾行。這種理念和MDP中的一致:不要做重復的事。
下面是巨人的實現。這個服務端會使用三個線程來處理三種服務。它使用最原始的持久化方法來保存請求:為每個請求創建一個磁盤文件。雖然簡單,但也挺恐怖的。比較復雜的部分是,巨人會維護一個隊列來保存這些請求,從而避免重復地掃描目錄。
**titanic: Titanic broker example in C**
```c
//
// 巨人模式 - 服務
//
// 實現 http://rfc.zeromq.org/spec:9 協議的服務端
// 讓我們直接編譯,不創建類庫
#include "mdwrkapi.c"
#include "mdcliapi.c"
#include "zfile.h"
#include <uuid/uuid.h>
// 返回一個可打印的唯一編號(UUID)
// 調用者負責釋放UUID字符串的內存
static char *
s_generate_uuid (void)
{
char hex_char [] = "0123456789ABCDEF";
char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
uuid_t uuid;
uuid_generate (uuid);
int byte_nbr;
for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {
uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
}
return uuidstr;
}
// 根據UUID生成用于保存請求內容的文件名,并返回
#define TITANIC_DIR ".titanic"
static char *
s_request_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.req", uuid);
return filename;
}
// 根據UUID生成用于保存應答內容的文件名,并返回
static char *
s_reply_filename (char *uuid) {
char *filename = malloc (256);
snprintf (filename, 256, TITANIC_DIR "/%s.rep", uuid);
return filename;
}
// ---------------------------------------------------------------------
// 巨人模式 - 請求服務
static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.request", 0);
zmsg_t *reply = NULL;
while (TRUE) {
// 若應答非空則發送,再從代理處獲得新的請求
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // 中斷并退出
// 確保消息目錄是存在的
file_mkdir (TITANIC_DIR);
// 生成UUID,并將消息保存至磁盤
char *uuid = s_generate_uuid ();
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (request, file);
fclose (file);
free (filename);
zmsg_destroy (&request);
// 將UUID加入隊列
reply = zmsg_new ();
zmsg_addstr (reply, uuid);
zmsg_send (&reply, pipe);
// 將UUID返回給客戶端
// 將由循環頂部的mdwrk_recv()函數完成
reply = zmsg_new ();
zmsg_addstr (reply, "200");
zmsg_addstr (reply, uuid);
free (uuid);
}
mdwrk_destroy (&worker);
}
// ---------------------------------------------------------------------
// 巨人模式 - 應答服務
static void *
titanic_reply (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.reply", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // 中斷并退出
char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
if (file_exists (rep_filename)) {
FILE *file = fopen (rep_filename, "r");
assert (file);
reply = zmsg_load (file);
zmsg_pushstr (reply, "200");
fclose (file);
}
else {
reply = zmsg_new ();
if (file_exists (req_filename))
zmsg_pushstr (reply, "300"); //掛起
else
zmsg_pushstr (reply, "400"); //未知
}
zmsg_destroy (&request);
free (uuid);
free (req_filename);
free (rep_filename);
}
mdwrk_destroy (&worker);
return 0;
}
// ---------------------------------------------------------------------
// 巨人模式 - 關閉請求
static void *
titanic_close (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.close", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (!request)
break; // 中斷并退出
char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
file_delete (req_filename);
file_delete (rep_filename);
free (uuid);
free (req_filename);
free (rep_filename);
zmsg_destroy (&request);
reply = zmsg_new ();
zmsg_addstr (reply, "200");
}
mdwrk_destroy (&worker);
return 0;
}
// 處理某個請求,成功則返回1
static int
s_service_success (mdcli_t *client, char *uuid)
{
// 讀取請求內容,第一幀為服務名稱
char *filename = s_request_filename (uuid);
FILE *file = fopen (filename, "r");
free (filename);
// 如果client已經關閉了該請求,則返回1
if (!file)
return 1;
zmsg_t *request = zmsg_load (file);
fclose (file);
zframe_t *service = zmsg_pop (request);
char *service_name = zframe_strdup (service);
// 使用MMI協議檢查服務是否可用
zmsg_t *mmi_request = zmsg_new ();
zmsg_add (mmi_request, service);
zmsg_t *mmi_reply = mdcli_send (client, "mmi.service", &mmi_request);
int service_ok = (mmi_reply
&& zframe_streq (zmsg_first (mmi_reply), "200"));
zmsg_destroy (&mmi_reply);
if (service_ok) {
zmsg_t *reply = mdcli_send (client, service_name, &request);
if (reply) {
filename = s_reply_filename (uuid);
FILE *file = fopen (filename, "w");
assert (file);
zmsg_save (reply, file);
fclose (file);
free (filename);
return 1;
}
zmsg_destroy (&reply);
}
else
zmsg_destroy (&request);
free (service_name);
return 0;
}
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();
// 創建MDP客戶端會話
mdcli_t *client = mdcli_new ("tcp://localhost:5555", verbose);
mdcli_set_timeout (client, 1000); // 1 秒
mdcli_set_retries (client, 1); // 只嘗試一次
void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
zthread_new (ctx, titanic_reply, NULL);
zthread_new (ctx, titanic_close, NULL);
// 主循環
while (TRUE) {
// 如果沒有活動,我們將每秒循環一次
zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
if (items [0].revents & ZMQ_POLLIN) {
// 確保消息目錄是存在的
file_mkdir (TITANIC_DIR);
// 將UUID添加到隊列中,使用“-”號標識等待中的請求
zmsg_t *msg = zmsg_recv (request_pipe);
if (!msg)
break; // 中斷
FILE *file = fopen (TITANIC_DIR "/queue", "a");
char *uuid = zmsg_popstr (msg);
fprintf (file, "-%s\n", uuid);
fclose (file);
free (uuid);
zmsg_destroy (&msg);
}
// 分派
//
char entry [] = "?.......:.......:.......:.......:";
FILE *file = fopen (TITANIC_DIR "/queue", "r+");
while (file && fread (entry, 33, 1, file) == 1) {
// 處理UUID前綴為“-”的請求
if (entry [0] == '-') {
if (verbose)
printf ("I: 開始處理請求 %s\n", entry + 1);
if (s_service_success (client, entry + 1)) {
// 標記為已處理
fseek (file, -33, SEEK_CUR);
fwrite ("+", 1, 1, file);
fseek (file, 32, SEEK_CUR);
}
}
// 跳過最后一行
if (fgetc (file) == '\r')
fgetc (file);
if (zctx_interrupted)
break;
}
if (file)
fclose (file);
}
mdcli_destroy (&client);
return 0;
}
```
測試時,打開mdbroker和titanic,再運行ticlient,然后開啟任意個mdworker,就可以看到client獲得了應答。
幾點說明:
* 我們使用MMI協議去向代理詢問某項服務是否可用,這一點和MDP中的邏輯一致;
* 我們使用inproc(進程內)協議建立主循環和titanic.request服務間的聯系,保存新的請求信息。這樣可以避免主循環不斷掃描磁盤目錄,讀取所有請求文件,并按照時間日期排序。
這個示例程序不應關注它的性能(一定會非常糟糕,雖然我沒有測試過),而是應該看到它是如何提供一種可靠的通信模式的。你可以測試一下,打開代理、巨人、worker和client,使用-v參數顯示跟蹤信息,然后隨意地開關代理、巨人、或worker(client不能關閉),可以看到所有的請求都能獲得應答。
如果你想在真實環境中使用巨人模式,你肯定會問怎樣才能讓速度快起來。以下是我的做法:
* 使用一個磁盤文件保存所有數據。操作系統處理大文件的效率要比處理許多小文件來的高。
* 使用一種循環的機制來組織該磁盤文件的結構,這樣新的請求可以被連續地寫入這個文件。單個線程在全速寫入磁盤時的效率是比較高的。
* 將索引保存在內存中,可以在啟動程序時重建這個索引。這樣做可以節省磁盤緩存,讓索引安全地保存在磁盤上。你需要用到fsync的機制來保存每一條數據;或者可以等待幾毫秒,如果不怕丟失上千條數據的話。
* 如果條件允許,應選擇使用固態硬盤;
* 提前分配該磁盤文件的空間,或者將每次分配的空間調大一些,這樣可以避免磁盤碎片的產生,并保證讀寫是連續的。
另外,我不建議將消息保存在數據庫中,甚至不建議交給那些所謂的高速鍵值緩存,它們比起一個磁盤文件要來得昂貴。
如果你想讓巨人模式變得更為可靠,你可以將請求復制到另一臺服務器上,這樣就不需要擔心主程序遭到核武器襲擊了。
如果你想讓巨人模式變得更為快速,但可以犧牲一些可靠性,那你可以將請求和應答都保存在內存中。這樣做可以讓該服務作為脫機網絡運行,不過若巨人服務本身崩潰了,我也無能為力。
### 高可靠對稱節點(雙子星模式)
#### 概覽
雙子星模式是一對具有主從機制的高可靠節點。任一時間,某個節點會充當主機,接收所有客戶端的請求;另一個則作為一種備機存在。兩個節點會互相監控對方,當主機從網絡中消失時,備機會替代主機的位置。
雙子星模式由Pieter Hintjens和Martin Sustrik設計,應用在iMatix的[OpenAMQ服務器](http://www.openamq.org/)中。它的設計理念是:
* 提供一種簡明的高可靠性解決方案;
* 易于理解和使用;
* 能夠進行可靠的故障切換。

假設我們有一組雙子星模式的服務器,以下是可能發生的故障:
1. 主機發生硬件故障(斷電、失火等),應用程序發送后立刻使用備機進行連接;
1. 主機的網絡環境發生故障,可能某個路由器被雷擊了,立刻使用備機;
1. 主機上的服務被維護人員誤殺,無法自動恢復。
恢復步驟如下:
1. 維護人員排查主機故障;
1. 將備機關閉,造成短時間的服務不可用;
1. 待應用程序都連接到主機后,維護人員重啟備機。
恢復過程是人工進行的,慘痛的經驗告訴我們自動恢復是很可怕的:
* 故障的發生會造成10-30秒之間的服務暫停,如果這是一個真正的突發狀況,那最好還是讓主機暫停服務的好,因為立刻重啟服務可能造成另一個10-30秒的暫停,不如讓用戶停止使用。
* 當有緊急狀況發生時,可以在修復的過程中記錄故障發生原因,而不是讓系統自動恢復,管理員因此無法用其經驗抵御下一次突發狀況。
* 最后,如果自動恢復確實成功了,管理員將無從得知故障的發生原因,因而無法進行分析。
雙子星模式的故障恢復過程是:在修復了主機的問題后,將備機做關閉處理,稍后再重新開啟:

雙子星模式的關閉過程有兩種:
1. 先關閉備機,等待一段時間后再關閉主機;
1. 同時關閉主機和備機,間隔時間不超過幾秒。
關閉時,間隔時間要比故障切換時間短,否則會導致應用程序失去連接、重新連接、并再次失去連接,導致用戶投訴。
#### 詳細要求
雙子星模式可以非常簡單,但能工作得很出色。事實上,這里的實現方法已經歷經三個版本了,之前的版本都過于復雜,想要做太多的事情,因而被我們拋棄。我們需要的只是最基本的功能,能夠提供易理解、易開發、高可靠的解決方法就可以了。
以下是該架構的詳細需求:
* 需要用到雙子星模式的故障是:系統遭受災難性的打擊,如硬件崩潰、火災、意外等。對于其他常規的服務器故障,可以用更簡單的方法。
* 故障恢復時間應該在60秒以內,理想情況下應該在10秒以內;
* 故障恢復(failover)應該是自動完成的,而系統還原(recover)則是由人工完成的。我們希望應用程序能夠在發生故障時自動從主機切換到備機,但不希望在問題解決之前自動切換回主機,因為這很有可能讓主機再次崩潰。
* 程序的邏輯應該盡量簡單,易于使用,最好能封裝在API中;
* 需要提供一個明確的指示,哪臺主機正在提供服務,以避免“精神分裂”的癥狀,即兩臺服務器都認為自己是主機;
* 兩臺服務器的啟動順序不應該有限制;
* 啟動或關閉主從機時不需要更改客戶端的配置,但有可能會中斷連接;
* 管理員需要能夠同時監控兩臺機器;
* 兩臺機器之間必須有專用的高速網絡連接,必須能使用特定IP進行路由。
我們做如下架假設:
* 單臺備機能夠提供足夠的保障,不需要再進行其他備份機制;
* 主從機應該都能夠提供完整的服務,承載相同的壓力,不需要進行負載均衡;
* 預算中允許有這樣一臺長時間閑置的備機。
雙子星模式不會用到:
* 多臺備機,或在主從機之間進行負載均衡。該模式中的備機將一直處于空閑狀態,只有主機發生問題時才會工作;
* 處理持久化的消息或事務。我們假設所連接的網絡是不可靠的(或不可信的)。
* 自動搜索網絡。雙子星模式是手工配置的,他們知道對方的存在,應用程序則知道雙子星的存在。
* 主從機之間狀態的同步。所有服務端的狀態必須能由應用程序進行重建。
以下是雙子星模式中的幾個術語:
* **主機** - 通常情況下作為master的機器;
* **備機** - 通常情況下作為slave的機器,只有當主機從網絡中消失時,備機才會切換成master狀態,接收所有的應用程序請求;
* **master** - 雙子星模式中接收應用程序請求的機器;同一時刻只有一臺master;
* **slave** - 當master消失時用以頂替的機器。
配置雙子星模式的步驟:
1. 讓主機知道備機的位置;
1. 讓備機知道主機的位置;
1. 調整故障恢復時間,兩臺機器的配置必須相同。
比較重要的配置是應讓兩臺機器間隔多久檢查一次對方的狀態,以及多長時間后采取行動。在我們的示例中,故障恢復時間設置為2000毫秒,超過這個時間備機就會代替主機的位置。但若你將主機的服務包裹在一個shell腳本中進行重啟,就需要延長這個時間,否則備機可能在主機恢復連接的過程中轉換成master。
要讓客戶端應用程序和雙子星模式配合,你需要做的是:
1. 知道兩臺服務器的地址;
1. 嘗試連接主機,若失敗則連接備機;
1. 檢測失效的連接,一般使用心跳機制;
1. 嘗試重連主機,然后再連接備機,其間的間隔應比服務器故障恢復時間長;
1. 重建服務器端需要的所有狀態數據;
1. 如果要保證可靠性,應重發故障期間的消息。
這不是件容易的事,所以我們一般會將其封裝成一個API,供程序員使用。
雙子星模式的主要限制有:
* 服務端進程不能涉及到一個以上的雙子星對稱節點;
* 主機只能有一個備機;
* 當備機處于slave狀態時,它不會處理任何請求;
* 備機必須能夠承受所有的應用程序請求;
* 故障恢復時間不能在運行時調整;
* 客戶端應用程序需要做一些重連的工作。
#### 防止精神分裂
“精神分裂”癥狀指的是一個集群中的不同部分同時認為自己是master,從而停止對對方的檢測。雙子星模式中的算法會降低這種癥狀的發生幾率:主備機在決定自己是否為master時會檢測自身是否收到了應用程序的請求,以及對方是否已經從網絡中消失。
但在某些情況下,雙子星模式也會發生精神分裂。比如說,主備機被配置在兩幢大樓里,每幢大樓的局域網中又分布了一些應用程序。這樣,當兩幢大樓的網絡通信被阻斷,雙子星模式的主備機就會分別在兩幢大樓里接受和處理請求。
為了防止精神分裂,我們必須讓主備機使用專用的網絡進行連接,最簡單的方法當然是用一根雙絞線將他們相連。
我們不能將雙子星部署在兩個不同的島嶼上,為各自島嶼的應用程序服務。這種情況下,我們會使用諸如聯邦模式的機制進行可靠性設計。
最好但最夸張的做法是,將兩臺機器之間的連接和應用程序的連接完全隔離開來,甚至是使用不同的網卡,而不僅僅是不同的端口。這樣做也是為了日后排查錯誤時更為明確。
#### 實現雙子星模式
閑話少說,下面是雙子星模式的服務端代碼:
**bstarsrv: Binary Star server in C**
```c
//
// 雙子星模式 - 服務端
//
#include "czmq.h"
// 發送狀態信息的間隔時間
// 如果對方在兩次心跳過后都沒有應答,則視為斷開
#define HEARTBEAT 1000 // In msecs
// 服務器狀態枚舉
typedef enum {
STATE_PRIMARY = 1, // 主機,等待同伴連接
STATE_BACKUP = 2, // 備機,等待同伴連接
STATE_ACTIVE = 3, // 激活態,處理應用程序請求
STATE_PASSIVE = 4 // 被動態,不接收請求
} state_t;
// 對話節點事件
typedef enum {
PEER_PRIMARY = 1, // 主機
PEER_BACKUP = 2, // 備機
PEER_ACTIVE = 3, // 激活態
PEER_PASSIVE = 4, // 被動態
CLIENT_REQUEST = 5 // 客戶端請求
} event_t;
// 有限狀態機
typedef struct {
state_t state; // 當前狀態
event_t event; // 當前事件
int64_t peer_expiry; // 判定節點死亡的時限
} bstar_t;
// 執行有限狀態機(將事件綁定至狀態);
// 發生異常時返回TRUE。
static Bool
s_state_machine (bstar_t *fsm)
{
Bool exception = FALSE;
// 主機等待同伴連接
// 該狀態下接收CLIENT_REQUEST事件
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
printf ("I: 已連接至備機(slave),可以作為master運行。\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_ACTIVE) {
printf ("I: 已連接至備機(master),可以作為slave運行。\n");
fsm->state = STATE_PASSIVE;
}
}
else
// 備機等待同伴連接
// 該狀態下拒絕CLIENT_REQUEST事件
if (fsm->state == STATE_BACKUP) {
if (fsm->event == PEER_ACTIVE) {
printf ("I: 已連接至主機(master),可以作為slave運行。\n");
fsm->state = STATE_PASSIVE;
}
else
if (fsm->event == CLIENT_REQUEST)
exception = TRUE;
}
else
// 服務器處于激活態
// 該狀態下接受CLIENT_REQUEST事件
if (fsm->state == STATE_ACTIVE) {
if (fsm->event == PEER_ACTIVE) {
// 若出現兩臺master,則拋出異常
printf ("E: 嚴重錯誤:雙master。正在退出。\n");
exception = TRUE;
}
}
else
// 服務器處于被動態
// 若同伴已死,CLIENT_REQUEST事件將觸發故障恢復
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// 同伴正在重啟 - 轉為激活態,同伴將轉為被動態。
printf ("I: 主機(slave)正在重啟,可作為master運行。\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
// 同伴正在重啟 - 轉為激活態,同伴將轉為被動態。
printf ("I: 備機(slave)正在重啟,可作為master運行。\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// 若出現兩臺slave,集群將無響應
printf ("E: 嚴重錯誤:雙slave。正在退出\n");
exception = TRUE;
}
else
if (fsm->event == CLIENT_REQUEST) {
// 若心跳超時,同伴將成為master;
// 此行為由客戶端請求觸發。
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// 同伴已死,轉為激活態。
printf ("I: 故障恢復,可作為master運行。\n");
fsm->state = STATE_ACTIVE;
}
else
// 同伴還在,拒絕請求。
exception = TRUE;
}
}
return exception;
}
int main (int argc, char *argv [])
{
// 命令行參數可以為:
// -p 作為主機啟動, at tcp://localhost:5001
// -b 作為備機啟動, at tcp://localhost:5002
zctx_t *ctx = zctx_new ();
void *statepub = zsocket_new (ctx, ZMQ_PUB);
void *statesub = zsocket_new (ctx, ZMQ_SUB);
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
bstar_t fsm = { 0 };
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: 主機master,等待備機(slave)連接。\n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: 備機slave,等待主機(master)連接。\n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
}
else {
printf ("Usage: bstarsrv { -p | -b }\n");
zctx_destroy (&ctx);
exit (0);
}
// 設定下一次發送狀態的時間
int64_t send_state_at = zclock_time () + HEARTBEAT;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ statesub, 0, ZMQ_POLLIN, 0 }
};
int time_left = (int) ((send_state_at - zclock_time ()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 上下文對象被關閉
if (items [0].revents & ZMQ_POLLIN) {
// 收到客戶端請求
zmsg_t *msg = zmsg_recv (frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine (&fsm) == FALSE)
// 返回應答
zmsg_send (&msg, frontend);
else
zmsg_destroy (&msg);
}
if (items [1].revents & ZMQ_POLLIN) {
// 收到狀態消息,作為事件處理
char *message = zstr_recv (statesub);
fsm.event = atoi (message);
free (message);
if (s_state_machine (&fsm))
break; // 錯誤,退出。
fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
}
// 定時發送狀態信息
if (zclock_time () >= send_state_at) {
char message [2];
sprintf (message, "%d", fsm.state);
zstr_send (statepub, message);
send_state_at = zclock_time () + HEARTBEAT;
}
}
if (zctx_interrupted)
printf ("W: 中斷\n");
// 關閉套接字和上下文
zctx_destroy (&ctx);
return 0;
}
```
下面是客戶端代碼:
**bstarcli: Binary Star client in C**
```c
//
// 雙子星模式 - 客戶端
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // 毫秒
#define SETTLE_DELAY 2000 // 超時時間
int main (void)
{
zctx_t *ctx = zctx_new ();
char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
uint server_nbr = 0;
printf ("I: 正在連接服務器 %s...\n", server [server_nbr]);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
int sequence = 0;
while (!zctx_interrupted) {
// 發送請求并等待應答
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// 輪詢套接字
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中斷
// 處理應答
if (items [0].revents & ZMQ_POLLIN) {
// 審核應答編號
char *reply = zstr_recv (client);
if (atoi (reply) == sequence) {
printf ("I: 服務端應答正常 (%s)\n", reply);
expect_reply = 0;
sleep (1); // 每秒發送一個請求
}
else {
printf ("E: 錯誤的應答內容: %s\n",
reply);
}
free (reply);
}
else {
printf ("W: 服務器無響應,正在重試\n");
// 重開套接字
zsocket_destroy (ctx, client);
server_nbr = (server_nbr + 1) % 2;
zclock_sleep (SETTLE_DELAY);
printf ("I: 正在連接服務端 %s...\n",
server [server_nbr]);
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
// 使用新套接字重發請求
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}
```
運行以下命令進行測試,順序隨意:
```
bstarsrv -p # Start primary
bstarsrv -b # Start backup
bstarcli
```
可以將主機進程殺掉,測試故障恢復機制;再開啟主機,殺掉備機,查看還原機制。要注意是由客戶端觸發這兩個事件的。
下圖展現了服務進程的狀態圖。綠色狀態下會接收客戶端請求,粉色狀態會拒絕請求。事件指的是同伴的狀態,所以“同伴激活態”指的是同伴機器告知我們它處于激活態。“客戶請求”表示我們從客戶端獲得了請求,“客戶投票”則指我們從客戶端獲得了請求并且同伴已經超時死亡。

需要注意的是,服務進程使用PUB-SUB套接字進行狀態交換,其它類型的套接字在這里不適用。比如,PUSH和DEALER套接字在沒有節點相連的時候會發生阻塞;PAIR套接字不會在節點斷開后進行重連;ROUTER套接字需要地址才能發送消息。
These are the main limitations of the Binary Star pattern:
* A server process cannot be part of more than one Binary Star pair.
* A primary server can have a single backup server, no more.
* The backup server cannot do useful work while in slave mode.
* The backup server must be capable of handling full application loads.
* Failover configuration cannot be modified at runtime.
* Client applications must do some work to benefit from failover.
#### 雙子星反應堆
我們可以將雙子星模式打包成一個類似反應堆的類,供以后復用。在C語言中,我們使用czmq的zloop類,其他語言應該會有相應的實現。以下是C語言版的bstar接口:
```c
// 創建雙子星模式實例,使用本地(綁定)和遠程(連接)端點來設置節點對。
bstar_t *bstar_new (int primary, char *local, char *remote);
// 銷毀實例
void bstar_destroy (bstar_t **self_p);
// 返回底層的zloop反應堆,用以添加定時器、讀取器、注冊和取消等功能。
zloop_t *bstar_zloop (bstar_t *self);
// 注冊投票讀取器
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
// 注冊狀態機處理器
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// 開啟反應堆,當回調函數返回-1,或進程收到SIGINT、SIGTERM信號時中止。
int bstar_start (bstar_t *self);
```
以下是類的實現:
**bstar: Binary Star core class in C**
```c
/* =====================================================================
bstar - Binary Star reactor
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "bstar.h"
// 服務器狀態枚舉
typedef enum {
STATE_PRIMARY = 1, // 主機,等待同伴連接
STATE_BACKUP = 2, // 備機,等待同伴連接
STATE_ACTIVE = 3, // 激活態,處理應用程序請求
STATE_PASSIVE = 4 // 被動態,不接收請求
} state_t;
// 對話節點事件
typedef enum {
PEER_PRIMARY = 1, // 主機
PEER_BACKUP = 2, // 備機
PEER_ACTIVE = 3, // 激活態
PEER_PASSIVE = 4, // 被動態
CLIENT_REQUEST = 5 // 客戶端請求
} event_t;
// 發送狀態信息的間隔時間
// 如果對方在兩次心跳過后都沒有應答,則視為斷開
#define BSTAR_HEARTBEAT 1000 // In msecs
// 類結構
struct _bstar_t {
zctx_t *ctx; // 私有上下文
zloop_t *loop; // 反應堆循環
void *statepub; // 狀態發布者
void *statesub; // 狀態訂閱者
state_t state; // 當前狀態
event_t event; // 當前事件
int64_t peer_expiry; // 判定節點死亡的時限
zloop_fn *voter_fn; // 投票套接字處理器
void *voter_arg; // 投票處理程序的參數
zloop_fn *master_fn; // 成為master時回調
void *master_arg; // 參數
zloop_fn *slave_fn; // 成為slave時回調
void *slave_arg; // 參數
};
// ---------------------------------------------------------------------
// 執行有限狀態機(將事件綁定至狀態);
// 發生異常時返回-1,正確時返回0。
static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// 主機等待同伴連接
// 該狀態下接收CLIENT_REQUEST事件
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: 已連接至備機(slave),可以作為master運行。");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: 已連接至備機(master),可以作為slave運行。");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn) (self->loop, NULL, self->slave_arg);
}
else
if (self->event == CLIENT_REQUEST) {
zclock_log ("I: 收到客戶端請求,可作為master運行。");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
}
else
// 備機等待同伴連接
// 該狀態下拒絕CLIENT_REQUEST事件
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
zclock_log ("I: 已連接至主機(master),可以作為slave運行。");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn) (self->loop, NULL, self->slave_arg);
}
else
if (self->event == CLIENT_REQUEST)
rc = -1;
}
else
// 服務器處于激活態
// 該狀態下接受CLIENT_REQUEST事件
// 只有服務器死亡才會離開激活態
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// 若出現兩臺master,則拋出異常
zclock_log ("E: 嚴重錯誤:雙master。正在退出。");
rc = -1;
}
}
else
// 服務器處于被動態
// 若同伴已死,CLIENT_REQUEST事件將觸發故障恢復
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// 同伴正在重啟 - 轉為激活態,同伴將轉為被動態。
zclock_log ("I: 主機(slave)正在重啟,可作為master運行。");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_BACKUP) {
// 同伴正在重啟 - 轉為激活態,同伴將轉為被動態。
zclock_log ("I: 備機(slave)正在重啟,可作為master運行。");
self->state = STATE_ACTIVE;
}
else
if (self->event == PEER_PASSIVE) {
// 若出現兩臺slave,集群將無響應
zclock_log ("E: 嚴重錯誤:雙slave。正在退出");
rc = -1;
}
else
if (self->event == CLIENT_REQUEST) {
// 若心跳超時,同伴將成為master;
// 此行為由客戶端請求觸發。
assert (self->peer_expiry > 0);
if (zclock_time () >= self->peer_expiry) {
// 同伴已死,轉為激活態。
zclock_log ("I: 故障恢復,可作為master運行。");
self->state = STATE_ACTIVE;
}
else
// 同伴還在,拒絕請求。
rc = -1;
}
// 觸發狀態更改事件處理函數
if (self->state == STATE_ACTIVE && self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
return rc;
}
// ---------------------------------------------------------------------
// 反應堆事件處理程序
// 發送狀態信息
int s_send_state (zloop_t *loop, void *socket, void *arg)
{
bstar_t *self = (bstar_t *) arg;
zstr_sendf (self->statepub, "%d", self->state);
return 0;
}
// 接收狀態信息,啟動有限狀態機
int s_recv_state (zloop_t *loop, void *socket, void *arg)
{
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv (socket);
if (state) {
self->event = atoi (state);
self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
free (state);
}
return s_execute_fsm (self);
}
// 收到應用程序請求,判斷是否接收
int s_voter_ready (zloop_t *loop, void *socket, void *arg)
{
bstar_t *self = (bstar_t *) arg;
// 如果能夠處理請求,則調用函數
self->event = CLIENT_REQUEST;
if (s_execute_fsm (self) == 0) {
puts ("CLIENT REQUEST");
(self->voter_fn) (self->loop, socket, self->voter_arg);
}
else {
// 銷毀等待中的消息
zmsg_t *msg = zmsg_recv (socket);
zmsg_destroy (&msg);
}
return 0;
}
// ---------------------------------------------------------------------
// 構造函數
bstar_t *
bstar_new (int primary, char *local, char *remote)
{
bstar_t
*self;
self = (bstar_t *) zmalloc (sizeof (bstar_t));
// 初始化雙子星
self->ctx = zctx_new ();
self->loop = zloop_new ();
self->state = primary? STATE_PRIMARY: STATE_BACKUP;
// 創建狀態PUB套接字
self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->statepub, local);
// 創建狀態SUB套接字
self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_connect (self->statesub, remote);
// 設置基本的反應堆事件處理器
zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zloop_reader (self->loop, self->statesub, s_recv_state, self);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
bstar_destroy (bstar_t **self_p)
{
assert (self_p);
if (*self_p) {
bstar_t *self = *self_p;
zloop_destroy (&self->loop);
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 返回底層zloop對象,用以添加額外的定時器、閱讀器等。
zloop_t *
bstar_zloop (bstar_t *self)
{
return self->loop;
}
// ---------------------------------------------------------------------
// 創建套接字,連接至本地端點,注冊成為閱讀器;
// 只有當有限狀態機允許時才會讀取該套接字;
// 從該套接字獲得的消息將作為一次“投票”;
// 我們要求雙子星模式中只有一個“投票”套接字。
int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// 保存原始的回調函數和參數,稍后使用
void *socket = zsocket_new (self->ctx, type);
zsocket_bind (socket, endpoint);
assert (!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
return zloop_reader (self->loop, socket, s_voter_ready, self);
}
// ---------------------------------------------------------------------
// 注冊狀態變化事件處理器
void
bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->master_fn);
self->master_fn = handler;
self->master_arg = arg;
}
void
bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
{
assert (!self->slave_fn);
self->slave_fn = handler;
self->slave_arg = arg;
}
// ---------------------------------------------------------------------
// 啟用或禁止跟蹤信息
void bstar_set_verbose (bstar_t *self, Bool verbose)
{
zloop_set_verbose (self->loop, verbose);
}
// ---------------------------------------------------------------------
// 開啟反應堆,當回調函數返回-1,或進程收到SIGINT、SIGTERM信號時中止。
int
bstar_start (bstar_t *self)
{
assert (self->voter_fn);
return zloop_start (self->loop);
}
```
這樣一來,我們的服務端代碼會變得非常簡短:
**bstarsrv2: Binary Star server, using core class in C**
```c
//
// 雙子星模式服務端,使用bstar反應堆
//
// 直接編譯,不建類庫
#include "bstar.c"
// Echo service
int s_echo (zloop_t *loop, void *socket, void *arg)
{
zmsg_t *msg = zmsg_recv (socket);
zmsg_send (&msg, socket);
return 0;
}
int main (int argc, char *argv [])
{
// 命令行參數可以為:
// -p 作為主機啟動, at tcp://localhost:5001
// -b 作為備機啟動, at tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: 主機master,等待備機(slave)連接。\n");
bstar = bstar_new (BSTAR_PRIMARY,
"tcp://*:5003", "tcp://localhost:5004");
bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: 備機slave,等待主機(master)連接。\n");
bstar = bstar_new (BSTAR_BACKUP,
"tcp://*:5004", "tcp://localhost:5003");
bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
}
else {
printf ("Usage: bstarsrvs { -p | -b }\n");
exit (0);
}
bstar_start (bstar);
bstar_destroy (&bstar);
return 0;
}
```
### 無中間件的可靠性(自由者模式)
我們講了那么多關于中間件的示例,好像有些違背“ZMQ是無中間件”的說法。但要知道在現實生活中,中間件一直是讓人又愛又恨的東西。實踐中的很多消息架構能都在使用中間件進行分布式架構的搭建,所以說最終的決定還是需要你自己去權衡的。這也是為什么雖然我能駕車10分鐘到一個大型商場里購買五箱音量,但我還是會選擇走10分鐘到樓下的便利店里去買。這種出于經濟方面的考慮(時間、精力、成本等)不僅在日常生活中很常見,在軟件架構中也很重要。
這就是為什么ZMQ不會強制使用帶有中間件的架構,但仍提供了像內置裝置這樣的中間件供編程人員自由選用。
這一節我們會打破以往使用中間件進行可靠性設計的架構,轉而使用點對點架構,即自由者模式,來進行可靠的消息傳輸。我們的示例程序會是一個名稱解析服務。ZMQ中的一個常見問題是:我們如何得知需要連接的端點?在代碼中直接寫入TCP/IP地址肯定是不合適的;使用配置文件會造成管理上的不便。試想一下,你要在上百臺計算機中進行配置,只是為了讓它們知道google.com的IP地址是74.125.230.82。
一個ZMQ的名稱解析服務需要實現的功能有:
* 將邏輯名稱解析為一個或多個端點地址,包括綁定端和連接端。實際使用時,名稱服務會提供一組端點。
* 允許我們在不同的環境下,即開發環境和生產環境,進行解析;
* 該服務必須是可靠的,否則應用程序將無法連接到網絡。
為管家模式提供名稱解析服務會很有用,雖然將代理程序的端點對外暴露也很簡單,但是如果用好名稱解析服務,那它將成為唯一一個對外暴露的接口,將更便于管理。
我們需要處理的故障類型有:服務崩潰或重啟、服務過載、網絡因素等。為獲取可靠性,我們必須建立一個服務群,當某個服務端崩潰后,客戶端可以連接其他的服務端。實踐中,兩個服務端就已經足夠了,但事實上服務端的數量可以是任意個。

在這個架構中,大量客戶端和少量服務端進行通信,服務端將套接字綁定至單獨的端口,這和管家模式中的代理有很大不同。對于客戶端來說,它有這樣幾種選擇:
* 客戶端可以使用REQ套接字和懶惰海盜模式,但需要有一個機制防止客戶端不斷地請求已停止的服務端。
* 客戶端可以使用DEALER套接字,向所有的服務端發送請求。很簡單,但并不太妙;
* 客戶端使用ROUTER套接字,連接特定的服務端。但客戶端如何得知服務端的套接字標識呢?一種方式是讓服務端主動連接客戶端(很復雜),或者將服務端標識寫入代碼進行固化(很混亂)。
#### 模型一:簡單重試
讓我們先嘗試簡單的方案,重寫懶惰海盜模式,讓其能夠和多個服務端進行通信。啟動服務端時用命令行參數指定端口。然后啟動多個服務端。
**flserver1: Freelance server, Model One in C**
```c
//
// 自由者模式 - 服務端 - 模型1
// 提供echo服務
//
#include "czmq.h"
int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
exit (EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);
printf ("I: echo服務端點: %s\n", argv [1]);
while (TRUE) {
zmsg_t *msg = zmsg_recv (server);
if (!msg)
break; // 中斷
zmsg_send (&msg, server);
}
if (zctx_interrupted)
printf ("W: 中斷\n");
zctx_destroy (&ctx);
return 0;
}
```
啟動客戶端,指定一個或多個端點:
**flclient1: Freelance client, Model One in C**
```c
//
// 自由者模式 - 客戶端 - 模型1
// 使用REQ套接字請求一個或多個服務端
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000
#define MAX_RETRIES 3 // 嘗試次數
static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{
printf ("I: 在端點 %s 上嘗試請求echo服務...\n", endpoint);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, endpoint);
// 發送請求,并等待應答
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, client);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
zmsg_t *reply = NULL;
if (items [0].revents & ZMQ_POLLIN)
reply = zmsg_recv (client);
// 關閉套接字
zsocket_destroy (ctx, client);
return reply;
}
int main (int argc, char *argv [])
{
zctx_t *ctx = zctx_new ();
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "Hello world");
zmsg_t *reply = NULL;
int endpoints = argc - 1;
if (endpoints == 0)
printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
else
if (endpoints == 1) {
// 若只有一個端點,則嘗試N次
int retries;
for (retries = 0; retries < MAX_RETRIES; retries++) {
char *endpoint = argv [1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // 成功
printf ("W: 沒有收到 %s 的應答, 準備重試...\n", endpoint);
}
}
else {
// 若有多個端點,則每個嘗試一次
int endpoint_nbr;
for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
char *endpoint = argv [endpoint_nbr + 1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Successful
printf ("W: 沒有收到 %s 的應答\n", endpoint);
}
}
if (reply)
printf ("服務運作正常\n");
zmsg_destroy (&request);
zmsg_destroy (&reply);
zctx_destroy (&ctx);
return 0;
}
```
可用如下命令運行:
```
flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556
```
客戶端的核心機制是懶惰海盜模式,即獲得一次成功的應答后就結束。會有兩種情況:
* 如果只有一個服務端,客戶端會再嘗試N次后停止,這和懶惰海盜模式的邏輯一致;
* 如果有多個服務端,客戶端會每個嘗試一次,收到應答后停止。
這種機制補充了海盜模式,使其能夠克服只有一個服務端的情況。
但是,這種設計無法在現實程序中使用:當有很多客戶端連接了服務端,而主服務端崩潰了,那所有客戶端都需要在超時后才能繼續執行。
#### 模型二:批量發送
下面讓我們使用DEALER套接字。我們的目標是能再最短的時間里收到一個應答,不能受主服務端崩潰的影響。可以采取以下措施:
* 連接所有的服務端;
* 當有請求時,一次性發送給所有的服務端;
* 等待第一個應答;
* 忽略其他應答。
這樣設計客戶端時,當發送請求后,所有的服務端都會收到這個請求,并返回應答。如果某個服務端斷開連接了,ZMQ可能會將請求發給其他服務端,導致某些服務端會收到兩次請求。
更麻煩的是客戶端無法得知應答的數量,容易發生混亂。
我們可以為請求進行編號,忽略不匹配的應答。我們要對服務端進行改造,返回的消息中需要包含請求編號:
**flserver2: Freelance server, Model Two in C**
```c
//
// 自由者模式 - 服務端 - 模型2
// 返回帶有請求編號的OK信息
//
#include "czmq.h"
int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
exit (EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);
printf ("I: 服務已就緒 %s\n", argv [1]);
while (TRUE) {
zmsg_t *request = zmsg_recv (server);
if (!request)
break; // 中斷
// 判斷請求內容是否正確
assert (zmsg_size (request) == 2);
zframe_t *address = zmsg_pop (request);
zmsg_destroy (&request);
zmsg_t *reply = zmsg_new ();
zmsg_add (reply, address);
zmsg_addstr (reply, "OK");
zmsg_send (&reply, server);
}
if (zctx_interrupted)
printf ("W: interrupted\n");
zctx_destroy (&ctx);
return 0;
}
```
客戶端代碼:
**flclient2: Freelance client, Model Two in C**
```c
//
// 自由者模式 - 客戶端 - 模型2
// 使用DEALER套接字發送批量消息
//
#include "czmq.h"
// 超時時間
#define GLOBAL_TIMEOUT 2500
// 將客戶端API封裝成一個類
#ifdef __cplusplus
extern "C" {
#endif
// 聲明類結構
typedef struct _flclient_t flclient_t;
flclient_t *
flclient_new (void);
void
flclient_destroy (flclient_t **self_p);
void
flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p);
#ifdef __cplusplus
}
#endif
int main (int argc, char *argv [])
{
if (argc == 1) {
printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
exit (EXIT_SUCCESS);
}
// 創建自由者模式客戶端
flclient_t *client = flclient_new ();
// 連接至各個端點
int argn;
for (argn = 1; argn < argc; argn++)
flclient_connect (client, argv [argn]);
// 發送一組請求,并記錄時間
int requests = 10000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flclient_request (client, &request);
if (!reply) {
printf ("E: 名稱解析服務不可用,正在退出\n");
break;
}
zmsg_destroy (&reply);
}
printf ("平均請求時間: %d 微秒\n",
(int) (zclock_time () - start) / 10);
flclient_destroy (&client);
return 0;
}
// --------------------------------------------------------------------
// 類結構
struct _flclient_t {
zctx_t *ctx; // 上下文
void *socket; // 用于和服務端通信的DEALER套接字
size_t servers; // 以連接的服務端數量
uint sequence; // 已發送的請求數
};
// --------------------------------------------------------------------
// Constructor
flclient_t *
flclient_new (void)
{
flclient_t
*self;
self = (flclient_t *) zmalloc (sizeof (flclient_t));
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
return self;
}
// --------------------------------------------------------------------
// 析構函數
void
flclient_destroy (flclient_t **self_p)
{
assert (self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}
// --------------------------------------------------------------------
// 連接至新的服務端端點
void
flclient_connect (flclient_t *self, char *endpoint)
{
assert (self);
zsocket_connect (self->socket, endpoint);
self->servers++;
}
// --------------------------------------------------------------------
// 發送請求,接收應答
// 發送后銷毀請求
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_t *request = *request_p;
// 向消息添加編號和空幀
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (request, sequence_text);
zmsg_pushstr (request, "");
// 向所有已連接的服務端發送請求
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->socket);
}
// 接收來自任何服務端的應答
// 因為我們可能poll多次,所以每次都進行計算
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
while (zclock_time () < endtime) {
zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// 應答內容是 [empty][sequence][OK]
reply = zmsg_recv (self->socket);
assert (zmsg_size (reply) == 3);
free (zmsg_popstr (reply));
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence)
break;
}
}
zmsg_destroy (request_p);
return reply;
}
```
幾點說明:
* 客戶端被封裝成了一個API類,將復雜的代碼都包裝了起來。
* 客戶端會在幾秒之后放棄尋找可用的服務端;
* 客戶端需要創建一個合法的REP信封,所以需要添加一個空幀。
程序中,客戶端發出了1萬次名稱解析請求(雖然是假的),并計算平均耗費時間。在我的測試機上,有一個服務端時,耗時60微妙;三個時80微妙。
該模型的優缺點是:
* 優點:簡單,容易理解和編寫;
* 優點:它工作迅速,有重試機制;
* 缺點:占用了額外的網絡帶寬;
* 缺點:我們不能為服務端設置優先級,如主服務、次服務等;
* 缺點:服務端不能同時處理多個請求。
#### Model Three - Complex and Nasty
批量發送模型看起來不太真實,那就讓我們來探索最后這個極度復雜的模型。很有可能在編寫完之后我們又會轉而使用批量發送,哈哈,這就是我的作風。
我們可以將客戶端使用的套接字更換為ROUTER,讓我們能夠向特定的服務端發送請求,停止向已死亡的服務端發送請求,從而做得盡可能地智能。我們還可以將服務端的套接字更換為ROUTER,從而突破單線程的瓶頸。
但是,使用ROUTER-ROUTER套接字連接兩個瞬時套接字是不可行的,節點只有在收到第一條消息時才會為對方生成套接字標識。唯一的方法是讓其中一個節點使用持久化的套接字,比較好的方式是讓客戶端知道服務端的標識,即服務端作為持久化的套接字。
為了避免產生新的配置項,我們直接使用服務端的端點作為套接字標識。
回想一下ZMQ套接字標識是如何工作的。服務端的ROUTER套接字為自己設置一個標識(在綁定之前),當客戶端連接時,通過一個握手的過程來交換雙方的標識。客戶端的ROUTER套接字會先發送一條空消息,服務端為客戶端生成一個隨機的UUID。然后,服務端會向客戶端發送自己的標識。
這樣一來,客戶端就可以將消息發送給特定的服務端了。不過還有一個問題:我們不知道服務端會在什么時候完成這個握手的過程。如果服務端是在線的,那可能幾毫秒就能完成。如果不在線,那可能需要很久很久。
這里有一個矛盾:我們需要知道服務端何時連接成功且能夠開始工作。自由者模式不像中間件模式,它的服務端必須要先發送請求后才能的應答。所以在服務端發送消息給客戶端之前,客戶端必須要先請求服務端,這看似是不可能的。
我有一個解決方法,那就是批量發送。這里發送的不是真正的請求,而是一個試探性的心跳(PING-PONG)。當收到應答時,就說明對方是在線的。
下面讓我們制定一個協議,來定義自由者模式是如何傳遞這種心跳的:
* http://rfc.zeromq.org/spec:10
實現這個協議的服務端很方便,下面就是經過改造的echo服務:
**flserver3: Freelance server, Model Three in C**
```c
//
// 自由者模式 - 服務端 - 模型3
// 使用ROUTER-ROUTER套接字進行通信;單線程。
//
#include "czmq.h"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();
// 準備服務端套接字,其標識和端點名相同
char *bind_endpoint = "tcp://*:5555";
char *connect_endpoint = "tcp://localhost:5555";
void *server = zsocket_new (ctx, ZMQ_ROUTER);
zmq_setsockopt (server,
ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
zsocket_bind (server, bind_endpoint);
printf ("I: 服務端已準備就緒 %s\n", bind_endpoint);
while (!zctx_interrupted) {
zmsg_t *request = zmsg_recv (server);
if (verbose && request)
zmsg_dump (request);
if (!request)
break; // 中斷
// Frame 0: 客戶端標識
// Frame 1: 心跳,或客戶端控制信息幀
// Frame 2: 請求內容
zframe_t *address = zmsg_pop (request);
zframe_t *control = zmsg_pop (request);
zmsg_t *reply = zmsg_new ();
if (zframe_streq (control, "PONG"))
zmsg_addstr (reply, "PONG");
else {
zmsg_add (reply, control);
zmsg_addstr (reply, "OK");
}
zmsg_destroy (&request);
zmsg_push (reply, address);
if (verbose && reply)
zmsg_dump (reply);
zmsg_send (&reply, server);
}
if (zctx_interrupted)
printf ("W: 中斷\n");
zctx_destroy (&ctx);
return 0;
}
```
但是,自由者模式的客戶端會變得大一寫。為了清晰期間,我們將其拆分為兩個類來實現。首先是在上層使用的程序:
**flclient3: Freelance client, Model Three in C**
```c
//
// 自由者模式 - 客戶端 - 模型3
// 使用flcliapi類來封裝自由者模式
//
// 直接編譯,不建類庫
#include "flcliapi.c"
int main (void)
{
// 創建自由者模式實例
flcliapi_t *client = flcliapi_new ();
// 鏈接至服務器端點
flcliapi_connect (client, "tcp://localhost:5555");
flcliapi_connect (client, "tcp://localhost:5556");
flcliapi_connect (client, "tcp://localhost:5557");
// 發送隨機請求,計算時間
int requests = 1000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flcliapi_request (client, &request);
if (!reply) {
printf ("E: 名稱解析服務不可用,正在退出\n");
break;
}
zmsg_destroy (&reply);
}
printf ("平均執行時間: %d usec\n",
(int) (zclock_time () - start) / 10);
flcliapi_destroy (&client);
return 0;
}
```
下面是該模式復雜的實現過程:
**flcliapi: Freelance client API in C**
```c
/* =====================================================================
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "flcliapi.h"
// 請求超時時間
#define GLOBAL_TIMEOUT 3000 // msecs
// 心跳間隔
#define PING_INTERVAL 2000 // msecs
// 判定服務死亡的時間
#define SERVER_TTL 6000 // msecs
// =====================================================================
// 同步部分,在應用程序層面運行
// ---------------------------------------------------------------------
// 類結構
struct _flcliapi_t {
zctx_t *ctx; // 上下文
void *pipe; // 用于和主線程通信的套接字
};
// 這是運行后臺代理程序的線程
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
// ---------------------------------------------------------------------
// 構造函數
flcliapi_t *
flcliapi_new (void)
{
flcliapi_t
*self;
self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
flcliapi_destroy (flcliapi_t **self_p)
{
assert (self_p);
if (*self_p) {
flcliapi_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 連接至新服務器端點
// 消息內容:[CONNECT][endpoint]
void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{
assert (self);
assert (endpoint);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, endpoint);
zmsg_send (&msg, self->pipe);
zclock_sleep (100); // 等待連接
}
// ---------------------------------------------------------------------
// 發送并銷毀請求,接收應答
zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_pushstr (*request_p, "REQUEST");
zmsg_send (request_p, self->pipe);
zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *status = zmsg_popstr (reply);
if (streq (status, "FAILED"))
zmsg_destroy (&reply);
free (status);
}
return reply;
}
// =====================================================================
// 異步部分,在后臺運行
// ---------------------------------------------------------------------
// 單個服務端信息
typedef struct {
char *endpoint; // 服務端端點/套接字標識
uint alive; // 是否在線
int64_t ping_at; // 下一次心跳時間
int64_t expires; // 過期時間
} server_t;
server_t *
server_new (char *endpoint)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
self->endpoint = strdup (endpoint);
self->alive = 0;
self->ping_at = zclock_time () + PING_INTERVAL;
self->expires = zclock_time () + SERVER_TTL;
return self;
}
void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->endpoint);
free (self);
*self_p = NULL;
}
}
int
server_ping (char *key, void *server, void *socket)
{
server_t *self = (server_t *) server;
if (zclock_time () >= self->ping_at) {
zmsg_t *ping = zmsg_new ();
zmsg_addstr (ping, self->endpoint);
zmsg_addstr (ping, "PING");
zmsg_send (&ping, socket);
self->ping_at = zclock_time () + PING_INTERVAL;
}
return 0;
}
int
server_tickless (char *key, void *server, void *arg)
{
server_t *self = (server_t *) server;
uint64_t *tickless = (uint64_t *) arg;
if (*tickless > self->ping_at)
*tickless = self->ping_at;
return 0;
}
// ---------------------------------------------------------------------
// 后臺處理程序信息
typedef struct {
zctx_t *ctx; // 上下文
void *pipe; // 用于應用程序通信的套接字
void *router; // 用于服務端通信的套接字
zhash_t *servers; // 已連接的服務端
zlist_t *actives; // 在線的服務端
uint sequence; // 請求編號
zmsg_t *request; // 當前請求
zmsg_t *reply; // 當前應答
int64_t expires; // 請求過期時間
} agent_t;
agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->servers = zhash_new ();
self->actives = zlist_new ();
return self;
}
void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy (&self->servers);
zlist_destroy (&self->actives);
zmsg_destroy (&self->request);
zmsg_destroy (&self->reply);
free (self);
*self_p = NULL;
}
}
// 當服務端從列表中移除時,回調該函數。
static void
s_server_free (void *argument)
{
server_t *server = (server_t *) argument;
server_destroy (&server);
}
void
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (streq (command, "CONNECT")) {
char *endpoint = zmsg_popstr (msg);
printf ("I: connecting to %s...\n", endpoint);
int rc = zmq_connect (self->router, endpoint);
assert (rc == 0);
server_t *server = server_new (endpoint);
zhash_insert (self->servers, endpoint, server);
zhash_freefn (self->servers, endpoint, s_server_free);
zlist_append (self->actives, server);
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;
free (endpoint);
}
else
if (streq (command, "REQUEST")) {
assert (!self->request); // 遵循請求-應答循環
// 將請求編號和空幀加入消息頂部
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (msg, sequence_text);
// 獲取請求消息的所有權
self->request = msg;
msg = NULL;
// 設置請求過期時間
self->expires = zclock_time () + GLOBAL_TIMEOUT;
}
free (command);
zmsg_destroy (&msg);
}
void
agent_router_message (agent_t *self)
{
zmsg_t *reply = zmsg_recv (self->router);
// 第一幀是應答的服務端標識
char *endpoint = zmsg_popstr (reply);
server_t *server =
(server_t *) zhash_lookup (self->servers, endpoint);
assert (server);
free (endpoint);
if (!server->alive) {
zlist_append (self->actives, server);
server->alive = 1;
}
server->ping_at = zclock_time () + PING_INTERVAL;
server->expires = zclock_time () + SERVER_TTL;
// 第二幀是應答的編號
char *sequence = zmsg_popstr (reply);
if (atoi (sequence) == self->sequence) {
zmsg_pushstr (reply, "OK");
zmsg_send (&reply, self->pipe);
zmsg_destroy (&self->request);
}
else
zmsg_destroy (&reply);
}
// ---------------------------------------------------------------------
// 異步的后臺代理會維護一個服務端池,處理請求和應答。
static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);
zmq_pollitem_t items [] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
// 計算超時時間
uint64_t tickless = zclock_time () + 1000 * 3600;
if (self->request
&& tickless > self->expires)
tickless = self->expires;
zhash_foreach (self->servers, server_tickless, &tickless);
int rc = zmq_poll (items, 2,
(tickless - zclock_time ()) * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 上下文對象被關閉
if (items [0].revents & ZMQ_POLLIN)
agent_control_message (self);
if (items [1].revents & ZMQ_POLLIN)
agent_router_message (self);
// 如果我們需要處理一項請求,將其發送給下一個可用的服務端
if (self->request) {
if (zclock_time () >= self->expires) {
// 請求超時
zstr_send (self->pipe, "FAILED");
zmsg_destroy (&self->request);
}
else {
// 尋找可用的服務端
while (zlist_size (self->actives)) {
server_t *server =
(server_t *) zlist_first (self->actives);
if (zclock_time () >= server->expires) {
zlist_pop (self->actives);
server->alive = 0;
}
else {
zmsg_t *request = zmsg_dup (self->request);
zmsg_pushstr (request, server->endpoint);
zmsg_send (&request, self->router);
break;
}
}
}
}
// 斷開并刪除已過期的服務端
// 發送心跳給空閑服務器
zhash_foreach (self->servers, server_ping, self->router);
}
agent_destroy (&self);
}
```
這組API使用了較為復雜的機制,我們之前也有用到過:
**異步后臺代理**
客戶端API由兩部分組成:同步的flcliapi類,運行于應用程序線程;異步的agent類,運行于后臺線程。flcliapi和agent類通過一個inproc套接字互相通信。所有和ZMQ相關的內容都封裝在API中。agent類實質上是作為一個迷你的代理程序在運行,負責在后臺與服務端進行通信,只要我們發送請求,它就會設法連接一個服務器來處理請求。
**連接等待機制**
ROUTER套接字的特點之一是會直接丟棄無法路由的消息,這就意味著當與服務器建立了ROUTER-ROUTER連接后,如果立刻發送一條消息,該消息是會丟失的。flcliapi類則延遲了一會兒后再發送消息。之后的通信中,由于服務端套接字是持久的,客戶端就不再丟棄消息了。
**Ping silence**
0MQ will queue messages for a dead server indefinitely. So if a client repeatedly PINGs a dead server, when that server comes back to life it'll get a whole bunch of PING messages all at once. Rather than continuing to ping a server we know is offline, we count on 0MQ's handling of durable sockets to deliver the old PING messages when the server comes back online. As soon as a server reconnects, it'll get PINGs from all clients that were connected to it, it'll PONG back, and those clients will recognize it as alive again.
**調整輪詢時間**
在之前的示例程序中,我們一般會為輪詢設置固定的超時時間(如1秒),這種做法雖然簡單,但是對于用電較為敏感的設備來說(如筆記本電腦或手機)喚醒CPU是需要額外的電力的。所以,為了完美也好,好玩也好,我們這里調整了輪詢時間,將其設置為到達過期時間時才超時,這樣就能節省一部分輪詢次數了。我們可以將過期時間放入一個列表中存儲,方便查詢。
### 總結
這一章中我們看到了很多可靠的請求-應答機制,每種機制都有其優劣性。大部分示例代碼是可以直接用于生產環境的,不過還可以進一步優化。有兩個模式會比較典型:使用了中間件的管家模式,以及未使用中間件的自由者模式。