## 第五章 高級發布-訂閱模式
第三章和第四章講述了ZMQ中請求-應答模式的一些高級用法。如果你已經能夠徹底理解了,那我要說聲恭喜。這一章我們會關注發布-訂閱模式,使用上層模式封裝,提升ZMQ發布-訂閱模式的性能、可靠性、狀態同步及安全機制。
本章涉及的內容有:
* 處理慢訂閱者(自殺的蝸牛模式)
* 高速訂閱者(黑箱模式)
* 構建一個共享鍵值緩存(克隆模式)
### 檢測慢訂閱者(自殺的蝸牛模式)
在使用發布-訂閱模式的時候,最常見的問題之一是如何處理響應較慢的訂閱者。理想狀況下,發布者能以全速發送消息給訂閱者,但現實中,訂閱者會需要對消息做較長時間的處理,或者寫得不夠好,無法跟上發布者的腳步。
如何處理慢訂閱者?最好的方法當然是讓訂閱者高效起來,不過這需要額外的工作。以下是一些處理慢訂閱者的方法:
* **在發布者中貯存消息**。這是Gmail的做法,如果過去的幾小時里沒有閱讀郵件的話,它會把郵件保存起來。但在高吞吐量的應用中,發布者堆積消息往往會導致內存溢出,最終崩潰。特別是當同是有多個訂閱者時,或者無法用磁盤來做一個緩沖,情況就會變得更為復雜。
* **在訂閱者中貯存消息**。這種做法要好的多,其實ZMQ默認的行為就是這樣的。如果非得有一個人會因為內存溢出而崩潰,那也只會是訂閱者,而非發布者,這挺公平的。然而,這種做法只對瞬間消息量很大的應用才合理,訂閱者只是一時處理不過來,但最終會趕上進度。但是,這還是沒有解決訂閱者速度過慢的問題。
* **暫停發送消息**。這也是Gmail的做法,當我的郵箱容量超過7.554GB時,新的郵件就會被Gmail拒收或丟棄。這種做法對發布者來說很有益,ZMQ中若設置了閾值(HWM),其默認行為也就是這樣的。但是,我們仍不能解決慢訂閱者的問題,我們只是讓消息變得斷斷續續而已。
* **斷開與滿訂閱者的連接**。這是hotmail的做法,如果連續兩周沒有登錄,它就會斷開,這也是為什么我正在使用第十五個hotmail郵箱。不過這種方案在ZMQ里是行不通的,因為對于發布者而言,訂閱者是不可見的,無法做相應處理。
看來沒有一種經典的方式可以滿足我們的需求,所以我們就要進行創新了。我們可以讓訂閱者自殺,而不僅僅是斷開連接。這就是“自殺的蝸牛”模式。當訂閱者發現自身運行得過慢時(對于慢速的定義應該是一個配置項,當達到這個標準時就大聲地喊出來吧,讓程序員知道),它會哀嚎一聲,然后自殺。
訂閱者如何檢測自身速度過慢呢?一種方式是為消息進行編號,并在發布者端設置閾值。當訂閱者發現消息編號不連續時,它就知道事情不對勁了。這里的閾值就是訂閱者自殺的值。
這種方案有兩個問題:一、如果我們連接的多個發布者,我們要如何為消息進行編號呢?解決方法是為每一個發布者設定一個唯一的編號,作為消息編號的一部分。二、如果訂閱者使用ZMQ_SUBSRIBE選項對消息進行了過濾,那么我們精心設計的消息編號機制就毫無用處了。
有些情形不會進行消息的過濾,所以消息編號還是行得通的。不過更為普遍的解決方案是,發布者為消息標注時間戳,當訂閱者收到消息時會檢測這個時間戳,如果其差別達到某一個值,就發出警報并自殺。
當訂閱者有自身的客戶端或服務協議,需要保證最大延遲時間時,自殺的蝸牛模式會很合適。撤銷一個訂閱者也許并不是最周全的方案,但至少不會引發后續的問題。如果訂閱者收到了過時的消息,那可能會對數據造成進一步的破壞,而且很難被發現。
以下是自殺的蝸牛模式的最簡實現:
**suisnail: Suicidal Snail in C**
```c
//
// 自殺的蝸牛模式
//
#include "czmq.h"
// ---------------------------------------------------------------------
// 該訂閱者會連接至發布者,接收所有的消息,
// 運行過程中它會暫停一會兒,模擬復雜的運算過程,
// 當發現收到的消息超過1秒的延遲時,就自殺。
#define MAX_ALLOWED_DELAY 1000 // 毫秒
static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
// 訂閱所有消息
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5556");
// 獲取并處理消息
while (1) {
char *string = zstr_recv (subscriber);
int64_t clock;
int terms = sscanf (string, "%" PRId64, &clock);
assert (terms == 1);
free (string);
// 自殺邏輯
if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
fprintf (stderr, "E: 訂閱者無法跟進, 取消中\n");
break;
}
// 工作一定時間
zclock_sleep (1 + randof (2));
}
zstr_send (pipe, "訂閱者中止");
}
// ---------------------------------------------------------------------
// 發布者每毫秒發送一條用時間戳標記的消息
static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
// 準備發布者
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
while (1) {
// 發送當前時間(毫秒)給訂閱者
char string [20];
sprintf (string, "%" PRId64, zclock_time ());
zstr_send (publisher, string);
char *signal = zstr_recv_nowait (pipe);
if (signal) {
free (signal);
break;
}
zclock_sleep (1); // 等待1毫秒
}
}
// 下面的代碼會啟動一個訂閱者和一個發布者,當訂閱者死亡時停止運行
//
int main (void)
{
zctx_t *ctx = zctx_new ();
void *pubpipe = zthread_fork (ctx, publisher, NULL);
void *subpipe = zthread_fork (ctx, subscriber, NULL);
free (zstr_recv (subpipe));
zstr_send (pubpipe, "break");
zclock_sleep (100);
zctx_destroy (&ctx);
return 0;
}
```
幾點說明:
* 示例程序中的消息包含了系統當前的時間戳(毫秒)。在現實應用中,你應該使用時間戳作為消息頭,并提供消息內容。
* 示例程序中的發布者和訂閱者是同一個進程的兩個線程。在現實應用中,他們應該是兩個不同的進程。示例中這么做只是為了演示的方便
### 高速訂閱者(黑箱模式)
發布-訂閱模式的一個典型應用場景是大規模分布式數據處理。如要處理從證券市場上收集到的數據,可以在證券交易系統上設置一個發布者,獲取價格信息,并發送給一組訂閱者。如果我們有很多訂閱者,我們可以使用TCP。如果訂閱者到達一定的量,那我們就應該使用可靠的廣播協議,如pgm。
假設我們的發布者每秒產生10萬條100個字節的消息。在剔除了不需要的市場信息后,這個比率還是比較合理的。現在我們需要記錄一天的數據(8小時約有250GB),再將其傳入一個模擬網絡,即一組訂閱者。雖然10萬條數據對ZMQ來說很容易處理,但我們需要更高的速度。
假設我們有多臺機器,一臺做發布者,其他的做訂閱者。這些機器都是8核的,發布者那臺有12核。
在我們開始發布消息時,有兩點需要注意:
1. 即便只是處理很少的數據,訂閱者仍有可能跟不上發布者的速度;
1. 當處理到6M/s的數據量時,發布者和訂閱者都有可能達到極限。
首先,我們需要將訂閱者設計為一種多線程的處理程序,這樣我們就能在一個線程中讀取消息,使用其他線程來處理消息。一般來說,我們對每種消息的處理方式都是不同的。這樣一來,訂閱者可以對收到的消息進行一次過濾,如根據頭信息來判別。當消息滿足某些條件,訂閱者會將消息交給worker處理。用ZMQ的語言來說,訂閱者會將消息轉發給worker來處理。
這樣一來,訂閱者看上去就像是一個隊列裝置,我們可以用各種方式去連接隊列裝置和worker。如我們建立單向的通信,每個worker都是相同的,可以使用PUSH和PULL套接字,分發的工作就交給ZMQ吧。這是最簡單也是最快速的方式:

訂閱者和發布者之間的通信使用TCP或PGM協議,訂閱者和worker的通信由于是在同一個進程中完成的,所以使用inproc協議。
下面我們看看如何突破瓶頸。由于訂閱者是單線程的,當它的CPU占用率達到100%時,它無法使用其他的核心。單線程程序總是會遇到瓶頸的,不管是2M、6M還是更多。我們需要將工作量分配到不同的線程中去,并發地執行。
很多高性能產品使用的方案是分片,就是將工作量拆分成獨立并行的流。如,一半的專題數據由一個流媒體傳輸,另一半由另一個流媒體傳輸。我們可以建立更多的流媒體,但如果CPU核心數不變,那就沒有必要了。
讓我們看看如何將工作量分片為兩個流:

