<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 第五章 高級發布-訂閱模式 第三章和第四章講述了ZMQ中請求-應答模式的一些高級用法。如果你已經能夠徹底理解了,那我要說聲恭喜。這一章我們會關注發布-訂閱模式,使用上層模式封裝,提升ZMQ發布-訂閱模式的性能、可靠性、狀態同步及安全機制。 本章涉及的內容有: * 處理慢訂閱者(自殺的蝸牛模式) * 高速訂閱者(黑箱模式) * 構建一個共享鍵值緩存(克隆模式) ### 檢測慢訂閱者(自殺的蝸牛模式) 在使用發布-訂閱模式的時候,最常見的問題之一是如何處理響應較慢的訂閱者。理想狀況下,發布者能以全速發送消息給訂閱者,但現實中,訂閱者會需要對消息做較長時間的處理,或者寫得不夠好,無法跟上發布者的腳步。 如何處理慢訂閱者?最好的方法當然是讓訂閱者高效起來,不過這需要額外的工作。以下是一些處理慢訂閱者的方法: * **在發布者中貯存消息**。這是Gmail的做法,如果過去的幾小時里沒有閱讀郵件的話,它會把郵件保存起來。但在高吞吐量的應用中,發布者堆積消息往往會導致內存溢出,最終崩潰。特別是當同是有多個訂閱者時,或者無法用磁盤來做一個緩沖,情況就會變得更為復雜。 * **在訂閱者中貯存消息**。這種做法要好的多,其實ZMQ默認的行為就是這樣的。如果非得有一個人會因為內存溢出而崩潰,那也只會是訂閱者,而非發布者,這挺公平的。然而,這種做法只對瞬間消息量很大的應用才合理,訂閱者只是一時處理不過來,但最終會趕上進度。但是,這還是沒有解決訂閱者速度過慢的問題。 * **暫停發送消息**。這也是Gmail的做法,當我的郵箱容量超過7.554GB時,新的郵件就會被Gmail拒收或丟棄。這種做法對發布者來說很有益,ZMQ中若設置了閾值(HWM),其默認行為也就是這樣的。但是,我們仍不能解決慢訂閱者的問題,我們只是讓消息變得斷斷續續而已。 * **斷開與滿訂閱者的連接**。這是hotmail的做法,如果連續兩周沒有登錄,它就會斷開,這也是為什么我正在使用第十五個hotmail郵箱。不過這種方案在ZMQ里是行不通的,因為對于發布者而言,訂閱者是不可見的,無法做相應處理。 看來沒有一種經典的方式可以滿足我們的需求,所以我們就要進行創新了。我們可以讓訂閱者自殺,而不僅僅是斷開連接。這就是“自殺的蝸牛”模式。當訂閱者發現自身運行得過慢時(對于慢速的定義應該是一個配置項,當達到這個標準時就大聲地喊出來吧,讓程序員知道),它會哀嚎一聲,然后自殺。 訂閱者如何檢測自身速度過慢呢?一種方式是為消息進行編號,并在發布者端設置閾值。當訂閱者發現消息編號不連續時,它就知道事情不對勁了。這里的閾值就是訂閱者自殺的值。 這種方案有兩個問題:一、如果我們連接的多個發布者,我們要如何為消息進行編號呢?解決方法是為每一個發布者設定一個唯一的編號,作為消息編號的一部分。二、如果訂閱者使用ZMQ_SUBSRIBE選項對消息進行了過濾,那么我們精心設計的消息編號機制就毫無用處了。 有些情形不會進行消息的過濾,所以消息編號還是行得通的。不過更為普遍的解決方案是,發布者為消息標注時間戳,當訂閱者收到消息時會檢測這個時間戳,如果其差別達到某一個值,就發出警報并自殺。 當訂閱者有自身的客戶端或服務協議,需要保證最大延遲時間時,自殺的蝸牛模式會很合適。撤銷一個訂閱者也許并不是最周全的方案,但至少不會引發后續的問題。如果訂閱者收到了過時的消息,那可能會對數據造成進一步的破壞,而且很難被發現。 以下是自殺的蝸牛模式的最簡實現: **suisnail: Suicidal Snail in C** ```c // // 自殺的蝸牛模式 // #include "czmq.h" // --------------------------------------------------------------------- // 該訂閱者會連接至發布者,接收所有的消息, // 運行過程中它會暫停一會兒,模擬復雜的運算過程, // 當發現收到的消息超過1秒的延遲時,就自殺。 #define MAX_ALLOWED_DELAY 1000 // 毫秒 static void subscriber (void *args, zctx_t *ctx, void *pipe) { // 訂閱所有消息 void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "tcp://localhost:5556"); // 獲取并處理消息 while (1) { char *string = zstr_recv (subscriber); int64_t clock; int terms = sscanf (string, "%" PRId64, &clock); assert (terms == 1); free (string); // 自殺邏輯 if (zclock_time () - clock > MAX_ALLOWED_DELAY) { fprintf (stderr, "E: 訂閱者無法跟進, 取消中\n"); break; } // 工作一定時間 zclock_sleep (1 + randof (2)); } zstr_send (pipe, "訂閱者中止"); } // --------------------------------------------------------------------- // 發布者每毫秒發送一條用時間戳標記的消息 static void publisher (void *args, zctx_t *ctx, void *pipe) { // 準備發布者 void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5556"); while (1) { // 發送當前時間(毫秒)給訂閱者 char string [20]; sprintf (string, "%" PRId64, zclock_time ()); zstr_send (publisher, string); char *signal = zstr_recv_nowait (pipe); if (signal) { free (signal); break; } zclock_sleep (1); // 等待1毫秒 } } // 下面的代碼會啟動一個訂閱者和一個發布者,當訂閱者死亡時停止運行 // int main (void) { zctx_t *ctx = zctx_new (); void *pubpipe = zthread_fork (ctx, publisher, NULL); void *subpipe = zthread_fork (ctx, subscriber, NULL); free (zstr_recv (subpipe)); zstr_send (pubpipe, "break"); zclock_sleep (100); zctx_destroy (&ctx); return 0; } ``` 幾點說明: * 示例程序中的消息包含了系統當前的時間戳(毫秒)。在現實應用中,你應該使用時間戳作為消息頭,并提供消息內容。 * 示例程序中的發布者和訂閱者是同一個進程的兩個線程。在現實應用中,他們應該是兩個不同的進程。示例中這么做只是為了演示的方便 ### 高速訂閱者(黑箱模式) 發布-訂閱模式的一個典型應用場景是大規模分布式數據處理。如要處理從證券市場上收集到的數據,可以在證券交易系統上設置一個發布者,獲取價格信息,并發送給一組訂閱者。如果我們有很多訂閱者,我們可以使用TCP。如果訂閱者到達一定的量,那我們就應該使用可靠的廣播協議,如pgm。 假設我們的發布者每秒產生10萬條100個字節的消息。在剔除了不需要的市場信息后,這個比率還是比較合理的。現在我們需要記錄一天的數據(8小時約有250GB),再將其傳入一個模擬網絡,即一組訂閱者。雖然10萬條數據對ZMQ來說很容易處理,但我們需要更高的速度。 假設我們有多臺機器,一臺做發布者,其他的做訂閱者。這些機器都是8核的,發布者那臺有12核。 在我們開始發布消息時,有兩點需要注意: 1. 即便只是處理很少的數據,訂閱者仍有可能跟不上發布者的速度; 1. 當處理到6M/s的數據量時,發布者和訂閱者都有可能達到極限。 首先,我們需要將訂閱者設計為一種多線程的處理程序,這樣我們就能在一個線程中讀取消息,使用其他線程來處理消息。一般來說,我們對每種消息的處理方式都是不同的。這樣一來,訂閱者可以對收到的消息進行一次過濾,如根據頭信息來判別。當消息滿足某些條件,訂閱者會將消息交給worker處理。用ZMQ的語言來說,訂閱者會將消息轉發給worker來處理。 這樣一來,訂閱者看上去就像是一個隊列裝置,我們可以用各種方式去連接隊列裝置和worker。如我們建立單向的通信,每個worker都是相同的,可以使用PUSH和PULL套接字,分發的工作就交給ZMQ吧。這是最簡單也是最快速的方式: ![1](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_1.png) 訂閱者和發布者之間的通信使用TCP或PGM協議,訂閱者和worker的通信由于是在同一個進程中完成的,所以使用inproc協議。 下面我們看看如何突破瓶頸。由于訂閱者是單線程的,當它的CPU占用率達到100%時,它無法使用其他的核心。單線程程序總是會遇到瓶頸的,不管是2M、6M還是更多。我們需要將工作量分配到不同的線程中去,并發地執行。 很多高性能產品使用的方案是分片,就是將工作量拆分成獨立并行的流。如,一半的專題數據由一個流媒體傳輸,另一半由另一個流媒體傳輸。我們可以建立更多的流媒體,但如果CPU核心數不變,那就沒有必要了。 讓我們看看如何將工作量分片為兩個流: ![2](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_2.png) 要讓兩個流全速工作,需要這樣配置ZMQ: * 使用兩個I/O線程,而不是一個; * 使用兩個獨立的網絡接口; * 每個I/O線程綁定至一個網絡接口; * 兩個訂閱者線程,分別綁定至一個核心; * 使用兩個SUB套接字; * 剩余的核心供worker使用; * worker線程同時綁定至兩個訂閱者線程的PUSH套接字。 創建的線程數量應和CPU核心數一致,如果我們建立的線程數量超過核心數,那其處理速度只會減少。另外,開放多個I/O線程也是沒有必要的。 ### 共享鍵值緩存(克隆模式) 發布-訂閱模式和無線電廣播有些類似,在你收聽之前發送的消息你將無從得知,收到消息的多少又會取決于你的接收能力。讓人吃驚的是,對于那些追求完美的工程師來說,這種機器恰恰符合他們的需求,且廣為傳播,成為現實生活中分發消息的最佳機制。想想非死不可、推特、BBS新聞、體育新聞等應用就知道了。 但是,在很多情形下,可靠的發布-訂閱模式同樣是有價值的。正如我們討論請求-應答模式一樣,我們會根據“故障”來定義“可靠性”,下面幾項便是發布-訂閱模式中可能發生的故障: * 訂閱者連接太慢,因此沒有收到發布者最初發送的消息; * 訂閱者速度太慢,同樣會丟失消息; * 訂閱者可能會斷開,其間的消息也會丟失。 還有一些情況我們碰到的比較少,但不是沒有: * 訂閱者崩潰、重啟,從而丟失了所有已收到的消息; * 訂閱者處理消息的速度過慢,導致消息在隊列中堆砌并溢出; * 因網絡過載而丟失消息(特別是PGM協議下的連接); * 網速過慢,消息在發布者處溢出,從而崩潰。 其實還會有其他出錯的情況,只是以上這些在現實應用中是比較典型的。 我們已經有方法解決上面的某些問題了,比如對于慢速訂閱者可以使用自殺的蝸牛模式。但是,對于其他的問題,我們最后能有一個可復用的框架來編寫可靠的發布-訂閱模式。 難點在于,我們并不知道目標應用程序會怎樣處理這些數據。它們會進行過濾、只處理一部分消息嗎?它們是否會將消息記錄起來供日后使用?它們是否會將消息轉發給其下的worker進行處理?需要考慮的情況實在太多了,每種情況都有其所謂的可靠性。 所以,我們將問題抽象出來,供多種應用程序使用。這種抽象應用我們稱之為共享的鍵值緩存,它的功能是通過唯一的鍵名存儲二進制數據塊。 不要將這個抽象應用和分布式哈希表混淆起來,它是用來解決節點在分布式網絡中相連接的問題的;也不要和分布式鍵值表混淆,它更像是一個NoSQL數據庫。我們要建立的應用是將內存中的狀態可靠地傳遞給一組客戶端,它要做到的是: * 客戶端可以隨時加入網絡,并獲得服務端當前的狀態; * 任何客戶端都可以改變鍵值緩存(插入、更新、刪除); * 將這種變化以最短的延遲可靠地傳達給所有的客戶端; * 能夠處理大量的客戶端,成百上千。 克隆模式的要點在于客戶端會反過來和服務端進行通信,這在簡單的發布-訂閱模式中并不常見。所以我這里使用“服務端”、“客戶端”而不是“發布者”、“訂閱者”這兩個詞。我們會使用發布-訂閱模式作為核心消息模式,不過還需要夾雜其他模式。 #### 分發鍵值更新事件 我們會分階段實施克隆模式。首先,我們看看如何從服務器發送鍵值更新事件給所有的客戶端。我們將第一章中使用的天氣服務模型進行改造,以鍵值對的方式發送信息,并讓客戶端使用哈希表來保存: ![3](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_3.png) 以下是服務端代碼: **clonesrv1: Clone server, Model One in C** ```c // // 克隆模式服務端模型1 // // 讓我們直接編譯,不生成類庫 #include "kvsimple.c" int main (void) { // 準備上下文和PUB套接字 zctx_t *ctx = zctx_new (); void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5556"); zclock_sleep (200); zhash_t *kvmap = zhash_new (); int64_t sequence = 0; srandom ((unsigned) time (NULL)); while (!zctx_interrupted) { // 使用鍵值對分發消息 kvmsg_t *kvmsg = kvmsg_new (++sequence); kvmsg_fmt_key (kvmsg, "%d", randof (10000)); kvmsg_fmt_body (kvmsg, "%d", randof (1000000)); kvmsg_send (kvmsg, publisher); kvmsg_store (&kvmsg, kvmap); } printf (" 已中止\n已發送 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } ``` 以下是客戶端代碼: **clonecli1: Clone client, Model One in C** ```c // // 克隆模式客戶端模型1 // // 讓我們直接編譯,不生成類庫 #include "kvsimple.c" int main (void) { // 準備上下文和SUB套接字 zctx_t *ctx = zctx_new (); void *updates = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (updates, "tcp://localhost:5556"); zhash_t *kvmap = zhash_new (); int64_t sequence = 0; while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (updates); if (!kvmsg) break; // 中斷 kvmsg_store (&kvmsg, kvmap); sequence++; } printf (" 已中斷\n收到 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } ``` 幾點說明: * 所有復雜的工作都在kvmsg類中完成了,這個類能夠處理鍵值對類型的消息對象,其實質上是一個ZMQ多幀消息,共有三幀:鍵(ZMQ字符串)、編號(64位,按字節順序排列)、二進制體(保存所有附加信息)。 * 服務端隨機生成消息,使用四位數作為鍵,這樣可以模擬大量而不是過量的哈希表(1萬個條目)。 * 服務端綁定套接字后會等待200毫秒,以避免訂閱者連接延遲而丟失數據的問題。我們會在后面的模型中解決這一點。 * 我們使用“發布者”和“訂閱者”來命名程序中使用的套接字,這樣可以避免和后續模型中的其他套接字發生混淆。 以下是kvmsg的代碼,已經經過了精簡: **kvsimple: Key-value message class in C** ```c /* ===================================================================== kvsimple - simple key-value message class for example applications --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "kvsimple.h" #include "zlist.h" // 鍵是一個短字符串 #define KVMSG_KEY_MAX 255 // 消息被格式化成三幀 // frame 0: 鍵(ZMQ字符串) // frame 1: 編號(8個字節,按順序排列) // frame 2: 內容(二進制數據塊) #define FRAME_KEY 0 #define FRAME_SEQ 1 #define FRAME_BODY 2 #define KVMSG_FRAMES 3 // 類結構 struct _kvmsg { // 消息中某幀是否存在 int present [KVMSG_FRAMES]; // 對應的ZMQ消息幀 zmq_msg_t frame [KVMSG_FRAMES]; // 將鍵轉換為C語言字符串 char key [KVMSG_KEY_MAX + 1]; }; // --------------------------------------------------------------------- // 構造函數,設置編號 kvmsg_t * kvmsg_new (int64_t sequence) { kvmsg_t *self; self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t)); kvmsg_set_sequence (self, sequence); return self; } // --------------------------------------------------------------------- // 析構函數 // 釋放消息中的幀,可供zhash_freefn()函數調用 void kvmsg_free (void *ptr) { if (ptr) { kvmsg_t *self = (kvmsg_t *) ptr; // 銷毀消息中的幀 int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); // 釋放對象本身 free (self); } } void kvmsg_destroy (kvmsg_t **self_p) { assert (self_p); if (*self_p) { kvmsg_free (*self_p); *self_p = NULL; } } // --------------------------------------------------------------------- // 從套接字中讀取鍵值消息,返回kvmsg實例 kvmsg_t * kvmsg_recv (void *socket) { assert (socket); kvmsg_t *self = kvmsg_new (0); // 讀取所有幀,出錯則銷毀對象 int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); zmq_msg_init (&self->frame [frame_nbr]); self->present [frame_nbr] = 1; if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) { kvmsg_destroy (&self); break; } // 驗證多幀消息 int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0; if (zsockopt_rcvmore (socket) != rcvmore) { kvmsg_destroy (&self); break; } } return self; } // --------------------------------------------------------------------- // 向套接字發送鍵值對消息,不檢驗消息幀的內容 void kvmsg_send (kvmsg_t *self, void *socket) { assert (self); assert (socket); int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { zmq_msg_t copy; zmq_msg_init (&copy); if (self->present [frame_nbr]) zmq_msg_copy (&copy, &self->frame [frame_nbr]); zmq_sendmsg (socket, &copy, (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0); zmq_msg_close (&copy); } } // --------------------------------------------------------------------- // 從消息中獲取鍵值,不存在則返回NULL char * kvmsg_key (kvmsg_t *self) { assert (self); if (self->present [FRAME_KEY]) { if (!*self->key) { size_t size = zmq_msg_size (&self->frame [FRAME_KEY]); if (size > KVMSG_KEY_MAX) size = KVMSG_KEY_MAX; memcpy (self->key, zmq_msg_data (&self->frame [FRAME_KEY]), size); self->key [size] = 0; } return self->key; } else return NULL; } // --------------------------------------------------------------------- // 返回消息的編號 int64_t kvmsg_sequence (kvmsg_t *self) { assert (self); if (self->present [FRAME_SEQ]) { assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8); byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]); int64_t sequence = ((int64_t) (source [0]) << 56) + ((int64_t) (source [1]) << 48) + ((int64_t) (source [2]) << 40) + ((int64_t) (source [3]) << 32) + ((int64_t) (source [4]) << 24) + ((int64_t) (source [5]) << 16) + ((int64_t) (source [6]) << 8) + (int64_t) (source [7]); return sequence; } else return 0; } // --------------------------------------------------------------------- // 返回消息內容,不存在則返回NULL byte * kvmsg_body (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]); else return NULL; } // --------------------------------------------------------------------- // 返回消息內容的大小 size_t kvmsg_size (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return zmq_msg_size (&self->frame [FRAME_BODY]); else return 0; } // --------------------------------------------------------------------- // 設置消息的鍵 void kvmsg_set_key (kvmsg_t *self, char *key) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_KEY]; if (self->present [FRAME_KEY]) zmq_msg_close (msg); zmq_msg_init_size (msg, strlen (key)); memcpy (zmq_msg_data (msg), key, strlen (key)); self->present [FRAME_KEY] = 1; } // --------------------------------------------------------------------- // 設置消息的編號 void kvmsg_set_sequence (kvmsg_t *self, int64_t sequence) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_SEQ]; if (self->present [FRAME_SEQ]) zmq_msg_close (msg); zmq_msg_init_size (msg, 8); byte *source = zmq_msg_data (msg); source [0] = (byte) ((sequence >> 56) & 255); source [1] = (byte) ((sequence >> 48) & 255); source [2] = (byte) ((sequence >> 40) & 255); source [3] = (byte) ((sequence >> 32) & 255); source [4] = (byte) ((sequence >> 24) & 255); source [5] = (byte) ((sequence >> 16) & 255); source [6] = (byte) ((sequence >> 8) & 255); source [7] = (byte) ((sequence) & 255); self->present [FRAME_SEQ] = 1; } // --------------------------------------------------------------------- // 設置消息內容 void kvmsg_set_body (kvmsg_t *self, byte *body, size_t size) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_BODY]; if (self->present [FRAME_BODY]) zmq_msg_close (msg); self->present [FRAME_BODY] = 1; zmq_msg_init_size (msg, size); memcpy (zmq_msg_data (msg), body, size); } // --------------------------------------------------------------------- // 使用printf()格式設置消息鍵 void kvmsg_fmt_key (kvmsg_t *self, char *format, ...) { char value [KVMSG_KEY_MAX + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, KVMSG_KEY_MAX, format, args); va_end (args); kvmsg_set_key (self, value); } // --------------------------------------------------------------------- // 使用springf()格式設置消息內容 void kvmsg_fmt_body (kvmsg_t *self, char *format, ...) { char value [255 + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, 255, format, args); va_end (args); kvmsg_set_body (self, (byte *) value, strlen (value)); } // --------------------------------------------------------------------- // 若kvmsg結構的鍵值均存在,則存入哈希表; // 如果kvmsg結構已沒有引用,則自動銷毀和釋放。 void kvmsg_store (kvmsg_t **self_p, zhash_t *hash) { assert (self_p); if (*self_p) { kvmsg_t *self = *self_p; assert (self); if (self->present [FRAME_KEY] && self->present [FRAME_BODY]) { zhash_update (hash, kvmsg_key (self), self); zhash_freefn (hash, kvmsg_key (self), kvmsg_free); } *self_p = NULL; } } // --------------------------------------------------------------------- // 將消息內容打印至標準錯誤輸出,用以調試和跟蹤 void kvmsg_dump (kvmsg_t *self) { if (self) { if (!self) { fprintf (stderr, "NULL"); return; } size_t size = kvmsg_size (self); byte *body = kvmsg_body (self); fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self)); fprintf (stderr, "[key:%s]", kvmsg_key (self)); fprintf (stderr, "[size:%zd] ", size); int char_nbr; for (char_nbr = 0; char_nbr < size; char_nbr++) fprintf (stderr, "%02X", body [char_nbr]); fprintf (stderr, "\n"); } else fprintf (stderr, "NULL message\n"); } // --------------------------------------------------------------------- // 測試用例 int kvmsg_test (int verbose) { kvmsg_t *kvmsg; printf (" * kvmsg: "); // 準備上下文和套接字 zctx_t *ctx = zctx_new (); void *output = zsocket_new (ctx, ZMQ_DEALER); int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); void *input = zsocket_new (ctx, ZMQ_DEALER); rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); zhash_t *kvmap = zhash_new (); // 測試簡單消息的發送和接受 kvmsg = kvmsg_new (1); kvmsg_set_key (kvmsg, "key"); kvmsg_set_body (kvmsg, (byte *) "body", 4); if (verbose) kvmsg_dump (kvmsg); kvmsg_send (kvmsg, output); kvmsg_store (&kvmsg, kvmap); kvmsg = kvmsg_recv (input); if (verbose) kvmsg_dump (kvmsg); assert (streq (kvmsg_key (kvmsg), "key")); kvmsg_store (&kvmsg, kvmap); // 關閉并銷毀所有對象 zhash_destroy (&kvmap); zctx_destroy (&ctx); printf ("OK\n"); return 0; } ``` 我們會在下文編寫一個更為完整的kvmsg類,可以用到現實環境中。 客戶端和服務端都會維護一個哈希表,但這個模型需要所有的客戶端都比服務端啟動得早,而且不能崩潰,這顯然不能滿足可靠性的要求。 #### 創建快照 為了讓后續連接的(或從故障中恢復的)客戶端能夠獲取服務器上的狀態信息,需要讓它在連接時獲取一份快照。正如我們將“消息”的概念簡化為“已編號的鍵值對”,我們也可以將“狀態”簡化為“一個哈希表”。為獲取服務端狀態,客戶端會打開一個REQ套接字進行請求: ![4](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_4.png) 我們需要考慮時間的問題,因為生成快照是需要一定時間的,我們需要知道應從哪個更新事件開始更新快照,服務端是不知道何時有更新事件的。一種方法是先開始訂閱消息,收到第一個消息之后向服務端請求“將該條更新之前的所有內容發送給”。這樣一來,服務器需要為每一次更新保存一份快照,這顯然是不現實的。 所以,我們會在客戶端用以下方式進行同步: * 客戶端開始訂閱服務器的更新事件,然后請求一份快照。這樣就能保證這份快照是在上一次更新事件之后產生的。 * 客戶端開始等待服務器的快照,并將更新事件保存在隊列中,做法很簡單,不要從套接字中讀取消息就可以了,ZMQ會自動將這些消息保存起來,這時不應設置閾值(HWM)。 * 當客戶端獲取到快照后,它將再次開始讀取更新事件,但是需要丟棄那些早于快照生成時間的事件。如快照生成時包含了200次更新,那客戶端會從第201次更新開始讀取。 * 隨后,客戶端就會用更新事件去更新自身的狀態了。 這是一個比較簡單的模型,因為它用到了ZMQ消息隊列的機制。服務端代碼如下: **clonesrv2: Clone server, Model Two in C** ```c // // 克隆模式 - 服務端 - 模型2 // // 讓我們直接編譯,不創建類庫 #include "kvsimple.c" static int s_send_single (char *key, void *data, void *args); static void state_manager (void *args, zctx_t *ctx, void *pipe); int main (void) { // 準備套接字和上下文 zctx_t *ctx = zctx_new (); void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5557"); int64_t sequence = 0; srandom ((unsigned) time (NULL)); // 開啟狀態管理器,并等待同步信號 void *updates = zthread_fork (ctx, state_manager, NULL); free (zstr_recv (updates)); while (!zctx_interrupted) { // 分發鍵值消息 kvmsg_t *kvmsg = kvmsg_new (++sequence); kvmsg_fmt_key (kvmsg, "%d", randof (10000)); kvmsg_fmt_body (kvmsg, "%d", randof (1000000)); kvmsg_send (kvmsg, publisher); kvmsg_send (kvmsg, updates); kvmsg_destroy (&kvmsg); } printf (" 已中斷\n已發送 %d 條消息\n", (int) sequence); zctx_destroy (&ctx); return 0; } // 快照請求方信息 typedef struct { void *socket; // 用于發送快照的ROUTER套接字 zframe_t *identity; // 請求方的標識 } kvroute_t; // 發送快照中單個鍵值對 // 使用kvmsg對象作為載體 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; // 先發送接收方標識 zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_t *kvmsg = (kvmsg_t *) data; kvmsg_send (kvmsg, kvroute->socket); return 0; } // 該線程維護服務端狀態,并處理快照請求。 // static void state_manager (void *args, zctx_t *ctx, void *pipe) { zhash_t *kvmap = zhash_new (); zstr_send (pipe, "READY"); void *snapshot = zsocket_new (ctx, ZMQ_ROUTER); zsocket_bind (snapshot, "tcp://*:5556"); zmq_pollitem_t items [] = { { pipe, 0, ZMQ_POLLIN, 0 }, { snapshot, 0, ZMQ_POLLIN, 0 } }; int64_t sequence = 0; // 當前快照版本 while (!zctx_interrupted) { int rc = zmq_poll (items, 2, -1); if (rc == -1 && errno == ETERM) break; // 上下文異常 // 等待主線程的更新事件 if (items [0].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (pipe); if (!kvmsg) break; // 中斷 sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, kvmap); } // 執行快照請求 if (items [1].revents & ZMQ_POLLIN) { zframe_t *identity = zframe_recv (snapshot); if (!identity) break; // 中斷 // 請求內容在第二幀中 char *request = zstr_recv (snapshot); if (streq (request, "ICANHAZ?")) free (request); else { printf ("E: 錯誤的請求,程序中止\n"); break; } // 發送快照給客戶端 kvroute_t routing = { snapshot, identity }; // 逐項發送 zhash_foreach (kvmap, s_send_single, &routing); // 發送結束標識,內含快照版本號 printf ("正在發送快照,版本號 %d\n", (int) sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); } } zhash_destroy (&kvmap); } ``` 以下是客戶端代碼: **clonecli2: Clone client, Model Two in C** ```c // // 克隆模式 - 客戶端 - 模型2 // // 讓我們直接編譯,不生成類庫 #include "kvsimple.c" int main (void) { // 準備上下文和SUB套接字 zctx_t *ctx = zctx_new (); void *snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (snapshot, "tcp://localhost:5556"); void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "tcp://localhost:5557"); zhash_t *kvmap = zhash_new (); // 獲取快照 int64_t sequence = 0; zstr_send (snapshot, "ICANHAZ?"); while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (snapshot); if (!kvmsg) break; // 中斷 if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { sequence = kvmsg_sequence (kvmsg); printf ("已獲取快照,版本號=%d\n", (int) sequence); kvmsg_destroy (&kvmsg); break; // 完成 } kvmsg_store (&kvmsg, kvmap); } // 應用隊列中的更新事件,丟棄過時事件 while (!zctx_interrupted) { kvmsg_t *kvmsg = kvmsg_recv (subscriber); if (!kvmsg) break; // 中斷 if (kvmsg_sequence (kvmsg) > sequence) { sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, kvmap); } else kvmsg_destroy (&kvmsg); } zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } ``` 幾點說明: * 客戶端使用兩個線程,一個用來生成隨機的更新事件,另一個用來管理狀態。兩者之間使用PAIR套接字通信。可能你會考慮使用SUB套接字,但是“慢連接”的問題會影響到程序運行。PAIR套接字會讓兩個線程嚴格同步的。 * 我們在updates套接字上設置了閾值(HWM),避免更新服務內存溢出。在inproc協議的連接中,閾值是兩端套接字閾值的加和,所以要分別設置。 * 客戶端比較簡單,用C語言編寫,大約60行代碼。大多數工作都在kvmsg類中完成了,不過總的來說,克隆模式實現起來還是比較簡單的。 * 我們沒有用特別的方式來序列化狀態內容。鍵值對用kvmsg對象表示,保存在一個哈希表中。在不同的時間請求狀態時會得到不同的快照。 * 我們假設客戶端只和一個服務進行通信,而且服務必須是正常運行的。我們暫不考慮如何從服務崩潰的情形中恢復過來。 現在,這兩段程序都還沒有真正地工作起來,但已經能夠正確地同步狀態了。這是一個多種消息模式的混合體:進程內的PAIR、發布-訂閱、ROUTER-DEALER等。 #### 重發鍵值更新事件 第二個模型中,鍵值更新事件都來自于服務器,構成了一個中心化的模型。但是我們需要的是一個能夠在客戶端進行更新的緩存,并能同步到其他客戶端中。這時,服務端只是一個無狀態的中間件,帶來的好處有: * 我們不用太過關心服務端的可靠性,因為即使它崩潰了,我們仍能從客戶端中獲取完整的數據。 * 我們可以使用鍵值緩存在動態節點之間分享數據。 客戶端的鍵值更新事件會通過PUSH-PULL套接字傳達給服務端: ![5](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_5.png) 我們為什么不讓客戶端直接將更新信息發送給其他客戶端呢?雖然這樣做可以減少延遲,但是就無法為更新事件添加自增的唯一編號了。很多應用程序都需要更新事件以某種方式排序,只有將消息發給服務端,由服務端分發更新消息,才能保證更新事件的順序。 有了唯一的編號后,客戶端還能檢測到更多的故障:網絡堵塞或隊列溢出。如果客戶端發現消息輸入流有一段空白,它能采取措施。可能你會覺得此時讓客戶端通知服務端,讓它重新發送丟失的信息,可以解決問題。但事實上沒有必要這么做。消息流的空擋表示網絡狀況不好,如果再進行這樣的請求,只會讓事情變得更糟。所以一般的做法是由客戶端發出警告,并停止運行,等到有專人來維護后再繼續工作。 我們開始創建在客戶端進行狀態更新的模型。以下是客戶端代碼: **clonesrv3: Clone server, Model Three in C** ```c // // 克隆模式 服務端 模型3 // // 直接編譯,不創建類庫 #include "kvsimple.c" static int s_send_single (char *key, void *data, void *args); // 快照請求方信息 typedef struct { void *socket; // ROUTER套接字 zframe_t *identity; // 請求方標識 } kvroute_t; int main (void) { // 準備上下文和套接字 zctx_t *ctx = zctx_new (); void *snapshot = zsocket_new (ctx, ZMQ_ROUTER); zsocket_bind (snapshot, "tcp://*:5556"); void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5557"); void *collector = zsocket_new (ctx, ZMQ_PULL); zsocket_bind (collector, "tcp://*:5558"); int64_t sequence = 0; zhash_t *kvmap = zhash_new (); zmq_pollitem_t items [] = { { collector, 0, ZMQ_POLLIN, 0 }, { snapshot, 0, ZMQ_POLLIN, 0 } }; while (!zctx_interrupted) { int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC); // 執行來自客戶端的更新事件 if (items [0].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (collector); if (!kvmsg) break; // 中斷 kvmsg_set_sequence (kvmsg, ++sequence); kvmsg_send (kvmsg, publisher); kvmsg_store (&kvmsg, kvmap); printf ("I: 發布更新事件 %5d\n", (int) sequence); } // 響應快照請求 if (items [1].revents & ZMQ_POLLIN) { zframe_t *identity = zframe_recv (snapshot); if (!identity) break; // 中斷 // 請求內容在消息的第二幀中 char *request = zstr_recv (snapshot); if (streq (request, "ICANHAZ?")) free (request); else { printf ("E: 錯誤的請求,程序中止\n"); break; } // 發送快照 kvroute_t routing = { snapshot, identity }; // 逐條發送 zhash_foreach (kvmap, s_send_single, &routing); // 發送結束標識和編號 printf ("I: 正在發送快照,版本號:%d\n", (int) sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); } } printf (" 已中斷\n已處理 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } // 發送一條鍵值對狀態給套接字,使用kvmsg對象保存鍵值對 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; // Send identity of recipient first zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_t *kvmsg = (kvmsg_t *) data; kvmsg_send (kvmsg, kvroute->socket); return 0; } ``` 以下是客戶端代碼: **clonecli3: Clone client, Model Three in C** ```c // // 克隆模式 - 客戶端 - 模型3 // // 直接編譯,不創建類庫 #include "kvsimple.c" int main (void) { // 準備上下文和SUB套接字 zctx_t *ctx = zctx_new (); void *snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (snapshot, "tcp://localhost:5556"); void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "tcp://localhost:5557"); void *publisher = zsocket_new (ctx, ZMQ_PUSH); zsocket_connect (publisher, "tcp://localhost:5558"); zhash_t *kvmap = zhash_new (); srandom ((unsigned) time (NULL)); // 獲取狀態快照 int64_t sequence = 0; zstr_send (snapshot, "ICANHAZ?"); while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (snapshot); if (!kvmsg) break; // 中斷 if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { sequence = kvmsg_sequence (kvmsg); printf ("I: 已收到快照,版本號:%d\n", (int) sequence); kvmsg_destroy (&kvmsg); break; // 完成 } kvmsg_store (&kvmsg, kvmap); } int64_t alarm = zclock_time () + 1000; while (!zctx_interrupted) { zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } }; int tickless = (int) ((alarm - zclock_time ())); if (tickless < 0) tickless = 0; int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC); if (rc == -1) break; // 上下文被關閉 if (items [0].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (subscriber); if (!kvmsg) break; // 中斷 // 丟棄過時消息,包括心跳 if (kvmsg_sequence (kvmsg) > sequence) { sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, kvmap); printf ("I: 收到更新事件:%d\n", (int) sequence); } else kvmsg_destroy (&kvmsg); } // 創建一個隨機的更新事件 if (zclock_time () >= alarm) { kvmsg_t *kvmsg = kvmsg_new (0); kvmsg_fmt_key (kvmsg, "%d", randof (10000)); kvmsg_fmt_body (kvmsg, "%d", randof (1000000)); kvmsg_send (kvmsg, publisher); kvmsg_destroy (&kvmsg); alarm = zclock_time () + 1000; } } printf (" 已準備\n收到 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } ``` 幾點說明: * 服務端整合為一個線程,負責收集來自客戶端的更新事件并轉發給其他客戶端。它使用PULL套接字獲取更新事件,ROUTER套接字處理快照請求,以及PUB套接字發布更新事件。 * 客戶端會每隔1秒左右發送隨機的更新事件給服務端,現實中這一動作由應用程序觸發。 #### 子樹克隆 現實中的鍵值緩存會越變越多,而客戶端可能只會需要部分緩存。我們可以使用子樹的方式來實現:客戶端在發送快照請求時告訴服務端它需要的子樹,在訂閱更新事件時也指明子樹。 關于子樹的語法有很多,一種是“分層路徑”結構,另一種是“主題樹”: * 分層路徑:/some/list/of/paths * 主題樹:some.list.of.topics 這里我們會使用分層路徑結構,以此擴展服務端和客戶端,進行子樹操作。維護多個子樹其實并不太困難,因此我們不在這里演示。 下面是服務端代碼,由模型3衍化而來: **clonesrv4: Clone server, Model Four in C** ```c // // 克隆模式 服務端 模型4 // // 直接編譯,不創建類庫 #include "kvsimple.c" static int s_send_single (char *key, void *data, void *args); // 快照請求方信息 typedef struct { void *socket; // ROUTER套接字 zframe_t *identity; // 請求方標識 char *subtree; // 指定的子樹 } kvroute_t; int main (void) { // 準備上下文和套接字 zctx_t *ctx = zctx_new (); void *snapshot = zsocket_new (ctx, ZMQ_ROUTER); zsocket_bind (snapshot, "tcp://*:5556"); void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5557"); void *collector = zsocket_new (ctx, ZMQ_PULL); zsocket_bind (collector, "tcp://*:5558"); int64_t sequence = 0; zhash_t *kvmap = zhash_new (); zmq_pollitem_t items [] = { { collector, 0, ZMQ_POLLIN, 0 }, { snapshot, 0, ZMQ_POLLIN, 0 } }; while (!zctx_interrupted) { int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC); // 執行來自客戶端的更新事件 if (items [0].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (collector); if (!kvmsg) break; // Interrupted kvmsg_set_sequence (kvmsg, ++sequence); kvmsg_send (kvmsg, publisher); kvmsg_store (&kvmsg, kvmap); printf ("I: 發布更新事件 %5d\n", (int) sequence); } // 響應快照請求 if (items [1].revents & ZMQ_POLLIN) { zframe_t *identity = zframe_recv (snapshot); if (!identity) break; // Interrupted // 請求內容在消息的第二幀中 char *request = zstr_recv (snapshot); char *subtree = NULL; if (streq (request, "ICANHAZ?")) { free (request); subtree = zstr_recv (snapshot); } else { printf ("E: 錯誤的請求,程序中止\n"); break; } // 發送快照 kvroute_t routing = { snapshot, identity, subtree }; // 逐條發送 zhash_foreach (kvmap, s_send_single, &routing); // 發送結束標識和編號 printf ("I: 正在發送快照,版本號:%d\n", (int) sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) subtree, 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); free (subtree); } } printf (" 已中斷\n已處理 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } // 發送一條鍵值對狀態給套接字,使用kvmsg對象保存鍵值對 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) && memcmp (kvroute->subtree, kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { // 先發送接收方的標識 zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_send (kvmsg, kvroute->socket); } return 0; } ``` 下面是客戶端代碼: **clonecli4: Clone client, Model Four in C** ```c // // 克隆模式 - 客戶端 - 模型4 // // 直接編譯,不創建類庫 #include "kvsimple.c" #define SUBTREE "/client/" int main (void) { // 準備上下文和SUB套接字 zctx_t *ctx = zctx_new (); void *snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (snapshot, "tcp://localhost:5556"); void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "tcp://localhost:5557"); zsockopt_set_subscribe (subscriber, SUBTREE); void *publisher = zsocket_new (ctx, ZMQ_PUSH); zsocket_connect (publisher, "tcp://localhost:5558"); zhash_t *kvmap = zhash_new (); srandom ((unsigned) time (NULL)); // 獲取狀態快照 int64_t sequence = 0; zstr_sendm (snapshot, "ICANHAZ?"); zstr_send (snapshot, SUBTREE); while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (snapshot); if (!kvmsg) break; // Interrupted if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { sequence = kvmsg_sequence (kvmsg); printf ("I: 已收到快照,版本號:%d\n", (int) sequence); kvmsg_destroy (&kvmsg); break; // Done } kvmsg_store (&kvmsg, kvmap); } int64_t alarm = zclock_time () + 1000; while (!zctx_interrupted) { zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } }; int tickless = (int) ((alarm - zclock_time ())); if (tickless < 0) tickless = 0; int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC); if (rc == -1) break; // 上下文被關閉 if (items [0].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (subscriber); if (!kvmsg) break; // 中斷 // 丟棄過時消息,包括心跳 if (kvmsg_sequence (kvmsg) > sequence) { sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, kvmap); printf ("I: 收到更新事件:%d\n", (int) sequence); } else kvmsg_destroy (&kvmsg); } // 創建一個隨機的更新事件 if (zclock_time () >= alarm) { kvmsg_t *kvmsg = kvmsg_new (0); kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000)); kvmsg_fmt_body (kvmsg, "%d", randof (1000000)); kvmsg_send (kvmsg, publisher); kvmsg_destroy (&kvmsg); alarm = zclock_time () + 1000; } } printf (" 已準備\n收到 %d 條消息\n", (int) sequence); zhash_destroy (&kvmap); zctx_destroy (&ctx); return 0; } ``` #### 瞬間值 瞬間值指的是那些會立刻過期的值。如果你用克隆模式搭建一個類似DNS的服務時,就可以用瞬間值來模擬動態DNS解析了。當節點連接網絡,對外發布它的地址,并不斷地更新地址。如果節點斷開連接,則它的地址也會失效。 瞬間值可以和會話(session)聯系起來,當會話結束時,瞬間值也就失效了。克隆模式中,會話是由客戶端定義的,并會在客戶端斷開連接時消亡。 更簡單的方法是為每一個瞬間值設定一個過期時間,客戶端會不斷延長這個時間,當斷開連接時這個時間將得不到更新,服務器就會自動將其刪除。 我們會用這種簡單的方法來實現瞬間值,因為太過復雜的方法可能不值當,它們的差別僅在性能上體現。如果客戶端有很多瞬間值,那為每個值設定過期時間是恰當的;如果瞬間值到達一定的量,那最好還是將其和會話相關聯,統一進行過期處理。 首先,我們需要設法在鍵值對消息中加入過期時間。我們可以增加一個消息幀,但這樣一來每當我們需要增加消息內容時就需要修改kvmsg類庫了,這并不合適。所以,我們一次性增加一個“屬性”消息幀,用于添加不同的消息屬性。 其次,我們需要設法刪除這條數據。目前為止服務端和客戶端會盲目地增改哈希表中的數據,我們可以這樣定義:當消息的值是空的,則表示刪除這個鍵的數據。 下面是一個更為完整的kvmsg類代碼,它實現了“屬性”幀,以及一個UUID幀,我們后面會用到。該類還會負責處理值為空的消息,達到刪除的目的: **kvmsg: Key-value message class - full in C** ```c /* ===================================================================== kvmsg - key-value message class for example applications --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "kvmsg.h" #include <uuid/uuid.h> #include "zlist.h" // 鍵是短字符串 #define KVMSG_KEY_MAX 255 // 消息包含五幀 // frame 0: 鍵(ZMQ字符串) // frame 1: 編號(8個字節,按順序排列) // frame 2: UUID(二進制塊,16個字節) // frame 3: 屬性(ZMQ字符串) // frame 4: 值(二進制塊) #define FRAME_KEY 0 #define FRAME_SEQ 1 #define FRAME_UUID 2 #define FRAME_PROPS 3 #define FRAME_BODY 4 #define KVMSG_FRAMES 5 // 類結構 struct _kvmsg { // 幀是否存在 int present [KVMSG_FRAMES]; // 對應消息幀 zmq_msg_t frame [KVMSG_FRAMES]; // 鍵,C語言字符串格式 char key [KVMSG_KEY_MAX + 1]; // 屬性列表,key=value形式 zlist_t *props; size_t props_size; }; // 將屬性列表序列化為字符串 static void s_encode_props (kvmsg_t *self) { zmq_msg_t *msg = &self->frame [FRAME_PROPS]; if (self->present [FRAME_PROPS]) zmq_msg_close (msg); zmq_msg_init_size (msg, self->props_size); char *prop = zlist_first (self->props); char *dest = (char *) zmq_msg_data (msg); while (prop) { strcpy (dest, prop); dest += strlen (prop); *dest++ = '\n'; prop = zlist_next (self->props); } self->present [FRAME_PROPS] = 1; } // 從字符串中解析屬性列表 static void s_decode_props (kvmsg_t *self) { zmq_msg_t *msg = &self->frame [FRAME_PROPS]; self->props_size = 0; while (zlist_size (self->props)) free (zlist_pop (self->props)); size_t remainder = zmq_msg_size (msg); char *prop = (char *) zmq_msg_data (msg); char *eoln = memchr (prop, '\n', remainder); while (eoln) { *eoln = 0; zlist_append (self->props, strdup (prop)); self->props_size += strlen (prop) + 1; remainder -= strlen (prop) + 1; prop = eoln + 1; eoln = memchr (prop, '\n', remainder); } } // --------------------------------------------------------------------- // 構造函數,指定消息編號 kvmsg_t * kvmsg_new (int64_t sequence) { kvmsg_t *self; self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t)); self->props = zlist_new (); kvmsg_set_sequence (self, sequence); return self; } // --------------------------------------------------------------------- // 析構函數 // 釋放內存函數,供zhash_free_fn()調用 void kvmsg_free (void *ptr) { if (ptr) { kvmsg_t *self = (kvmsg_t *) ptr; // 釋放所有消息幀 int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); // 釋放屬性列表 while (zlist_size (self->props)) free (zlist_pop (self->props)); zlist_destroy (&self->props); // 釋放對象本身 free (self); } } void kvmsg_destroy (kvmsg_t **self_p) { assert (self_p); if (*self_p) { kvmsg_free (*self_p); *self_p = NULL; } } // --------------------------------------------------------------------- // 復制kvmsg對象 kvmsg_t * kvmsg_dup (kvmsg_t *self) { kvmsg_t *kvmsg = kvmsg_new (0); int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { if (self->present [frame_nbr]) { zmq_msg_t *src = &self->frame [frame_nbr]; zmq_msg_t *dst = &kvmsg->frame [frame_nbr]; zmq_msg_init_size (dst, zmq_msg_size (src)); memcpy (zmq_msg_data (dst), zmq_msg_data (src), zmq_msg_size (src)); kvmsg->present [frame_nbr] = 1; } } kvmsg->props = zlist_copy (self->props); return kvmsg; } // --------------------------------------------------------------------- // 從套接字總讀取鍵值對,返回kvmsg實例 kvmsg_t * kvmsg_recv (void *socket) { assert (socket); kvmsg_t *self = kvmsg_new (0); // 讀取所有幀,若有異常則直接返回空 int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { if (self->present [frame_nbr]) zmq_msg_close (&self->frame [frame_nbr]); zmq_msg_init (&self->frame [frame_nbr]); self->present [frame_nbr] = 1; if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) { kvmsg_destroy (&self); break; } // 驗證多幀消息 int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0; if (zsockopt_rcvmore (socket) != rcvmore) { kvmsg_destroy (&self); break; } } if (self) s_decode_props (self); return self; } // --------------------------------------------------------------------- // 向套接字發送鍵值對消息,空消息也發送 void kvmsg_send (kvmsg_t *self, void *socket) { assert (self); assert (socket); s_encode_props (self); int frame_nbr; for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) { zmq_msg_t copy; zmq_msg_init (&copy); if (self->present [frame_nbr]) zmq_msg_copy (&copy, &self->frame [frame_nbr]); zmq_sendmsg (socket, &copy, (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0); zmq_msg_close (&copy); } } // --------------------------------------------------------------------- // 返回消息的鍵 char * kvmsg_key (kvmsg_t *self) { assert (self); if (self->present [FRAME_KEY]) { if (!*self->key) { size_t size = zmq_msg_size (&self->frame [FRAME_KEY]); if (size > KVMSG_KEY_MAX) size = KVMSG_KEY_MAX; memcpy (self->key, zmq_msg_data (&self->frame [FRAME_KEY]), size); self->key [size] = 0; } return self->key; } else return NULL; } // --------------------------------------------------------------------- // 返回消息的編號 int64_t kvmsg_sequence (kvmsg_t *self) { assert (self); if (self->present [FRAME_SEQ]) { assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8); byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]); int64_t sequence = ((int64_t) (source [0]) << 56) + ((int64_t) (source [1]) << 48) + ((int64_t) (source [2]) << 40) + ((int64_t) (source [3]) << 32) + ((int64_t) (source [4]) << 24) + ((int64_t) (source [5]) << 16) + ((int64_t) (source [6]) << 8) + (int64_t) (source [7]); return sequence; } else return 0; } // --------------------------------------------------------------------- // 返回消息的UUID byte * kvmsg_uuid (kvmsg_t *self) { assert (self); if (self->present [FRAME_UUID] && zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t)) return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]); else return NULL; } // --------------------------------------------------------------------- // 返回消息的內容 byte * kvmsg_body (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]); else return NULL; } // --------------------------------------------------------------------- // 返回消息內容的長度 size_t kvmsg_size (kvmsg_t *self) { assert (self); if (self->present [FRAME_BODY]) return zmq_msg_size (&self->frame [FRAME_BODY]); else return 0; } // --------------------------------------------------------------------- // 設置消息的鍵 void kvmsg_set_key (kvmsg_t *self, char *key) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_KEY]; if (self->present [FRAME_KEY]) zmq_msg_close (msg); zmq_msg_init_size (msg, strlen (key)); memcpy (zmq_msg_data (msg), key, strlen (key)); self->present [FRAME_KEY] = 1; } // --------------------------------------------------------------------- // 設置消息的編號 void kvmsg_set_sequence (kvmsg_t *self, int64_t sequence) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_SEQ]; if (self->present [FRAME_SEQ]) zmq_msg_close (msg); zmq_msg_init_size (msg, 8); byte *source = zmq_msg_data (msg); source [0] = (byte) ((sequence >> 56) & 255); source [1] = (byte) ((sequence >> 48) & 255); source [2] = (byte) ((sequence >> 40) & 255); source [3] = (byte) ((sequence >> 32) & 255); source [4] = (byte) ((sequence >> 24) & 255); source [5] = (byte) ((sequence >> 16) & 255); source [6] = (byte) ((sequence >> 8) & 255); source [7] = (byte) ((sequence) & 255); self->present [FRAME_SEQ] = 1; } // --------------------------------------------------------------------- // 生成并設置消息的UUID void kvmsg_set_uuid (kvmsg_t *self) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_UUID]; uuid_t uuid; uuid_generate (uuid); if (self->present [FRAME_UUID]) zmq_msg_close (msg); zmq_msg_init_size (msg, sizeof (uuid)); memcpy (zmq_msg_data (msg), uuid, sizeof (uuid)); self->present [FRAME_UUID] = 1; } // --------------------------------------------------------------------- // 設置消息的內容 void kvmsg_set_body (kvmsg_t *self, byte *body, size_t size) { assert (self); zmq_msg_t *msg = &self->frame [FRAME_BODY]; if (self->present [FRAME_BODY]) zmq_msg_close (msg); self->present [FRAME_BODY] = 1; zmq_msg_init_size (msg, size); memcpy (zmq_msg_data (msg), body, size); } // --------------------------------------------------------------------- // 使用printf()格式設置消息的鍵 void kvmsg_fmt_key (kvmsg_t *self, char *format, ...) { char value [KVMSG_KEY_MAX + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, KVMSG_KEY_MAX, format, args); va_end (args); kvmsg_set_key (self, value); } // --------------------------------------------------------------------- // 使用printf()格式設置消息內容 void kvmsg_fmt_body (kvmsg_t *self, char *format, ...) { char value [255 + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, 255, format, args); va_end (args); kvmsg_set_body (self, (byte *) value, strlen (value)); } // --------------------------------------------------------------------- // 獲取消息屬性,無則返回空字符串 char * kvmsg_get_prop (kvmsg_t *self, char *name) { assert (strchr (name, '=') == NULL); char *prop = zlist_first (self->props); size_t namelen = strlen (name); while (prop) { if (strlen (prop) > namelen && memcmp (prop, name, namelen) == 0 && prop [namelen] == '=') return prop + namelen + 1; prop = zlist_next (self->props); } return ""; } // --------------------------------------------------------------------- // 設置消息屬性 // 屬性名稱不能包含=號,值的最大長度是255 void kvmsg_set_prop (kvmsg_t *self, char *name, char *format, ...) { assert (strchr (name, '=') == NULL); char value [255 + 1]; va_list args; assert (self); va_start (args, format); vsnprintf (value, 255, format, args); va_end (args); // 分配空間 char *prop = malloc (strlen (name) + strlen (value) + 2); // 刪除已存在的屬性 sprintf (prop, "%s=", name); char *existing = zlist_first (self->props); while (existing) { if (memcmp (prop, existing, strlen (prop)) == 0) { self->props_size -= strlen (existing) + 1; zlist_remove (self->props, existing); free (existing); break; } existing = zlist_next (self->props); } // 添加新屬性 strcat (prop, value); zlist_append (self->props, prop); self->props_size += strlen (prop) + 1; } // --------------------------------------------------------------------- // 在哈希表中保存kvmsg對象 // 當kvmsg對象不再被使用時進行釋放操作; // 若傳入的值為空,則刪除該對象。 void kvmsg_store (kvmsg_t **self_p, zhash_t *hash) { assert (self_p); if (*self_p) { kvmsg_t *self = *self_p; assert (self); if (kvmsg_size (self)) { if (self->present [FRAME_KEY] && self->present [FRAME_BODY]) { zhash_update (hash, kvmsg_key (self), self); zhash_freefn (hash, kvmsg_key (self), kvmsg_free); } } else zhash_delete (hash, kvmsg_key (self)); *self_p = NULL; } } // --------------------------------------------------------------------- // 將消息內容輸出到標準錯誤輸出 void kvmsg_dump (kvmsg_t *self) { if (self) { if (!self) { fprintf (stderr, "NULL"); return; } size_t size = kvmsg_size (self); byte *body = kvmsg_body (self); fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self)); fprintf (stderr, "[key:%s]", kvmsg_key (self)); fprintf (stderr, "[size:%zd] ", size); if (zlist_size (self->props)) { fprintf (stderr, "["); char *prop = zlist_first (self->props); while (prop) { fprintf (stderr, "%s;", prop); prop = zlist_next (self->props); } fprintf (stderr, "]"); } int char_nbr; for (char_nbr = 0; char_nbr < size; char_nbr++) fprintf (stderr, "%02X", body [char_nbr]); fprintf (stderr, "\n"); } else fprintf (stderr, "NULL message\n"); } // --------------------------------------------------------------------- // 測試用例 int kvmsg_test (int verbose) { kvmsg_t *kvmsg; printf (" * kvmsg: "); // 準備上下文和套接字 zctx_t *ctx = zctx_new (); void *output = zsocket_new (ctx, ZMQ_DEALER); int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); void *input = zsocket_new (ctx, ZMQ_DEALER); rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc"); assert (rc == 0); zhash_t *kvmap = zhash_new (); // 測試簡單消息的收發 kvmsg = kvmsg_new (1); kvmsg_set_key (kvmsg, "key"); kvmsg_set_uuid (kvmsg); kvmsg_set_body (kvmsg, (byte *) "body", 4); if (verbose) kvmsg_dump (kvmsg); kvmsg_send (kvmsg, output); kvmsg_store (&kvmsg, kvmap); kvmsg = kvmsg_recv (input); if (verbose) kvmsg_dump (kvmsg); assert (streq (kvmsg_key (kvmsg), "key")); kvmsg_store (&kvmsg, kvmap); // 測試帶有屬性的消息的收發 kvmsg = kvmsg_new (2); kvmsg_set_prop (kvmsg, "prop1", "value1"); kvmsg_set_prop (kvmsg, "prop2", "value1"); kvmsg_set_prop (kvmsg, "prop2", "value2"); kvmsg_set_key (kvmsg, "key"); kvmsg_set_uuid (kvmsg); kvmsg_set_body (kvmsg, (byte *) "body", 4); assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2")); if (verbose) kvmsg_dump (kvmsg); kvmsg_send (kvmsg, output); kvmsg_destroy (&kvmsg); kvmsg = kvmsg_recv (input); if (verbose) kvmsg_dump (kvmsg); assert (streq (kvmsg_key (kvmsg), "key")); assert (streq (kvmsg_get_prop (kvmsg, "prop2"), "value2")); kvmsg_destroy (&kvmsg); // 關閉并銷毀所有對象 zhash_destroy (&kvmap); zctx_destroy (&ctx); printf ("OK\n"); return 0; } ``` 客戶端模型5和模型4沒有太大區別,只是kvmsg類庫變了。在更新消息的時候還需要添加一個過期時間的屬性: ```c kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30)); ``` 服務端模型5有較大的變化,我們會用反應堆來代替輪詢,這樣就能混合處理定時事件和套接字事件了,只是在C語言中是比較麻煩的。下面是代碼: **clonesrv5: Clone server, Model Five in C** ```c // // 克隆模式 - 服務端 - 模型5 // // 直接編譯,不建類庫 #include "kvmsg.c" // 反應堆處理器 static int s_snapshots (zloop_t *loop, void *socket, void *args); static int s_collector (zloop_t *loop, void *socket, void *args); static int s_flush_ttl (zloop_t *loop, void *socket, void *args); // 服務器屬性 typedef struct { zctx_t *ctx; // 上下文 zhash_t *kvmap; // 鍵值對存儲 zloop_t *loop; // zloop反應堆 int port; // 主端口 int64_t sequence; // 更新事件編號 void *snapshot; // 處理快照請求 void *publisher; // 發布更新事件 void *collector; // 從客戶端收集接收更新事件 } clonesrv_t; int main (void) { clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t)); self->port = 5556; self->ctx = zctx_new (); self->kvmap = zhash_new (); self->loop = zloop_new (); zloop_set_verbose (self->loop, FALSE); // 打開克隆模式服務端套接字 self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER); self->publisher = zsocket_new (self->ctx, ZMQ_PUB); self->collector = zsocket_new (self->ctx, ZMQ_PULL); zsocket_bind (self->snapshot, "tcp://*:%d", self->port); zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1); zsocket_bind (self->collector, "tcp://*:%d", self->port + 2); // 注冊反應堆處理程序 zloop_reader (self->loop, self->snapshot, s_snapshots, self); zloop_reader (self->loop, self->collector, s_collector, self); zloop_timer (self->loop, 1000, 0, s_flush_ttl, self); // 運行反應堆,直至中斷 zloop_start (self->loop); zloop_destroy (&self->loop); zhash_destroy (&self->kvmap); zctx_destroy (&self->ctx); free (self); return 0; } // --------------------------------------------------------------------- // 發送快照內容 static int s_send_single (char *key, void *data, void *args); // 請求方信息 typedef struct { void *socket; // ROUTER套接字 zframe_t *identity; // 請求方標識 char *subtree; // 子樹信息 } kvroute_t; static int s_snapshots (zloop_t *loop, void *snapshot, void *args) { clonesrv_t *self = (clonesrv_t *) args; zframe_t *identity = zframe_recv (snapshot); if (identity) { // 請求位于消息第二幀 char *request = zstr_recv (snapshot); char *subtree = NULL; if (streq (request, "ICANHAZ?")) { free (request); subtree = zstr_recv (snapshot); } else printf ("E: 錯誤的請求,程序中止\n"); if (subtree) { // 發送狀態快照 kvroute_t routing = { snapshot, identity, subtree }; zhash_foreach (self->kvmap, s_send_single, &routing); // 發送結束符和版本號 zclock_log ("I: 正在發送快照,版本號:%d", (int) self->sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) subtree, 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); free (subtree); } } return 0; } // 每次發送一個快照鍵值對 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) && memcmp (kvroute->subtree, kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { // 先發送接收方標識 zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_send (kvmsg, kvroute->socket); } return 0; } // --------------------------------------------------------------------- // 收集更新事件 static int s_collector (zloop_t *loop, void *collector, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_recv (collector); if (kvmsg) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl")); if (ttl) kvmsg_set_prop (kvmsg, "ttl", "%" PRId64, zclock_time () + ttl * 1000); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在發布更新事件 %d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // 刪除過期的瞬間值 static int s_flush_single (char *key, void *data, void *args); static int s_flush_ttl (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_foreach (self->kvmap, s_flush_single, args); return 0; } // 刪除過期的鍵值對,并廣播該事件 static int s_flush_single (char *key, void *data, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; int64_t ttl; sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl); if (ttl && zclock_time () >= ttl) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 發布刪除事件 %d", (int) self->sequence); } return 0; } ``` #### 克隆服務器的可靠性 克隆模型1至5相對比較簡單,下面我們會探討一個非常復雜的模型。可以發現,為了構建可靠的消息隊列,我們需要花費非常多的精力。所以我們經常會問:有必要這么做嗎?如果說你能夠接受可靠性不夠高的、或者說已經足夠好的架構,那恭喜你,你在成本和收益之間找到了平衡。雖然我們會偶爾丟失一些消息,但從經濟的角度來說還是合理的。不管怎樣,下面我們就來介紹這個復雜的模型。 在模型3中,你會關閉和重啟服務,這會導致數據的丟失。任何后續加入的客戶端只能得到重啟之后的那些數據,而非所有的。下面就讓我們想辦法讓克隆模式能夠承擔服務器重啟的故障。 以下列舉我們需要處理的問題: * 克隆服務器進程崩潰并自動或手工重啟。進程丟失了所有數據,所以必須從別處進行恢復。 * 克隆服務器硬件故障,長時間不能恢復。客戶端需要切換至另一個可用的服務端。 * 克隆服務器從網絡上斷開,如交換機發生故障等。它會在某個時點重連,但期間的數據就需要替代的服務器負責處理。 第一步我們需要增加一個服務器。我們可以使用第四章中提到的雙子星模式,它是一個反應堆,而我們的程序經過整理后也是一個反應堆,因此可以互相協作。 我們需要保證更新事件在主服務器崩潰時仍能保留,最簡單的機制就是同時發送給兩臺服務器。 備機就可以當做一臺客戶端來運行,像其他客戶端一樣從主機獲取更新事件。同時它又能從客戶端獲取更新事件——雖然不應該以此更新數據,但可以先暫存起來。 所以,相較于模型5,模型6中引入了以下特性: * 客戶端發送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字會在沒有接收方時阻塞,且會進行負載均衡——我們需要兩臺服務器都接收到消息。我們會在服務器端綁定SUB套接字,在客戶端連接PUB套接字。 * 我們在服務器發送給客戶端的更新事件中加入心跳,這樣客戶端可以知道主機是否已死,然后切換至備機。 * 我們使用雙子星模式的bstar反應堆類來創建主機和備機。雙子星模式中需要有一個“投票”套接字,來協助判定對方節點是否已死。這里我們使用快照請求來作為“投票”。 * 我們將為所有的更新事件添加UUID屬性,它由客戶端生成,服務端會將其發布給所有客戶端。 * 備機將維護一個“待處理列表”,保存來自客戶端、尚未由服務端發布的更新事件;或者反過來,來自服務端、尚未從客戶端收到的更新事件。這個列表從舊到新排列,這樣就能方便地從頂部刪除消息。 我們可以為客戶端設計一個有限狀態機,它有三種狀態: * 客戶端打開并連接了套接字,然后向服務端發送快照請求。為了避免消息風暴,它只會請求兩次。 * 客戶端等待快照應答,如果獲得了則保存它;如果沒有獲得,則向第二個服務器發送請求。 * 客戶端收到快照,便開始等待更新事件。如果在一定時間內沒有收到服務端響應,則會連接第二個服務端。 客戶端會一直循環下去,可能在程序剛啟動時,部分客戶端會試圖連接主機,部分連接備機,相信雙子星模式會很好地處理這一情況的。 我們可以將客戶端狀態圖繪制出來: ![6](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_6.png) 故障恢復的步驟如下: * 客戶端檢測到主機不再發送心跳,因此轉而連接備機,并請求一份新的快照; * 備機開始接收快照請求,并檢測到主機死亡,于是開始作為主機運行; * 備機將待處理列表中的更新事件寫入自身狀態中,然后開始處理快照請求。 當主機恢復連接時: * 啟動為slave狀態,并作為克隆模式客戶端連接備機; * 同時,使用SUB套接字從客戶端接收更新事件。 我們做兩點假設: * 至少有一臺主機會繼續運行。如果兩臺主機都崩潰了,那我們將丟失所有的服務端數據,無法恢復。 * 不同的客戶端不會同時更新同一個鍵值對。客戶端的更新事件會先后到達兩個服務器,因此更新的順序可能會不一致。單個客戶端的更新事件到達兩臺服務器的順序是相同的,所以不用擔心。 下面是整體架構圖: ![7](https://github.com/anjuke/zguide-cn/raw/master/images/chapter5_7.png) 開始編程之前,我們需要將客戶端重構成一個可復用的類。在ZMQ中寫異步類有時是為了練習如何寫出優雅的代碼,但這里我們確實是希望克隆模式可以成為一種易于使用的程序。上述架構的伸縮性來源于客戶端的正確行為,因此有必要將其封裝成一份API。要在客戶端中進行故障恢復還是比較復雜的,試想一下自由者模式和克隆模式結合起來會是什么樣的吧。 按照我的習慣,我會先寫出一份API的列表,然后加以實現。讓我們假想一個名為clone的API,在其基礎之上編寫克隆模式客戶端API。將代碼封裝為API顯然會提升代碼的穩定性,就以模型5為例,客戶端需要打開三個套接字,端點名稱直接寫在了代碼里。我們可以創建這樣一組API: ```c // 為每個套接字指定端點 clone_subscribe (clone, "tcp://localhost:5556"); clone_snapshot (clone, "tcp://localhost:5557"); clone_updates (clone, "tcp://localhost:5558"); // 由于有兩個服務端,因此再執行一次 clone_subscribe (clone, "tcp://localhost:5566"); clone_snapshot (clone, "tcp://localhost:5567"); clone_updates (clone, "tcp://localhost:5568"); ``` 但這種寫法還是比較啰嗦的,因為沒有必要將API內部的一些設計暴露給編程人員。現在我們會使用三個套接字,而將來可能就會使用兩個,或者四個。我們不可能讓所有的應用程序都相應地修改吧?讓我們把這些信息包裝到API中: ```c // 指定主備服務器端點 clone_connect (clone, "tcp://localhost:5551"); clone_connect (clone, "tcp://localhost:5561"); ``` 這樣一來代碼就變得非常簡潔,不過也會對現有代碼的內部就夠造成影響。我們需要從一個端點中推算出三個端點。一種方法是假設客戶端和服務端使用三個連續的端點通信,并將這個規則寫入協議;另一個方法是向服務器索取缺少的端點信息。我們使用第一種較為簡單的方法: * 服務器狀態ROUTER在端點P; * 服務器更新事件PUB在端點P + 1; * 服務器更新事件SUB在端點P + 2。 clone類和第四章的flcliapi類很類似,由兩部分組成: * 一個在后臺運行的異步克隆模式代理。該代理處理所有的I/O操作,實時地和服務器進行通信; * 一個在前臺應用程序中同步運行的clone類。當你創建了一個clone對象后,它會自動創建后臺的clone線程;當你銷毀clone對象,該后臺線程也會被銷毀。 前臺的clone類會使用inproc管道和后臺的代理進行通信。C語言中,czmq線程會自動為我們創建這個管道。這也是ZMQ多線程編程的常規方式。 如果沒有ZMQ,這種異步的設計將很難處理高壓工作,而ZMQ會讓其變得簡單。編寫出來額代碼會相對比較復雜。我們可以用反應堆的模式來編寫,但這會進一步增加復雜度,且影響應用程序的使用。因此,我們的設計的API將更像是一個能夠和服務器進行通信的鍵值表: ```c clone_t *clone_new (void); void clone_destroy (clone_t **self_p); void clone_connect (clone_t *self, char *address, char *service); void clone_set (clone_t *self, char *key, char *value); char *clone_get (clone_t *self, char *key); ``` 下面就是克隆模式客戶端模型6的代碼,因為調用了API,所以非常簡短: **clonecli6: Clone client, Model Six in C** ``` // // 克隆模式 - 客戶端 - 模型6 // // 直接編譯,不建類庫 #include "clone.c" #define SUBTREE "/client/" int main (void) { // 創建分布式哈希表 clone_t *clone = clone_new (); // 配置 clone_subtree (clone, SUBTREE); clone_connect (clone, "tcp://localhost", "5556"); clone_connect (clone, "tcp://localhost", "5566"); // 插入隨機鍵值 while (!zctx_interrupted) { // 生成隨機值 char key [255]; char value [10]; sprintf (key, "%s%d", SUBTREE, randof (10000)); sprintf (value, "%d", randof (1000000)); clone_set (clone, key, value, randof (30)); sleep (1); } clone_destroy (&clone); return 0; } ``` 以下是clone類的實現: **clone: Clone class in C** ```c /* ===================================================================== clone - client-side Clone Pattern class --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "clone.h" // 請求超時時間 #define GLOBAL_TIMEOUT 4000 // msecs // 判定服務器死亡的時間 #define SERVER_TTL 5000 // msecs // 服務器數量 #define SERVER_MAX 2 // ===================================================================== // 同步部分,在應用程序線程中工作 // --------------------------------------------------------------------- // 類結構 struct _clone_t { zctx_t *ctx; // 上下文 void *pipe; // 和后臺代理間的通信套接字 }; // 該線程用于處理真正的clone類 static void clone_agent (void *args, zctx_t *ctx, void *pipe); // --------------------------------------------------------------------- // 構造函數 clone_t * clone_new (void) { clone_t *self; self = (clone_t *) zmalloc (sizeof (clone_t)); self->ctx = zctx_new (); self->pipe = zthread_fork (self->ctx, clone_agent, NULL); return self; } // --------------------------------------------------------------------- // 析構函數 void clone_destroy (clone_t **self_p) { assert (self_p); if (*self_p) { clone_t *self = *self_p; zctx_destroy (&self->ctx); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 在鏈接之前指定快照和更新事件的子樹 // 發送給后臺代理的消息內容為[SUBTREE][subtree] void clone_subtree (clone_t *self, char *subtree) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SUBTREE"); zmsg_addstr (msg, subtree); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 連接至新的服務器端點 // 消息內容:[CONNECT][endpoint][service] void clone_connect (clone_t *self, char *address, char *service) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "CONNECT"); zmsg_addstr (msg, address); zmsg_addstr (msg, service); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 設置新值 // 消息內容:[SET][key][value][ttl] void clone_set (clone_t *self, char *key, char *value, int ttl) { char ttlstr [10]; sprintf (ttlstr, "%d", ttl); assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SET"); zmsg_addstr (msg, key); zmsg_addstr (msg, value); zmsg_addstr (msg, ttlstr); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 取值 // 消息內容:[GET][key] // 如果沒有clone可用,會返回NULL char * clone_get (clone_t *self, char *key) { assert (self); assert (key); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "GET"); zmsg_addstr (msg, key); zmsg_send (&msg, self->pipe); zmsg_t *reply = zmsg_recv (self->pipe); if (reply) { char *value = zmsg_popstr (reply); zmsg_destroy (&reply); return value; } return NULL; } // ===================================================================== // 異步部分,在后臺運行 // --------------------------------------------------------------------- // 單個服務端信息 typedef struct { char *address; // 服務端地址 int port; // 端口 void *snapshot; // 快照套接字 void *subscriber; // 接收更新事件的套接字 uint64_t expiry; // 服務器過期時間 uint requests; // 收到的快照請求數 } server_t; static server_t * server_new (zctx_t *ctx, char *address, int port, char *subtree) { server_t *self = (server_t *) zmalloc (sizeof (server_t)); zclock_log ("I: adding server %s:%d...", address, port); self->address = strdup (address); self->port = port; self->snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (self->snapshot, "%s:%d", address, port); self->subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (self->subscriber, "%s:%d", address, port + 1); zsockopt_set_subscribe (self->subscriber, subtree); return self; } static void server_destroy (server_t **self_p) { assert (self_p); if (*self_p) { server_t *self = *self_p; free (self->address); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 后臺代理類 // 狀態 #define STATE_INITIAL 0 // 連接之前 #define STATE_SYNCING 1 // 正在同步 #define STATE_ACTIVE 2 // 正在更新 typedef struct { zctx_t *ctx; // 上下文 void *pipe; // 與主線程通信的套接字 zhash_t *kvmap; // 鍵值表 char *subtree; // 子樹 server_t *server [SERVER_MAX]; uint nbr_servers; // 范圍:0 - SERVER_MAX uint state; // 當前狀態 uint cur_server; // 當前master,0/1 int64_t sequence; // 鍵值對編號 void *publisher; // 發布更新事件的套接字 } agent_t; static agent_t * agent_new (zctx_t *ctx, void *pipe) { agent_t *self = (agent_t *) zmalloc (sizeof (agent_t)); self->ctx = ctx; self->pipe = pipe; self->kvmap = zhash_new (); self->subtree = strdup (""); self->state = STATE_INITIAL; self->publisher = zsocket_new (self->ctx, ZMQ_PUB); return self; } static void agent_destroy (agent_t **self_p) { assert (self_p); if (*self_p) { agent_t *self = *self_p; int server_nbr; for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) server_destroy (&self->server [server_nbr]); zhash_destroy (&self->kvmap); free (self->subtree); free (self); *self_p = NULL; } } // 若線程被中斷則返回-1 static int agent_control_message (agent_t *self) { zmsg_t *msg = zmsg_recv (self->pipe); char *command = zmsg_popstr (msg); if (command == NULL) return -1; if (streq (command, "SUBTREE")) { free (self->subtree); self->subtree = zmsg_popstr (msg); } else if (streq (command, "CONNECT")) { char *address = zmsg_popstr (msg); char *service = zmsg_popstr (msg); if (self->nbr_servers < SERVER_MAX) { self->server [self->nbr_servers++] = server_new ( self->ctx, address, atoi (service), self->subtree); // 廣播更新事件 zsocket_connect (self->publisher, "%s:%d", address, atoi (service) + 2); } else zclock_log ("E: too many servers (max. %d)", SERVER_MAX); free (address); free (service); } else if (streq (command, "SET")) { char *key = zmsg_popstr (msg); char *value = zmsg_popstr (msg); char *ttl = zmsg_popstr (msg); zhash_update (self->kvmap, key, (byte *) value); zhash_freefn (self->kvmap, key, free); // 向服務端發送鍵值對 kvmsg_t *kvmsg = kvmsg_new (0); kvmsg_set_key (kvmsg, key); kvmsg_set_uuid (kvmsg); kvmsg_fmt_body (kvmsg, "%s", value); kvmsg_set_prop (kvmsg, "ttl", ttl); kvmsg_send (kvmsg, self->publisher); kvmsg_destroy (&kvmsg); puts (key); free (ttl); free (key); // 鍵值對實際由哈希表對象控制 } else if (streq (command, "GET")) { char *key = zmsg_popstr (msg); char *value = zhash_lookup (self->kvmap, key); if (value) zstr_send (self->pipe, value); else zstr_send (self->pipe, ""); free (key); free (value); } free (command); zmsg_destroy (&msg); return 0; } // --------------------------------------------------------------------- // 異步的后臺代理會維護一個服務端池,并處理來自應用程序的請求或應答。 static void clone_agent (void *args, zctx_t *ctx, void *pipe) { agent_t *self = agent_new (ctx, pipe); while (TRUE) { zmq_pollitem_t poll_set [] = { { pipe, 0, ZMQ_POLLIN, 0 }, { 0, 0, ZMQ_POLLIN, 0 } }; int poll_timer = -1; int poll_size = 2; server_t *server = self->server [self->cur_server]; switch (self->state) { case STATE_INITIAL: // 該狀態下,如果有可用服務,會發送快照請求 if (self->nbr_servers > 0) { zclock_log ("I: 正在等待服務器 %s:%d...", server->address, server->port); if (server->requests < 2) { zstr_sendm (server->snapshot, "ICANHAZ?"); zstr_send (server->snapshot, self->subtree); server->requests++; } server->expiry = zclock_time () + SERVER_TTL; self->state = STATE_SYNCING; poll_set [1].socket = server->snapshot; } else poll_size = 1; break; case STATE_SYNCING: // 該狀態下我們從服務器端接收快照內容,若失敗則嘗試其他服務器 poll_set [1].socket = server->snapshot; break; case STATE_ACTIVE: // 該狀態下我們從服務器獲取更新事件,失敗則嘗試其他服務器 poll_set [1].socket = server->subscriber; break; } if (server) { poll_timer = (server->expiry - zclock_time ()) * ZMQ_POLL_MSEC; if (poll_timer < 0) poll_timer = 0; } // ------------------------------------------------------------ // poll循環 int rc = zmq_poll (poll_set, poll_size, poll_timer); if (rc == -1) break; // 上下文已被關閉 if (poll_set [0].revents & ZMQ_POLLIN) { if (agent_control_message (self)) break; // 中斷 } else if (poll_set [1].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket); if (!kvmsg) break; // 中斷 // 任何服務端的消息將重置它的過期時間 server->expiry = zclock_time () + SERVER_TTL; if (self->state == STATE_SYNCING) { // 保存快照內容 server->requests = 0; if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence (kvmsg); self->state = STATE_ACTIVE; zclock_log ("I: received from %s:%d snapshot=%d", server->address, server->port, (int) self->sequence); kvmsg_destroy (&kvmsg); } else kvmsg_store (&kvmsg, self->kvmap); } else if (self->state == STATE_ACTIVE) { // 丟棄過期的更新事件 if (kvmsg_sequence (kvmsg) > self->sequence) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: received from %s:%d update=%d", server->address, server->port, (int) self->sequence); } else kvmsg_destroy (&kvmsg); } } else { // 服務端已死,嘗試其他服務器 zclock_log ("I: 服務器 %s:%d 無響應", server->address, server->port); self->cur_server = (self->cur_server + 1) % self->nbr_servers; self->state = STATE_INITIAL; } } agent_destroy (&self); } ``` 最后是克隆服務器的模型6代碼: **clonesrv6: Clone server, Model Six in C** ```c // // 克隆模式 - 服務端 - 模型6 // // 直接編譯,不建類庫 #include "bstar.c" #include "kvmsg.c" // bstar反應堆API static int s_snapshots (zloop_t *loop, void *socket, void *args); static int s_collector (zloop_t *loop, void *socket, void *args); static int s_flush_ttl (zloop_t *loop, void *socket, void *args); static int s_send_hugz (zloop_t *loop, void *socket, void *args); static int s_new_master (zloop_t *loop, void *unused, void *args); static int s_new_slave (zloop_t *loop, void *unused, void *args); static int s_subscriber (zloop_t *loop, void *socket, void *args); // 服務端屬性 typedef struct { zctx_t *ctx; // 上下文 zhash_t *kvmap; // 存放鍵值對 bstar_t *bstar; // bstar反應堆核心 int64_t sequence; // 更新事件編號 int port; // 主端口 int peer; // 同伴端口 void *publisher; // 發布更新事件的端口 void *collector; // 接收客戶端更新事件的端口 void *subscriber; // 接受同伴更新事件的端口 zlist_t *pending; // 延遲的更新事件 Bool primary; // 是否為主機 Bool master; // 是否為master Bool slave; // 是否為slave } clonesrv_t; int main (int argc, char *argv []) { clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t)); if (argc == 2 && streq (argv [1], "-p")) { zclock_log ("I: 作為主機master運行,正在等待備機slave連接。"); self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003", "tcp://localhost:5004"); bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self); self->port = 5556; self->peer = 5566; self->primary = TRUE; } else if (argc == 2 && streq (argv [1], "-b")) { zclock_log ("I: 作為備機slave運行,正在等待主機master連接。"); self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004", "tcp://localhost:5003"); bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER, s_snapshots, self); self->port = 5566; self->peer = 5556; self->primary = FALSE; } else { printf ("Usage: clonesrv4 { -p | -b }\n"); free (self); exit (0); } // 主機將成為master if (self->primary) self->kvmap = zhash_new (); self->ctx = zctx_new (); self->pending = zlist_new (); bstar_set_verbose (self->bstar, TRUE); // 設置克隆服務端套接字 self->publisher = zsocket_new (self->ctx, ZMQ_PUB); self->collector = zsocket_new (self->ctx, ZMQ_SUB); zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1); zsocket_bind (self->collector, "tcp://*:%d", self->port + 2); // 作為克隆客戶端連接同伴 self->subscriber = zsocket_new (self->ctx, ZMQ_SUB); zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1); // 注冊狀態事件處理器 bstar_new_master (self->bstar, s_new_master, self); bstar_new_slave (self->bstar, s_new_slave, self); // 注冊bstar反應堆其他事件處理器 zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self); zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self); zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self); // 開啟bstar反應堆 bstar_start (self->bstar); // 中斷,終止。 while (zlist_size (self->pending)) { kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending); kvmsg_destroy (&kvmsg); } zlist_destroy (&self->pending); bstar_destroy (&self->bstar); zhash_destroy (&self->kvmap); zctx_destroy (&self->ctx); free (self); return 0; } // --------------------------------------------------------------------- // 發送快照內容 static int s_send_single (char *key, void *data, void *args); // 請求方信息 typedef struct { void *socket; // ROUTER套接字 zframe_t *identity; // 請求放標識 char *subtree; // 子樹 } kvroute_t; static int s_snapshots (zloop_t *loop, void *snapshot, void *args) { clonesrv_t *self = (clonesrv_t *) args; zframe_t *identity = zframe_recv (snapshot); if (identity) { // 請求在消息的第二幀中 char *request = zstr_recv (snapshot); char *subtree = NULL; if (streq (request, "ICANHAZ?")) { free (request); subtree = zstr_recv (snapshot); } else printf ("E: 錯誤的請求,正在退出……\n"); if (subtree) { // 發送狀態快照 kvroute_t routing = { snapshot, identity, subtree }; zhash_foreach (self->kvmap, s_send_single, &routing); // 發送終止消息,以及消息編號 zclock_log ("I: 正在發送快照,版本號:%d", (int) self->sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) subtree, 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); free (subtree); } } return 0; } // 每次發送一個快照鍵值對 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) && memcmp (kvroute->subtree, kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { // 先發送接收方的地址 zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_send (kvmsg, kvroute->socket); } return 0; } // --------------------------------------------------------------------- // 從客戶端收集更新事件 // 如果我們是master,則將該事件寫入kvmap對象; // 如果我們是slave,則將其寫入延遲隊列 static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg); static int s_collector (zloop_t *loop, void *collector, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_recv (collector); kvmsg_dump (kvmsg); if (kvmsg) { if (self->master) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl")); if (ttl) kvmsg_set_prop (kvmsg, "ttl", "%" PRId64, zclock_time () + ttl * 1000); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在發布更新事件:%d", (int) self->sequence); } else { // 如果我們已經從master中獲得了該事件,則丟棄該消息 if (s_was_pending (self, kvmsg)) kvmsg_destroy (&kvmsg); else zlist_append (self->pending, kvmsg); } } return 0; } // 如果消息已在延遲隊列中,則刪除它并返回TRUE static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg) { kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending); while (held) { if (memcmp (kvmsg_uuid (kvmsg), kvmsg_uuid (held), sizeof (uuid_t)) == 0) { zlist_remove (self->pending, held); return TRUE; } held = (kvmsg_t *) zlist_next (self->pending); } return FALSE; } // --------------------------------------------------------------------- // 刪除帶有過期時間的瞬間值 static int s_flush_single (char *key, void *data, void *args); static int s_flush_ttl (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_foreach (self->kvmap, s_flush_single, args); return 0; } // 如果鍵值對過期,則進行刪除操作,并廣播該事件 static int s_flush_single (char *key, void *data, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; int64_t ttl; sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl); if (ttl && zclock_time () >= ttl) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在發布刪除事件:%d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // 發送心跳 static int s_send_hugz (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "HUGZ"); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_destroy (&kvmsg); return 0; } // --------------------------------------------------------------------- // 狀態改變事件處理函數 // 我們將轉變為master // // 備機先將延遲列表中的事件更新到自己的快照中, // 并開始接收客戶端發來的快照請求。 static int s_new_master (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; self->master = TRUE; self->slave = FALSE; zloop_cancel (bstar_zloop (self->bstar), self->subscriber); // 應用延遲列表中的事件 while (zlist_size (self->pending)) { kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending); kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在發布延遲列表中的更新事件:%d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // 正在切換為slave static int s_new_slave (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_destroy (&self->kvmap); self->master = FALSE; self->slave = TRUE; zloop_reader (bstar_zloop (self->bstar), self->subscriber, s_subscriber, self); return 0; } // --------------------------------------------------------------------- // 從同伴主機(master)接收更新事件; // 接收該類更新事件時,我們一定是slave。 static int s_subscriber (zloop_t *loop, void *subscriber, void *args) { clonesrv_t *self = (clonesrv_t *) args; // 獲取快照,如果需要的話。 if (self->kvmap == NULL) { self->kvmap = zhash_new (); void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER); zsocket_connect (snapshot, "tcp://localhost:%d", self->peer); zclock_log ("I: 正在請求快照:tcp://localhost:%d", self->peer); zstr_send (snapshot, "ICANHAZ?"); while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (snapshot); if (!kvmsg) break; // 中斷 if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_destroy (&kvmsg); break; // 完成 } kvmsg_store (&kvmsg, self->kvmap); } zclock_log ("I: 收到快照,版本號:%d", (int) self->sequence); zsocket_destroy (self->ctx, snapshot); } // 查找并刪除 kvmsg_t *kvmsg = kvmsg_recv (subscriber); if (!kvmsg) return 0; if (strneq (kvmsg_key (kvmsg), "HUGZ")) { if (!s_was_pending (self, kvmsg)) { // 如果master的更新事件比客戶端的事件早到,則將master的事件存入延遲列表, // 當收到客戶端更新事件時會將其從列表中清除。 zlist_append (self->pending, kvmsg_dup (kvmsg)); } // 如果更新事件比kvmap版本高,則應用它 if (kvmsg_sequence (kvmsg) > self->sequence) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 收到更新事件:%d", (int) self->sequence); } else kvmsg_destroy (&kvmsg); } else kvmsg_destroy (&kvmsg); return 0; } ``` 這段程序只有幾百行,但還是花了一些時間來進行調通的。這個模型中包含了故障恢復,瞬間值,子樹等等。雖然我們前期設計得很完備,但要在多個套接字之間進行調試還是很困難的。以下是我的工作方式: * 由于使用了反應堆(bstar,建立在zloop之上),我們節省了大量的代碼,讓程序變得簡潔明了。整個服務以一個線程運行,因此不會出現跨線程的問題。只需將結構指針(self)傳遞給所有的處理器即可。此外,使用發應堆后可以讓代碼更為模塊化,易于重用。 * 我們逐個模塊進行調試,只有某個模塊能夠正常運行時才會進入下一步。由于使用了四五個套接字,因此調試的工作量是很大的。我直接將調試信息輸出到了屏幕上,因為實在沒有必要專門開一個調試器來工作。 * 因為一直在使用valgrind工具進行測試,因此我能確定程序沒有內存泄漏的問題。在C語言中,內存泄漏是我們非常關心的問題,因為沒有什么垃圾回收機制可以幫你完成。正確地使用像kvmsg、czmq之類的抽象層可以很好地避免內存泄漏。 這段程序肯定還會存在一些BUG,部分讀者可能會幫助我調試和修復,我在此表示感謝。 測試模型6時,先開啟主機和備機,再打開一組客戶端,順序隨意。隨機地中止某個服務進程,如果程序設計得是正確的,那客戶端獲得的數據應該都是一致的。 #### 克隆模式協議 花費了那么多精力來開發一套可靠的發布-訂閱模式機制,我們當然希望將來能夠方便地在其基礎之上進行擴展。較好的方法是將其編寫為一個協議,這樣就能讓各種語言來實現它了。 我們將其稱為“集群化哈希表協議”,這是一個能夠跨集群地進行鍵值哈希表管理,提供了多客戶端的通信機制;客戶端可以只操作一個子樹的數據,包括更新和定義瞬間值。 * http://rfc.zeromq.org/spec:12
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看