要讓兩個流全速工作,需要這樣配置ZMQ:
* 使用兩個I/O線程,而不是一個;
* 使用兩個獨立的網絡接口;
* 每個I/O線程綁定至一個網絡接口;
* 兩個訂閱者線程,分別綁定至一個核心;
* 使用兩個SUB套接字;
* 剩余的核心供worker使用;
* worker線程同時綁定至兩個訂閱者線程的PUSH套接字。
創建的線程數量應和CPU核心數一致,如果我們建立的線程數量超過核心數,那其處理速度只會減少。另外,開放多個I/O線程也是沒有必要的。
### 共享鍵值緩存(克隆模式)
發布-訂閱模式和無線電廣播有些類似,在你收聽之前發送的消息你將無從得知,收到消息的多少又會取決于你的接收能力。讓人吃驚的是,對于那些追求完美的工程師來說,這種機器恰恰符合他們的需求,且廣為傳播,成為現實生活中分發消息的最佳機制。想想非死不可、推特、BBS新聞、體育新聞等應用就知道了。
但是,在很多情形下,可靠的發布-訂閱模式同樣是有價值的。正如我們討論請求-應答模式一樣,我們會根據“故障”來定義“可靠性”,下面幾項便是發布-訂閱模式中可能發生的故障:
* 訂閱者連接太慢,因此沒有收到發布者最初發送的消息;
* 訂閱者速度太慢,同樣會丟失消息;
* 訂閱者可能會斷開,其間的消息也會丟失。
還有一些情況我們碰到的比較少,但不是沒有:
* 訂閱者崩潰、重啟,從而丟失了所有已收到的消息;
* 訂閱者處理消息的速度過慢,導致消息在隊列中堆砌并溢出;
* 因網絡過載而丟失消息(特別是PGM協議下的連接);
* 網速過慢,消息在發布者處溢出,從而崩潰。
其實還會有其他出錯的情況,只是以上這些在現實應用中是比較典型的。
我們已經有方法解決上面的某些問題了,比如對于慢速訂閱者可以使用自殺的蝸牛模式。但是,對于其他的問題,我們最后能有一個可復用的框架來編寫可靠的發布-訂閱模式。
難點在于,我們并不知道目標應用程序會怎樣處理這些數據。它們會進行過濾、只處理一部分消息嗎?它們是否會將消息記錄起來供日后使用?它們是否會將消息轉發給其下的worker進行處理?需要考慮的情況實在太多了,每種情況都有其所謂的可靠性。
所以,我們將問題抽象出來,供多種應用程序使用。這種抽象應用我們稱之為共享的鍵值緩存,它的功能是通過唯一的鍵名存儲二進制數據塊。
不要將這個抽象應用和分布式哈希表混淆起來,它是用來解決節點在分布式網絡中相連接的問題的;也不要和分布式鍵值表混淆,它更像是一個NoSQL數據庫。我們要建立的應用是將內存中的狀態可靠地傳遞給一組客戶端,它要做到的是:
* 客戶端可以隨時加入網絡,并獲得服務端當前的狀態;
* 任何客戶端都可以改變鍵值緩存(插入、更新、刪除);
* 將這種變化以最短的延遲可靠地傳達給所有的客戶端;
* 能夠處理大量的客戶端,成百上千。
克隆模式的要點在于客戶端會反過來和服務端進行通信,這在簡單的發布-訂閱模式中并不常見。所以我這里使用“服務端”、“客戶端”而不是“發布者”、“訂閱者”這兩個詞。我們會使用發布-訂閱模式作為核心消息模式,不過還需要夾雜其他模式。
#### 分發鍵值更新事件
我們會分階段實施克隆模式。首先,我們看看如何從服務器發送鍵值更新事件給所有的客戶端。我們將第一章中使用的天氣服務模型進行改造,以鍵值對的方式發送信息,并讓客戶端使用哈希表來保存:

以下是服務端代碼:
**clonesrv1: Clone server, Model One in C**
```c
//
// 克隆模式服務端模型1
//
// 讓我們直接編譯,不生成類庫
#include "kvsimple.c"
int main (void)
{
// 準備上下文和PUB套接字
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
zclock_sleep (200);
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
srandom ((unsigned) time (NULL));
while (!zctx_interrupted) {
// 使用鍵值對分發消息
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
}
printf (" 已中止\n已發送 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
以下是客戶端代碼:
**clonecli1: Clone client, Model One in C**
```c
//
// 克隆模式客戶端模型1
//
// 讓我們直接編譯,不生成類庫
#include "kvsimple.c"
int main (void)
{
// 準備上下文和SUB套接字
zctx_t *ctx = zctx_new ();
void *updates = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (updates, "tcp://localhost:5556");
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (updates);
if (!kvmsg)
break; // 中斷
kvmsg_store (&kvmsg, kvmap);
sequence++;
}
printf (" 已中斷\n收到 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
幾點說明:
* 所有復雜的工作都在kvmsg類中完成了,這個類能夠處理鍵值對類型的消息對象,其實質上是一個ZMQ多幀消息,共有三幀:鍵(ZMQ字符串)、編號(64位,按字節順序排列)、二進制體(保存所有附加信息)。
* 服務端隨機生成消息,使用四位數作為鍵,這樣可以模擬大量而不是過量的哈希表(1萬個條目)。
* 服務端綁定套接字后會等待200毫秒,以避免訂閱者連接延遲而丟失數據的問題。我們會在后面的模型中解決這一點。
* 我們使用“發布者”和“訂閱者”來命名程序中使用的套接字,這樣可以避免和后續模型中的其他套接字發生混淆。
以下是kvmsg的代碼,已經經過了精簡:
**kvsimple: Key-value message class in C**
```c
/* =====================================================================
kvsimple - simple key-value message class for example applications
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvsimple.h"
#include "zlist.h"
// 鍵是一個短字符串
#define KVMSG_KEY_MAX 255
// 消息被格式化成三幀
// frame 0: 鍵(ZMQ字符串)
// frame 1: 編號(8個字節,按順序排列)
// frame 2: 內容(二進制數據塊)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3
// 類結構
struct _kvmsg {
// 消息中某幀是否存在
int present [KVMSG_FRAMES];
// 對應的ZMQ消息幀
zmq_msg_t frame [KVMSG_FRAMES];
// 將鍵轉換為C語言字符串
char key [KVMSG_KEY_MAX + 1];
};
// ---------------------------------------------------------------------
// 構造函數,設置編號
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;
self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
kvmsg_set_sequence (self, sequence);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
// 釋放消息中的幀,可供zhash_freefn()函數調用
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// 銷毀消息中的幀
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
// 釋放對象本身
free (self);
}
}
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 從套接字中讀取鍵值消息,返回kvmsg實例
kvmsg_t *
kvmsg_recv (void *socket)
{
assert (socket);
kvmsg_t *self = kvmsg_new (0);
// 讀取所有幀,出錯則銷毀對象
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
kvmsg_destroy (&self);
break;
}
// 驗證多幀消息
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsockopt_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
return self;
}
// ---------------------------------------------------------------------
// 向套接字發送鍵值對消息,不檢驗消息幀的內容
void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (©);
if (self->present [frame_nbr])
zmq_msg_copy (©, &self->frame [frame_nbr]);
zmq_sendmsg (socket, ©,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (©);
}
}
// ---------------------------------------------------------------------
// 從消息中獲取鍵值,不存在則返回NULL
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}
// ---------------------------------------------------------------------
// 返回消息的編號
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}
// ---------------------------------------------------------------------
// 返回消息內容,不存在則返回NULL
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}
// ---------------------------------------------------------------------
// 返回消息內容的大小
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}
// ---------------------------------------------------------------------
// 設置消息的鍵
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}
// ---------------------------------------------------------------------
// 設置消息的編號
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);
byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);
self->present [FRAME_SEQ] = 1;
}
// ---------------------------------------------------------------------
// 設置消息內容
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}
// ---------------------------------------------------------------------
// 使用printf()格式設置消息鍵
void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}
// ---------------------------------------------------------------------
// 使用springf()格式設置消息內容
void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}
// ---------------------------------------------------------------------
// 若kvmsg結構的鍵值均存在,則存入哈希表;
// 如果kvmsg結構已沒有引用,則自動銷毀和釋放。
void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 將消息內容打印至標準錯誤輸出,用以調試和跟蹤
void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}
// ---------------------------------------------------------------------
// 測試用例
int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;
printf (" * kvmsg: ");
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
zhash_t *kvmap = zhash_new ();
// 測試簡單消息的發送和接受
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);
kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);
// 關閉并銷毀所有對象
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
printf ("OK\n");
return 0;
}
```
我們會在下文編寫一個更為完整的kvmsg類,可以用到現實環境中。
客戶端和服務端都會維護一個哈希表,但這個模型需要所有的客戶端都比服務端啟動得早,而且不能崩潰,這顯然不能滿足可靠性的要求。
#### 創建快照
為了讓后續連接的(或從故障中恢復的)客戶端能夠獲取服務器上的狀態信息,需要讓它在連接時獲取一份快照。正如我們將“消息”的概念簡化為“已編號的鍵值對”,我們也可以將“狀態”簡化為“一個哈希表”。為獲取服務端狀態,客戶端會打開一個REQ套接字進行請求:

我們需要考慮時間的問題,因為生成快照是需要一定時間的,我們需要知道應從哪個更新事件開始更新快照,服務端是不知道何時有更新事件的。一種方法是先開始訂閱消息,收到第一個消息之后向服務端請求“將該條更新之前的所有內容發送給”。這樣一來,服務器需要為每一次更新保存一份快照,這顯然是不現實的。
所以,我們會在客戶端用以下方式進行同步:
* 客戶端開始訂閱服務器的更新事件,然后請求一份快照。這樣就能保證這份快照是在上一次更新事件之后產生的。
* 客戶端開始等待服務器的快照,并將更新事件保存在隊列中,做法很簡單,不要從套接字中讀取消息就可以了,ZMQ會自動將這些消息保存起來,這時不應設置閾值(HWM)。
* 當客戶端獲取到快照后,它將再次開始讀取更新事件,但是需要丟棄那些早于快照生成時間的事件。如快照生成時包含了200次更新,那客戶端會從第201次更新開始讀取。
* 隨后,客戶端就會用更新事件去更新自身的狀態了。
這是一個比較簡單的模型,因為它用到了ZMQ消息隊列的機制。服務端代碼如下:
**clonesrv2: Clone server, Model Two in C**
```c
//
// 克隆模式 - 服務端 - 模型2
//
// 讓我們直接編譯,不創建類庫
#include "kvsimple.c"
static int s_send_single (char *key, void *data, void *args);
static void state_manager (void *args, zctx_t *ctx, void *pipe);
int main (void)
{
// 準備套接字和上下文
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
int64_t sequence = 0;
srandom ((unsigned) time (NULL));
// 開啟狀態管理器,并等待同步信號
void *updates = zthread_fork (ctx, state_manager, NULL);
free (zstr_recv (updates));
while (!zctx_interrupted) {
// 分發鍵值消息
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_send (kvmsg, updates);
kvmsg_destroy (&kvmsg);
}
printf (" 已中斷\n已發送 %d 條消息\n", (int) sequence);
zctx_destroy (&ctx);
return 0;
}
// 快照請求方信息
typedef struct {
void *socket; // 用于發送快照的ROUTER套接字
zframe_t *identity; // 請求方的標識
} kvroute_t;
// 發送快照中單個鍵值對
// 使用kvmsg對象作為載體
static int
s_send_single (char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// 先發送接收方標識
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}
// 該線程維護服務端狀態,并處理快照請求。
//
static void
state_manager (void *args, zctx_t *ctx, void *pipe)
{
zhash_t *kvmap = zhash_new ();
zstr_send (pipe, "READY");
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
zmq_pollitem_t items [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
int64_t sequence = 0; // 當前快照版本
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, -1);
if (rc == -1 && errno == ETERM)
break; // 上下文異常
// 等待主線程的更新事件
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (pipe);
if (!kvmsg)
break; // 中斷
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
// 執行快照請求
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // 中斷
// 請求內容在第二幀中
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: 錯誤的請求,程序中止\n");
break;
}
// 發送快照給客戶端
kvroute_t routing = { snapshot, identity };
// 逐項發送
zhash_foreach (kvmap, s_send_single, &routing);
// 發送結束標識,內含快照版本號
printf ("正在發送快照,版本號 %d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
zhash_destroy (&kvmap);
}
```
以下是客戶端代碼:
**clonecli2: Clone client, Model Two in C**
```c
//
// 克隆模式 - 客戶端 - 模型2
//
// 讓我們直接編譯,不生成類庫
#include "kvsimple.c"
int main (void)
{
// 準備上下文和SUB套接字
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5557");
zhash_t *kvmap = zhash_new ();
// 獲取快照
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // 中斷
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("已獲取快照,版本號=%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // 完成
}
kvmsg_store (&kvmsg, kvmap);
}
// 應用隊列中的更新事件,丟棄過時事件
while (!zctx_interrupted) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // 中斷
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
else
kvmsg_destroy (&kvmsg);
}
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
幾點說明:
* 客戶端使用兩個線程,一個用來生成隨機的更新事件,另一個用來管理狀態。兩者之間使用PAIR套接字通信。可能你會考慮使用SUB套接字,但是“慢連接”的問題會影響到程序運行。PAIR套接字會讓兩個線程嚴格同步的。
* 我們在updates套接字上設置了閾值(HWM),避免更新服務內存溢出。在inproc協議的連接中,閾值是兩端套接字閾值的加和,所以要分別設置。
* 客戶端比較簡單,用C語言編寫,大約60行代碼。大多數工作都在kvmsg類中完成了,不過總的來說,克隆模式實現起來還是比較簡單的。
* 我們沒有用特別的方式來序列化狀態內容。鍵值對用kvmsg對象表示,保存在一個哈希表中。在不同的時間請求狀態時會得到不同的快照。
* 我們假設客戶端只和一個服務進行通信,而且服務必須是正常運行的。我們暫不考慮如何從服務崩潰的情形中恢復過來。
現在,這兩段程序都還沒有真正地工作起來,但已經能夠正確地同步狀態了。這是一個多種消息模式的混合體:進程內的PAIR、發布-訂閱、ROUTER-DEALER等。
#### 重發鍵值更新事件
第二個模型中,鍵值更新事件都來自于服務器,構成了一個中心化的模型。但是我們需要的是一個能夠在客戶端進行更新的緩存,并能同步到其他客戶端中。這時,服務端只是一個無狀態的中間件,帶來的好處有:
* 我們不用太過關心服務端的可靠性,因為即使它崩潰了,我們仍能從客戶端中獲取完整的數據。
* 我們可以使用鍵值緩存在動態節點之間分享數據。
客戶端的鍵值更新事件會通過PUSH-PULL套接字傳達給服務端:

我們為什么不讓客戶端直接將更新信息發送給其他客戶端呢?雖然這樣做可以減少延遲,但是就無法為更新事件添加自增的唯一編號了。很多應用程序都需要更新事件以某種方式排序,只有將消息發給服務端,由服務端分發更新消息,才能保證更新事件的順序。
有了唯一的編號后,客戶端還能檢測到更多的故障:網絡堵塞或隊列溢出。如果客戶端發現消息輸入流有一段空白,它能采取措施。可能你會覺得此時讓客戶端通知服務端,讓它重新發送丟失的信息,可以解決問題。但事實上沒有必要這么做。消息流的空擋表示網絡狀況不好,如果再進行這樣的請求,只會讓事情變得更糟。所以一般的做法是由客戶端發出警告,并停止運行,等到有專人來維護后再繼續工作。
我們開始創建在客戶端進行狀態更新的模型。以下是客戶端代碼:
**clonesrv3: Clone server, Model Three in C**
```c
//
// 克隆模式 服務端 模型3
//
// 直接編譯,不創建類庫
#include "kvsimple.c"
static int s_send_single (char *key, void *data, void *args);
// 快照請求方信息
typedef struct {
void *socket; // ROUTER套接字
zframe_t *identity; // 請求方標識
} kvroute_t;
int main (void)
{
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();
zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
// 執行來自客戶端的更新事件
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // 中斷
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: 發布更新事件 %5d\n", (int) sequence);
}
// 響應快照請求
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // 中斷
// 請求內容在消息的第二幀中
char *request = zstr_recv (snapshot);
if (streq (request, "ICANHAZ?"))
free (request);
else {
printf ("E: 錯誤的請求,程序中止\n");
break;
}
// 發送快照
kvroute_t routing = { snapshot, identity };
// 逐條發送
zhash_foreach (kvmap, s_send_single, &routing);
// 發送結束標識和編號
printf ("I: 正在發送快照,版本號:%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
}
}
printf (" 已中斷\n已處理 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
// 發送一條鍵值對狀態給套接字,使用kvmsg對象保存鍵值對
static int
s_send_single (char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
// Send identity of recipient first
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *) data;
kvmsg_send (kvmsg, kvroute->socket);
return 0;
}
```
以下是客戶端代碼:
**clonecli3: Clone client, Model Three in C**
```c
//
// 克隆模式 - 客戶端 - 模型3
//
// 直接編譯,不創建類庫
#include "kvsimple.c"
int main (void)
{
// 準備上下文和SUB套接字
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5557");
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));
// 獲取狀態快照
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // 中斷
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: 已收到快照,版本號:%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // 完成
}
kvmsg_store (&kvmsg, kvmap);
}
int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 上下文被關閉
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // 中斷
// 丟棄過時消息,包括心跳
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: 收到更新事件:%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// 創建一個隨機的更新事件
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" 已準備\n收到 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
幾點說明:
* 服務端整合為一個線程,負責收集來自客戶端的更新事件并轉發給其他客戶端。它使用PULL套接字獲取更新事件,ROUTER套接字處理快照請求,以及PUB套接字發布更新事件。
* 客戶端會每隔1秒左右發送隨機的更新事件給服務端,現實中這一動作由應用程序觸發。
#### 子樹克隆
現實中的鍵值緩存會越變越多,而客戶端可能只會需要部分緩存。我們可以使用子樹的方式來實現:客戶端在發送快照請求時告訴服務端它需要的子樹,在訂閱更新事件時也指明子樹。
關于子樹的語法有很多,一種是“分層路徑”結構,另一種是“主題樹”:
* 分層路徑:/some/list/of/paths
* 主題樹:some.list.of.topics
這里我們會使用分層路徑結構,以此擴展服務端和客戶端,進行子樹操作。維護多個子樹其實并不太困難,因此我們不在這里演示。
下面是服務端代碼,由模型3衍化而來:
**clonesrv4: Clone server, Model Four in C**
```c
//
// 克隆模式 服務端 模型4
//
// 直接編譯,不創建類庫
#include "kvsimple.c"
static int s_send_single (char *key, void *data, void *args);
// 快照請求方信息
typedef struct {
void *socket; // ROUTER套接字
zframe_t *identity; // 請求方標識
char *subtree; // 指定的子樹
} kvroute_t;
int main (void)
{
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (snapshot, "tcp://*:5556");
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5557");
void *collector = zsocket_new (ctx, ZMQ_PULL);
zsocket_bind (collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new ();
zmq_pollitem_t items [] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
// 執行來自客戶端的更新事件
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (!kvmsg)
break; // Interrupted
kvmsg_set_sequence (kvmsg, ++sequence);
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
printf ("I: 發布更新事件 %5d\n", (int) sequence);
}
// 響應快照請求
if (items [1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv (snapshot);
if (!identity)
break; // Interrupted
// 請求內容在消息的第二幀中
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (snapshot);
}
else {
printf ("E: 錯誤的請求,程序中止\n");
break;
}
// 發送快照
kvroute_t routing = { snapshot, identity, subtree };
// 逐條發送
zhash_foreach (kvmap, s_send_single, &routing);
// 發送結束標識和編號
printf ("I: 正在發送快照,版本號:%d\n", (int) sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
}
}
printf (" 已中斷\n已處理 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
// 發送一條鍵值對狀態給套接字,使用kvmsg對象保存鍵值對
static int
s_send_single (char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
// 先發送接收方的標識
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}
```
下面是客戶端代碼:
**clonecli4: Clone client, Model Four in C**
```c
//
// 克隆模式 - 客戶端 - 模型4
//
// 直接編譯,不創建類庫
#include "kvsimple.c"
#define SUBTREE "/client/"
int main (void)
{
// 準備上下文和SUB套接字
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5557");
zsockopt_set_subscribe (subscriber, SUBTREE);
void *publisher = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new ();
srandom ((unsigned) time (NULL));
// 獲取狀態快照
int64_t sequence = 0;
zstr_sendm (snapshot, "ICANHAZ?");
zstr_send (snapshot, SUBTREE);
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Interrupted
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("I: 已收到快照,版本號:%d\n", (int) sequence);
kvmsg_destroy (&kvmsg);
break; // Done
}
kvmsg_store (&kvmsg, kvmap);
}
int64_t alarm = zclock_time () + 1000;
while (!zctx_interrupted) {
zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
int tickless = (int) ((alarm - zclock_time ()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 上下文被關閉
if (items [0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // 中斷
// 丟棄過時消息,包括心跳
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
printf ("I: 收到更新事件:%d\n", (int) sequence);
}
else
kvmsg_destroy (&kvmsg);
}
// 創建一個隨機的更新事件
if (zclock_time () >= alarm) {
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_destroy (&kvmsg);
alarm = zclock_time () + 1000;
}
}
printf (" 已準備\n收到 %d 條消息\n", (int) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
#### 瞬間值
瞬間值指的是那些會立刻過期的值。如果你用克隆模式搭建一個類似DNS的服務時,就可以用瞬間值來模擬動態DNS解析了。當節點連接網絡,對外發布它的地址,并不斷地更新地址。如果節點斷開連接,則它的地址也會失效。
瞬間值可以和會話(session)聯系起來,當會話結束時,瞬間值也就失效了。克隆模式中,會話是由客戶端定義的,并會在客戶端斷開連接時消亡。
更簡單的方法是為每一個瞬間值設定一個過期時間,客戶端會不斷延長這個時間,當斷開連接時這個時間將得不到更新,服務器就會自動將其刪除。
我們會用這種簡單的方法來實現瞬間值,因為太過復雜的方法可能不值當,它們的差別僅在性能上體現。如果客戶端有很多瞬間值,那為每個值設定過期時間是恰當的;如果瞬間值到達一定的量,那最好還是將其和會話相關聯,統一進行過期處理。
首先,我們需要設法在鍵值對消息中加入過期時間。我們可以增加一個消息幀,但這樣一來每當我們需要增加消息內容時就需要修改kvmsg類庫了,這并不合適。所以,我們一次性增加一個“屬性”消息幀,用于添加不同的消息屬性。
其次,我們需要設法刪除這條數據。目前為止服務端和客戶端會盲目地增改哈希表中的數據,我們可以這樣定義:當消息的值是空的,則表示刪除這個鍵的數據。
下面是一個更為完整的kvmsg類代碼,它實現了“屬性”幀,以及一個UUID幀,我們后面會用到。該類還會負責處理值為空的消息,達到刪除的目的:
**kvmsg: Key-value message class - full in C**
```c
/* =====================================================================
kvmsg - key-value message class for example applications
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvmsg.h"
#include <uuid/uuid.h>
#include "zlist.h"
// 鍵是短字符串
#define KVMSG_KEY_MAX 255
// 消息包含五幀
// frame 0: 鍵(ZMQ字符串)
// frame 1: 編號(8個字節,按順序排列)
// frame 2: UUID(二進制塊,16個字節)
// frame 3: 屬性(ZMQ字符串)
// frame 4: 值(二進制塊)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_UUID 2
#define FRAME_PROPS 3
#define FRAME_BODY 4
#define KVMSG_FRAMES 5
// 類結構
struct _kvmsg {
// 幀是否存在
int present [KVMSG_FRAMES];
// 對應消息幀
zmq_msg_t frame [KVMSG_FRAMES];
// 鍵,C語言字符串格式
char key [KVMSG_KEY_MAX + 1];
// 屬性列表,key=value形式
zlist_t *props;
size_t props_size;
};
// 將屬性列表序列化為字符串
static void
s_encode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
if (self->present [FRAME_PROPS])
zmq_msg_close (msg);
zmq_msg_init_size (msg, self->props_size);
char *prop = zlist_first (self->props);
char *dest = (char *) zmq_msg_data (msg);
while (prop) {
strcpy (dest, prop);
dest += strlen (prop);
*dest++ = '\n';
prop = zlist_next (self->props);
}
self->present [FRAME_PROPS] = 1;
}
// 從字符串中解析屬性列表
static void
s_decode_props (kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame [FRAME_PROPS];
self->props_size = 0;
while (zlist_size (self->props))
free (zlist_pop (self->props));
size_t remainder = zmq_msg_size (msg);
char *prop = (char *) zmq_msg_data (msg);
char *eoln = memchr (prop, '\n', remainder);
while (eoln) {
*eoln = 0;
zlist_append (self->props, strdup (prop));
self->props_size += strlen (prop) + 1;
remainder -= strlen (prop) + 1;
prop = eoln + 1;
eoln = memchr (prop, '\n', remainder);
}
}
// ---------------------------------------------------------------------
// 構造函數,指定消息編號
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;
self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
self->props = zlist_new ();
kvmsg_set_sequence (self, sequence);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
// 釋放內存函數,供zhash_free_fn()調用
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// 釋放所有消息幀
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
// 釋放屬性列表
while (zlist_size (self->props))
free (zlist_pop (self->props));
zlist_destroy (&self->props);
// 釋放對象本身
free (self);
}
}
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 復制kvmsg對象
kvmsg_t *
kvmsg_dup (kvmsg_t *self)
{
kvmsg_t *kvmsg = kvmsg_new (0);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr]) {
zmq_msg_t *src = &self->frame [frame_nbr];
zmq_msg_t *dst = &kvmsg->frame [frame_nbr];
zmq_msg_init_size (dst, zmq_msg_size (src));
memcpy (zmq_msg_data (dst),
zmq_msg_data (src), zmq_msg_size (src));
kvmsg->present [frame_nbr] = 1;
}
}
kvmsg->props = zlist_copy (self->props);
return kvmsg;
}
// ---------------------------------------------------------------------
// 從套接字總讀取鍵值對,返回kvmsg實例
kvmsg_t *
kvmsg_recv (void *socket)
{
assert (socket);
kvmsg_t *self = kvmsg_new (0);
// 讀取所有幀,若有異常則直接返回空
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
kvmsg_destroy (&self);
break;
}
// 驗證多幀消息
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsockopt_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
if (self)
s_decode_props (self);
return self;
}
// ---------------------------------------------------------------------
// 向套接字發送鍵值對消息,空消息也發送
void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);
s_encode_props (self);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (©);
if (self->present [frame_nbr])
zmq_msg_copy (©, &self->frame [frame_nbr]);
zmq_sendmsg (socket, ©,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (©);
}
}
// ---------------------------------------------------------------------
// 返回消息的鍵
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}
// ---------------------------------------------------------------------
// 返回消息的編號
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}
// ---------------------------------------------------------------------
// 返回消息的UUID
byte *
kvmsg_uuid (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_UUID]
&& zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t))
return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]);
else
return NULL;
}
// ---------------------------------------------------------------------
// 返回消息的內容
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}
// ---------------------------------------------------------------------
// 返回消息內容的長度
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}
// ---------------------------------------------------------------------
// 設置消息的鍵
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}
// ---------------------------------------------------------------------
// 設置消息的編號
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);
byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);
self->present [FRAME_SEQ] = 1;
}
// ---------------------------------------------------------------------
// 生成并設置消息的UUID
void
kvmsg_set_uuid (kvmsg_t *self)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_UUID];
uuid_t uuid;
uuid_generate (uuid);
if (self->present [FRAME_UUID])
zmq_msg_close (msg);
zmq_msg_init_size (msg, sizeof (uuid));
memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
self->present [FRAME_UUID] = 1;
}
// ---------------------------------------------------------------------
// 設置消息的內容
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}
// ---------------------------------------------------------------------
// 使用printf()格式設置消息的鍵
void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}
// ---------------------------------------------------------------------
// 使用printf()格式設置消息內容
void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}
// ---------------------------------------------------------------------
// 獲取消息屬性,無則返回空字符串
char *
kvmsg_get_prop (kvmsg_t *self, char *name)
{
assert (strchr (name, '=') == NULL);
char *prop = zlist_first (self->props);
size_t namelen = strlen (name);
while (prop) {
if (strlen (prop) > namelen
&& memcmp (prop, name, namelen) == 0
&& prop [namelen] == '=')
return prop + namelen + 1;
prop = zlist_next (self->props);
}
return "";
}
// ---------------------------------------------------------------------
// 設置消息屬性
// 屬性名稱不能包含=號,值的最大長度是255
void
kvmsg_set_prop (kvmsg_t *self, char *name, char *format, ...)
{
assert (strchr (name, '=') == NULL);
char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
// 分配空間
char *prop = malloc (strlen (name) + strlen (value) + 2);
// 刪除已存在的屬性
sprintf (prop, "%s=", name);
char *existing = zlist_first (self->props);
while (existing) {
if (memcmp (prop, existing, strlen (prop)) == 0) {
self->props_size -= strlen (existing) + 1;
zlist_remove (self->props, existing);
free (existing);
break;
}
existing = zlist_next (self->props);
}
// 添加新屬性
strcat (prop, value);
zlist_append (self->props, prop);
self->props_size += strlen (prop) + 1;
}
// ---------------------------------------------------------------------
// 在哈希表中保存kvmsg對象
// 當kvmsg對象不再被使用時進行釋放操作;
// 若傳入的值為空,則刪除該對象。
void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (kvmsg_size (self)) {
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
}
else
zhash_delete (hash, kvmsg_key (self));
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 將消息內容輸出到標準錯誤輸出
void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
if (zlist_size (self->props)) {
fprintf (stderr, "[");
char *prop = zlist_first (self->props);
while (prop) {
fprintf (stderr, "%s;", prop);
prop = zlist_next (self->props);
}
fprintf (stderr, "]");
}
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}
// ---------------------------------------------------------------------
// 測試用例
int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;
printf (" * kvmsg: ");
// 準備上下文和套接字
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
zhash_t *kvmap = zhash_new ();
// 測試簡單消息的收發
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);
kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);
// 測試帶有屬性的消息的收發
kvmsg = kvmsg_new (2);
kvmsg_set_prop (kvmsg, "prop1", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value1");
kvmsg_set_prop (kvmsg, "prop2", "value2");
kvmsg_set_key (kvmsg, "key");
kvmsg_set_uuid (kvmsg);
kvmsg_set_body (kvmsg, (byte *) "body", 4);
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_destroy (&kvmsg);
kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2"));
kvmsg_destroy (&kvmsg);
// 關閉并銷毀所有對象
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
printf ("OK\n");
return 0;
}
```
客戶端模型5和模型4沒有太大區別,只是kvmsg類庫變了。在更新消息的時候還需要添加一個過期時間的屬性:
```c
kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));
```
服務端模型5有較大的變化,我們會用反應堆來代替輪詢,這樣就能混合處理定時事件和套接字事件了,只是在C語言中是比較麻煩的。下面是代碼:
**clonesrv5: Clone server, Model Five in C**
```c
//
// 克隆模式 - 服務端 - 模型5
//
// 直接編譯,不建類庫
#include "kvmsg.c"
// 反應堆處理器
static int s_snapshots (zloop_t *loop, void *socket, void *args);
static int s_collector (zloop_t *loop, void *socket, void *args);
static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
// 服務器屬性
typedef struct {
zctx_t *ctx; // 上下文
zhash_t *kvmap; // 鍵值對存儲
zloop_t *loop; // zloop反應堆
int port; // 主端口
int64_t sequence; // 更新事件編號
void *snapshot; // 處理快照請求
void *publisher; // 發布更新事件
void *collector; // 從客戶端收集接收更新事件
} clonesrv_t;
int main (void)
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
self->port = 5556;
self->ctx = zctx_new ();
self->kvmap = zhash_new ();
self->loop = zloop_new ();
zloop_set_verbose (self->loop, FALSE);
// 打開克隆模式服務端套接字
self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
self->collector = zsocket_new (self->ctx, ZMQ_PULL);
zsocket_bind (self->snapshot, "tcp://*:%d", self->port);
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
// 注冊反應堆處理程序
zloop_reader (self->loop, self->snapshot, s_snapshots, self);
zloop_reader (self->loop, self->collector, s_collector, self);
zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);
// 運行反應堆,直至中斷
zloop_start (self->loop);
zloop_destroy (&self->loop);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);
return 0;
}
// ---------------------------------------------------------------------
// 發送快照內容
static int s_send_single (char *key, void *data, void *args);
// 請求方信息
typedef struct {
void *socket; // ROUTER套接字
zframe_t *identity; // 請求方標識
char *subtree; // 子樹信息
} kvroute_t;
static int
s_snapshots (zloop_t *loop, void *snapshot, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv (snapshot);
if (identity) {
// 請求位于消息第二幀
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (snapshot);
}
else
printf ("E: 錯誤的請求,程序中止\n");
if (subtree) {
// 發送狀態快照
kvroute_t routing = { snapshot, identity, subtree };
zhash_foreach (self->kvmap, s_send_single, &routing);
// 發送結束符和版本號
zclock_log ("I: 正在發送快照,版本號:%d", (int) self->sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
}
}
return 0;
}
// 每次發送一個快照鍵值對
static int
s_send_single (char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
// 先發送接收方標識
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}
// ---------------------------------------------------------------------
// 收集更新事件
static int
s_collector (zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv (collector);
if (kvmsg) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在發布更新事件 %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// 刪除過期的瞬間值
static int s_flush_single (char *key, void *data, void *args);
static int
s_flush_ttl (zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
}
// 刪除過期的鍵值對,并廣播該事件
static int
s_flush_single (char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 發布刪除事件 %d", (int) self->sequence);
}
return 0;
}
```
#### 克隆服務器的可靠性
克隆模型1至5相對比較簡單,下面我們會探討一個非常復雜的模型。可以發現,為了構建可靠的消息隊列,我們需要花費非常多的精力。所以我們經常會問:有必要這么做嗎?如果說你能夠接受可靠性不夠高的、或者說已經足夠好的架構,那恭喜你,你在成本和收益之間找到了平衡。雖然我們會偶爾丟失一些消息,但從經濟的角度來說還是合理的。不管怎樣,下面我們就來介紹這個復雜的模型。
在模型3中,你會關閉和重啟服務,這會導致數據的丟失。任何后續加入的客戶端只能得到重啟之后的那些數據,而非所有的。下面就讓我們想辦法讓克隆模式能夠承擔服務器重啟的故障。
以下列舉我們需要處理的問題:
* 克隆服務器進程崩潰并自動或手工重啟。進程丟失了所有數據,所以必須從別處進行恢復。
* 克隆服務器硬件故障,長時間不能恢復。客戶端需要切換至另一個可用的服務端。
* 克隆服務器從網絡上斷開,如交換機發生故障等。它會在某個時點重連,但期間的數據就需要替代的服務器負責處理。
第一步我們需要增加一個服務器。我們可以使用第四章中提到的雙子星模式,它是一個反應堆,而我們的程序經過整理后也是一個反應堆,因此可以互相協作。
我們需要保證更新事件在主服務器崩潰時仍能保留,最簡單的機制就是同時發送給兩臺服務器。
備機就可以當做一臺客戶端來運行,像其他客戶端一樣從主機獲取更新事件。同時它又能從客戶端獲取更新事件——雖然不應該以此更新數據,但可以先暫存起來。
所以,相較于模型5,模型6中引入了以下特性:
* 客戶端發送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字會在沒有接收方時阻塞,且會進行負載均衡——我們需要兩臺服務器都接收到消息。我們會在服務器端綁定SUB套接字,在客戶端連接PUB套接字。
* 我們在服務器發送給客戶端的更新事件中加入心跳,這樣客戶端可以知道主機是否已死,然后切換至備機。
* 我們使用雙子星模式的bstar反應堆類來創建主機和備機。雙子星模式中需要有一個“投票”套接字,來協助判定對方節點是否已死。這里我們使用快照請求來作為“投票”。
* 我們將為所有的更新事件添加UUID屬性,它由客戶端生成,服務端會將其發布給所有客戶端。
* 備機將維護一個“待處理列表”,保存來自客戶端、尚未由服務端發布的更新事件;或者反過來,來自服務端、尚未從客戶端收到的更新事件。這個列表從舊到新排列,這樣就能方便地從頂部刪除消息。
我們可以為客戶端設計一個有限狀態機,它有三種狀態:
* 客戶端打開并連接了套接字,然后向服務端發送快照請求。為了避免消息風暴,它只會請求兩次。
* 客戶端等待快照應答,如果獲得了則保存它;如果沒有獲得,則向第二個服務器發送請求。
* 客戶端收到快照,便開始等待更新事件。如果在一定時間內沒有收到服務端響應,則會連接第二個服務端。
客戶端會一直循環下去,可能在程序剛啟動時,部分客戶端會試圖連接主機,部分連接備機,相信雙子星模式會很好地處理這一情況的。
我們可以將客戶端狀態圖繪制出來:

故障恢復的步驟如下:
* 客戶端檢測到主機不再發送心跳,因此轉而連接備機,并請求一份新的快照;
* 備機開始接收快照請求,并檢測到主機死亡,于是開始作為主機運行;
* 備機將待處理列表中的更新事件寫入自身狀態中,然后開始處理快照請求。
當主機恢復連接時:
* 啟動為slave狀態,并作為克隆模式客戶端連接備機;
* 同時,使用SUB套接字從客戶端接收更新事件。
我們做兩點假設:
* 至少有一臺主機會繼續運行。如果兩臺主機都崩潰了,那我們將丟失所有的服務端數據,無法恢復。
* 不同的客戶端不會同時更新同一個鍵值對。客戶端的更新事件會先后到達兩個服務器,因此更新的順序可能會不一致。單個客戶端的更新事件到達兩臺服務器的順序是相同的,所以不用擔心。
下面是整體架構圖:

開始編程之前,我們需要將客戶端重構成一個可復用的類。在ZMQ中寫異步類有時是為了練習如何寫出優雅的代碼,但這里我們確實是希望克隆模式可以成為一種易于使用的程序。上述架構的伸縮性來源于客戶端的正確行為,因此有必要將其封裝成一份API。要在客戶端中進行故障恢復還是比較復雜的,試想一下自由者模式和克隆模式結合起來會是什么樣的吧。
按照我的習慣,我會先寫出一份API的列表,然后加以實現。讓我們假想一個名為clone的API,在其基礎之上編寫克隆模式客戶端API。將代碼封裝為API顯然會提升代碼的穩定性,就以模型5為例,客戶端需要打開三個套接字,端點名稱直接寫在了代碼里。我們可以創建這樣一組API:
```c
// 為每個套接字指定端點
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
clone_updates (clone, "tcp://localhost:5558");
// 由于有兩個服務端,因此再執行一次
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
clone_updates (clone, "tcp://localhost:5568");
```
但這種寫法還是比較啰嗦的,因為沒有必要將API內部的一些設計暴露給編程人員。現在我們會使用三個套接字,而將來可能就會使用兩個,或者四個。我們不可能讓所有的應用程序都相應地修改吧?讓我們把這些信息包裝到API中:
```c
// 指定主備服務器端點
clone_connect (clone, "tcp://localhost:5551");
clone_connect (clone, "tcp://localhost:5561");
```
這樣一來代碼就變得非常簡潔,不過也會對現有代碼的內部就夠造成影響。我們需要從一個端點中推算出三個端點。一種方法是假設客戶端和服務端使用三個連續的端點通信,并將這個規則寫入協議;另一個方法是向服務器索取缺少的端點信息。我們使用第一種較為簡單的方法:
* 服務器狀態ROUTER在端點P;
* 服務器更新事件PUB在端點P + 1;
* 服務器更新事件SUB在端點P + 2。
clone類和第四章的flcliapi類很類似,由兩部分組成:
* 一個在后臺運行的異步克隆模式代理。該代理處理所有的I/O操作,實時地和服務器進行通信;
* 一個在前臺應用程序中同步運行的clone類。當你創建了一個clone對象后,它會自動創建后臺的clone線程;當你銷毀clone對象,該后臺線程也會被銷毀。
前臺的clone類會使用inproc管道和后臺的代理進行通信。C語言中,czmq線程會自動為我們創建這個管道。這也是ZMQ多線程編程的常規方式。
如果沒有ZMQ,這種異步的設計將很難處理高壓工作,而ZMQ會讓其變得簡單。編寫出來額代碼會相對比較復雜。我們可以用反應堆的模式來編寫,但這會進一步增加復雜度,且影響應用程序的使用。因此,我們的設計的API將更像是一個能夠和服務器進行通信的鍵值表:
```c
clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);
```
下面就是克隆模式客戶端模型6的代碼,因為調用了API,所以非常簡短:
**clonecli6: Clone client, Model Six in C**
```
//
// 克隆模式 - 客戶端 - 模型6
//
// 直接編譯,不建類庫
#include "clone.c"
#define SUBTREE "/client/"
int main (void)
{
// 創建分布式哈希表
clone_t *clone = clone_new ();
// 配置
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");
// 插入隨機鍵值
while (!zctx_interrupted) {
// 生成隨機值
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
}
clone_destroy (&clone);
return 0;
}
```
以下是clone類的實現:
**clone: Clone class in C**
```c
/* =====================================================================
clone - client-side Clone Pattern class
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "clone.h"
// 請求超時時間
#define GLOBAL_TIMEOUT 4000 // msecs
// 判定服務器死亡的時間
#define SERVER_TTL 5000 // msecs
// 服務器數量
#define SERVER_MAX 2
// =====================================================================
// 同步部分,在應用程序線程中工作
// ---------------------------------------------------------------------
// 類結構
struct _clone_t {
zctx_t *ctx; // 上下文
void *pipe; // 和后臺代理間的通信套接字
};
// 該線程用于處理真正的clone類
static void clone_agent (void *args, zctx_t *ctx, void *pipe);
// ---------------------------------------------------------------------
// 構造函數
clone_t *
clone_new (void)
{
clone_t
*self;
self = (clone_t *) zmalloc (sizeof (clone_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
return self;
}
// ---------------------------------------------------------------------
// 析構函數
void
clone_destroy (clone_t **self_p)
{
assert (self_p);
if (*self_p) {
clone_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 在鏈接之前指定快照和更新事件的子樹
// 發送給后臺代理的消息內容為[SUBTREE][subtree]
void clone_subtree (clone_t *self, char *subtree)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SUBTREE");
zmsg_addstr (msg, subtree);
zmsg_send (&msg, self->pipe);
}
// ---------------------------------------------------------------------
// 連接至新的服務器端點
// 消息內容:[CONNECT][endpoint][service]
void
clone_connect (clone_t *self, char *address, char *service)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, address);
zmsg_addstr (msg, service);
zmsg_send (&msg, self->pipe);
}
// ---------------------------------------------------------------------
// 設置新值
// 消息內容:[SET][key][value][ttl]
void
clone_set (clone_t *self, char *key, char *value, int ttl)
{
char ttlstr [10];
sprintf (ttlstr, "%d", ttl);
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SET");
zmsg_addstr (msg, key);
zmsg_addstr (msg, value);
zmsg_addstr (msg, ttlstr);
zmsg_send (&msg, self->pipe);
}
// ---------------------------------------------------------------------
// 取值
// 消息內容:[GET][key]
// 如果沒有clone可用,會返回NULL
char *
clone_get (clone_t *self, char *key)
{
assert (self);
assert (key);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "GET");
zmsg_addstr (msg, key);
zmsg_send (&msg, self->pipe);
zmsg_t *reply = zmsg_recv (self->pipe);
if (reply) {
char *value = zmsg_popstr (reply);
zmsg_destroy (&reply);
return value;
}
return NULL;
}
// =====================================================================
// 異步部分,在后臺運行
// ---------------------------------------------------------------------
// 單個服務端信息
typedef struct {
char *address; // 服務端地址
int port; // 端口
void *snapshot; // 快照套接字
void *subscriber; // 接收更新事件的套接字
uint64_t expiry; // 服務器過期時間
uint requests; // 收到的快照請求數
} server_t;
static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
zclock_log ("I: adding server %s:%d...", address, port);
self->address = strdup (address);
self->port = port;
self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
zsockopt_set_subscribe (self->subscriber, subtree);
return self;
}
static void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->address);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// 后臺代理類
// 狀態
#define STATE_INITIAL 0 // 連接之前
#define STATE_SYNCING 1 // 正在同步
#define STATE_ACTIVE 2 // 正在更新
typedef struct {
zctx_t *ctx; // 上下文
void *pipe; // 與主線程通信的套接字
zhash_t *kvmap; // 鍵值表
char *subtree; // 子樹
server_t *server [SERVER_MAX];
uint nbr_servers; // 范圍:0 - SERVER_MAX
uint state; // 當前狀態
uint cur_server; // 當前master,0/1
int64_t sequence; // 鍵值對編號
void *publisher; // 發布更新事件的套接字
} agent_t;
static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new ();
self->subtree = strdup ("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
return self;
}
static void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
server_destroy (&self->server [server_nbr]);
zhash_destroy (&self->kvmap);
free (self->subtree);
free (self);
*self_p = NULL;
}
}
// 若線程被中斷則返回-1
static int
agent_control_message (agent_t *self)
{
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (command == NULL)
return -1;
if (streq (command, "SUBTREE")) {
free (self->subtree);
self->subtree = zmsg_popstr (msg);
}
else
if (streq (command, "CONNECT")) {
char *address = zmsg_popstr (msg);
char *service = zmsg_popstr (msg);
if (self->nbr_servers < SERVER_MAX) {
self->server [self->nbr_servers++] = server_new (
self->ctx, address, atoi (service), self->subtree);
// 廣播更新事件
zsocket_connect (self->publisher, "%s:%d",
address, atoi (service) + 2);
}
else
zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
free (address);
free (service);
}
else
if (streq (command, "SET")) {
char *key = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
char *ttl = zmsg_popstr (msg);
zhash_update (self->kvmap, key, (byte *) value);
zhash_freefn (self->kvmap, key, free);
// 向服務端發送鍵值對
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_set_key (kvmsg, key);
kvmsg_set_uuid (kvmsg);
kvmsg_fmt_body (kvmsg, "%s", value);
kvmsg_set_prop (kvmsg, "ttl", ttl);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
puts (key);
free (ttl);
free (key); // 鍵值對實際由哈希表對象控制
}
else
if (streq (command, "GET")) {
char *key = zmsg_popstr (msg);
char *value = zhash_lookup (self->kvmap, key);
if (value)
zstr_send (self->pipe, value);
else
zstr_send (self->pipe, "");
free (key);
free (value);
}
free (command);
zmsg_destroy (&msg);
return 0;
}
// ---------------------------------------------------------------------
// 異步的后臺代理會維護一個服務端池,并處理來自應用程序的請求或應答。
static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);
while (TRUE) {
zmq_pollitem_t poll_set [] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ 0, 0, ZMQ_POLLIN, 0 }
};
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server [self->cur_server];
switch (self->state) {
case STATE_INITIAL:
// 該狀態下,如果有可用服務,會發送快照請求
if (self->nbr_servers > 0) {
zclock_log ("I: 正在等待服務器 %s:%d...",
server->address, server->port);
if (server->requests < 2) {
zstr_sendm (server->snapshot, "ICANHAZ?");
zstr_send (server->snapshot, self->subtree);
server->requests++;
}
server->expiry = zclock_time () + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set [1].socket = server->snapshot;
}
else
poll_size = 1;
break;
case STATE_SYNCING:
// 該狀態下我們從服務器端接收快照內容,若失敗則嘗試其他服務器
poll_set [1].socket = server->snapshot;
break;
case STATE_ACTIVE:
// 該狀態下我們從服務器獲取更新事件,失敗則嘗試其他服務器
poll_set [1].socket = server->subscriber;
break;
}
if (server) {
poll_timer = (server->expiry - zclock_time ())
* ZMQ_POLL_MSEC;
if (poll_timer < 0)
poll_timer = 0;
}
// ------------------------------------------------------------
// poll循環
int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // 上下文已被關閉
if (poll_set [0].revents & ZMQ_POLLIN) {
if (agent_control_message (self))
break; // 中斷
}
else
if (poll_set [1].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
if (!kvmsg)
break; // 中斷
// 任何服務端的消息將重置它的過期時間
server->expiry = zclock_time () + SERVER_TTL;
if (self->state == STATE_SYNCING) {
// 保存快照內容
server->requests = 0;
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
self->state = STATE_ACTIVE;
zclock_log ("I: received from %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy (&kvmsg);
}
else
kvmsg_store (&kvmsg, self->kvmap);
}
else
if (self->state == STATE_ACTIVE) {
// 丟棄過期的更新事件
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received from %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
}
else {
// 服務端已死,嘗試其他服務器
zclock_log ("I: 服務器 %s:%d 無響應",
server->address, server->port);
self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
}
}
agent_destroy (&self);
}
```
最后是克隆服務器的模型6代碼:
**clonesrv6: Clone server, Model Six in C**
```c
//
// 克隆模式 - 服務端 - 模型6
//
// 直接編譯,不建類庫
#include "bstar.c"
#include "kvmsg.c"
// bstar反應堆API
static int s_snapshots (zloop_t *loop, void *socket, void *args);
static int s_collector (zloop_t *loop, void *socket, void *args);
static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
static int s_send_hugz (zloop_t *loop, void *socket, void *args);
static int s_new_master (zloop_t *loop, void *unused, void *args);
static int s_new_slave (zloop_t *loop, void *unused, void *args);
static int s_subscriber (zloop_t *loop, void *socket, void *args);
// 服務端屬性
typedef struct {
zctx_t *ctx; // 上下文
zhash_t *kvmap; // 存放鍵值對
bstar_t *bstar; // bstar反應堆核心
int64_t sequence; // 更新事件編號
int port; // 主端口
int peer; // 同伴端口
void *publisher; // 發布更新事件的端口
void *collector; // 接收客戶端更新事件的端口
void *subscriber; // 接受同伴更新事件的端口
zlist_t *pending; // 延遲的更新事件
Bool primary; // 是否為主機
Bool master; // 是否為master
Bool slave; // 是否為slave
} clonesrv_t;
int main (int argc, char *argv [])
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
if (argc == 2 && streq (argv [1], "-p")) {
zclock_log ("I: 作為主機master運行,正在等待備機slave連接。");
self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
"tcp://localhost:5004");
bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER,
s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = TRUE;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
zclock_log ("I: 作為備機slave運行,正在等待主機master連接。");
self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
"tcp://localhost:5003");
bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER,
s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = FALSE;
}
else {
printf ("Usage: clonesrv4 { -p | -b }\n");
free (self);
exit (0);
}
// 主機將成為master
if (self->primary)
self->kvmap = zhash_new ();
self->ctx = zctx_new ();
self->pending = zlist_new ();
bstar_set_verbose (self->bstar, TRUE);
// 設置克隆服務端套接字
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
self->collector = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
// 作為克隆客戶端連接同伴
self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1);
// 注冊狀態事件處理器
bstar_new_master (self->bstar, s_new_master, self);
bstar_new_slave (self->bstar, s_new_slave, self);
// 注冊bstar反應堆其他事件處理器
zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
// 開啟bstar反應堆
bstar_start (self->bstar);
// 中斷,終止。
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_destroy (&kvmsg);
}
zlist_destroy (&self->pending);
bstar_destroy (&self->bstar);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);
return 0;
}
// ---------------------------------------------------------------------
// 發送快照內容
static int s_send_single (char *key, void *data, void *args);
// 請求方信息
typedef struct {
void *socket; // ROUTER套接字
zframe_t *identity; // 請求放標識
char *subtree; // 子樹
} kvroute_t;
static int
s_snapshots (zloop_t *loop, void *snapshot, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv (snapshot);
if (identity) {
// 請求在消息的第二幀中
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?")) {
free (request);
subtree = zstr_recv (snapshot);
}
else
printf ("E: 錯誤的請求,正在退出……\n");
if (subtree) {
// 發送狀態快照
kvroute_t routing = { snapshot, identity, subtree };
zhash_foreach (self->kvmap, s_send_single, &routing);
// 發送終止消息,以及消息編號
zclock_log ("I: 正在發送快照,版本號:%d", (int) self->sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
}
}
return 0;
}
// 每次發送一個快照鍵值對
static int
s_send_single (char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
// 先發送接收方的地址
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}
// ---------------------------------------------------------------------
// 從客戶端收集更新事件
// 如果我們是master,則將該事件寫入kvmap對象;
// 如果我們是slave,則將其寫入延遲隊列
static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
static int
s_collector (zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv (collector);
kvmsg_dump (kvmsg);
if (kvmsg) {
if (self->master) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在發布更新事件:%d", (int) self->sequence);
}
else {
// 如果我們已經從master中獲得了該事件,則丟棄該消息
if (s_was_pending (self, kvmsg))
kvmsg_destroy (&kvmsg);
else
zlist_append (self->pending, kvmsg);
}
}
return 0;
}
// 如果消息已在延遲隊列中,則刪除它并返回TRUE
static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
{
kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
while (held) {
if (memcmp (kvmsg_uuid (kvmsg),
kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
zlist_remove (self->pending, held);
return TRUE;
}
held = (kvmsg_t *) zlist_next (self->pending);
}
return FALSE;
}
// ---------------------------------------------------------------------
// 刪除帶有過期時間的瞬間值
static int s_flush_single (char *key, void *data, void *args);
static int
s_flush_ttl (zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
}
// 如果鍵值對過期,則進行刪除操作,并廣播該事件
static int
s_flush_single (char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl) {
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在發布刪除事件:%d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// 發送心跳
static int
s_send_hugz (zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "HUGZ");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
return 0;
}
// ---------------------------------------------------------------------
// 狀態改變事件處理函數
// 我們將轉變為master
//
// 備機先將延遲列表中的事件更新到自己的快照中,
// 并開始接收客戶端發來的快照請求。
static int
s_new_master (zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
self->master = TRUE;
self->slave = FALSE;
zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
// 應用延遲列表中的事件
while (zlist_size (self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在發布延遲列表中的更新事件:%d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// 正在切換為slave
static int
s_new_slave (zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_destroy (&self->kvmap);
self->master = FALSE;
self->slave = TRUE;
zloop_reader (bstar_zloop (self->bstar), self->subscriber,
s_subscriber, self);
return 0;
}
// ---------------------------------------------------------------------
// 從同伴主機(master)接收更新事件;
// 接收該類更新事件時,我們一定是slave。
static int
s_subscriber (zloop_t *loop, void *subscriber, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
// 獲取快照,如果需要的話。
if (self->kvmap == NULL) {
self->kvmap = zhash_new ();
void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
zclock_log ("I: 正在請求快照:tcp://localhost:%d",
self->peer);
zstr_send (snapshot, "ICANHAZ?");
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // 中斷
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_destroy (&kvmsg);
break; // 完成
}
kvmsg_store (&kvmsg, self->kvmap);
}
zclock_log ("I: 收到快照,版本號:%d", (int) self->sequence);
zsocket_destroy (self->ctx, snapshot);
}
// 查找并刪除
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
return 0;
if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
if (!s_was_pending (self, kvmsg)) {
// 如果master的更新事件比客戶端的事件早到,則將master的事件存入延遲列表,
// 當收到客戶端更新事件時會將其從列表中清除。
zlist_append (self->pending, kvmsg_dup (kvmsg));
}
// 如果更新事件比kvmap版本高,則應用它
if (kvmsg_sequence (kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 收到更新事件:%d", (int) self->sequence);
}
else
kvmsg_destroy (&kvmsg);
}
else
kvmsg_destroy (&kvmsg);
return 0;
}
```
這段程序只有幾百行,但還是花了一些時間來進行調通的。這個模型中包含了故障恢復,瞬間值,子樹等等。雖然我們前期設計得很完備,但要在多個套接字之間進行調試還是很困難的。以下是我的工作方式:
* 由于使用了反應堆(bstar,建立在zloop之上),我們節省了大量的代碼,讓程序變得簡潔明了。整個服務以一個線程運行,因此不會出現跨線程的問題。只需將結構指針(self)傳遞給所有的處理器即可。此外,使用發應堆后可以讓代碼更為模塊化,易于重用。
* 我們逐個模塊進行調試,只有某個模塊能夠正常運行時才會進入下一步。由于使用了四五個套接字,因此調試的工作量是很大的。我直接將調試信息輸出到了屏幕上,因為實在沒有必要專門開一個調試器來工作。
* 因為一直在使用valgrind工具進行測試,因此我能確定程序沒有內存泄漏的問題。在C語言中,內存泄漏是我們非常關心的問題,因為沒有什么垃圾回收機制可以幫你完成。正確地使用像kvmsg、czmq之類的抽象層可以很好地避免內存泄漏。
這段程序肯定還會存在一些BUG,部分讀者可能會幫助我調試和修復,我在此表示感謝。
測試模型6時,先開啟主機和備機,再打開一組客戶端,順序隨意。隨機地中止某個服務進程,如果程序設計得是正確的,那客戶端獲得的數據應該都是一致的。
#### 克隆模式協議
花費了那么多精力來開發一套可靠的發布-訂閱模式機制,我們當然希望將來能夠方便地在其基礎之上進行擴展。較好的方法是將其編寫為一個協議,這樣就能讓各種語言來實現它了。
我們將其稱為“集群化哈希表協議”,這是一個能夠跨集群地進行鍵值哈希表管理,提供了多客戶端的通信機制;客戶端可以只操作一個子樹的數據,包括更新和定義瞬間值。
* http://rfc.zeromq.org/spec:12