## 第二章 ZeroMQ進階
第一章我們簡單試用了ZMQ的若干通信模式:請求-應答模式、發布-訂閱模式、管道模式。這一章我們將學習更多在實際開發中會使用到的東西:
本章涉及的內容有:
* 創建和使用ZMQ套接字
* 使用套接字發送和接收消息
* 使用ZMQ提供的異步I/O套接字構建你的應用程序
* 在單一線程中使用多個套接字
* 恰當地處理致命和非致命錯誤
* 處理諸如Ctrl-C的中斷信號
* 正確地關閉ZMQ應用程序
* 檢查ZMQ應用程序的內存泄露
* 發送和接收多幀消息
* 在網絡中轉發消息
* 建立簡單的消息隊列代理
* 使用ZMQ編寫多線程應用程序
* 使用ZMQ在線程間傳遞信號
* 使用ZMQ協調網絡中的節點
* 使用標識創建持久化套接字
* 在發布-訂閱模式中創建和使用消息信封
* 如何讓持久化的訂閱者能夠從崩潰中恢復
* 使用閾值(HWM)防止內存溢出
### 零的哲學
?MQ一詞中的?讓我們糾結了很久。一方面,這個特殊字符會降低ZMQ在谷歌和推特中的收入量;另一方面,這會惹惱某些丹麥語種的民族,他們會嚷道?并不是一個奇怪的0。
一開始ZMQ代表零中間件、零延遲,同時,它又有了新的含義:零管理、零成本、零浪費。總的來說,零表示最小、最簡,這是貫穿于該項目的哲理。我們致力于減少復雜程度,提高易用性。
### 套接字API
說實話,ZMQ有些偷梁換柱的嫌疑。不過我們并不會為此道歉,因為這種概念上的切換絕對不會有壞處。ZMQ提供了一套類似于BSD套接字的API,但將很多消息處理機制的細節隱藏了起來,你會逐漸適應這種變化,并樂于用它進行編程。
套接字事實上是用于網絡編程的標準接口,ZMQ之所那么吸引人眼球,原因之一就是它是建立在標準套接字API之上。因此,ZMQ的套接字操作非常容易理解,其生命周期主要包含四個部分:
* 創建和銷毀套接字:zmq_socket(), zmq_close()
* 配置和讀取套接字選項:zmq_setsockopt(), zmq_getsockopt()
* 為套接字建立連接:zmq_bind(), zmq_connect()
* 發送和接收消息:zmq_send(), zmq_recv()
如以下C代碼:
```c
void *mousetrap;
// Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);
// Configure the socket
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);
// Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");
// Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_recv (mousetrap, &mouse, 0);
// Destroy the mouse
zmq_msg_close (&mouse);
// Destroy the socket
zmq_close (mousetrap);
```
請注意,套接字永遠是空指針類型的,而消息則是一個數據結構(我們下文會講述)。所以,在C語言中你通過變量傳遞套接字,而用引用傳遞消息。記住一點,在ZMQ中所有的套接字都是由ZMQ管理的,只有消息是由程序員管理的。
創建、銷毀、以及配置套接字的工作和處理一個對象差不多,但請記住ZMQ是異步的,伸縮性很強,因此在將其應用到網絡結構中時,可能會需要多一些時間來理解。
### 使用套接字構建拓撲結構
在連接兩個節點時,其中一個需要使用zmq_bind(),另一個則使用zmq_connect()。通常來講,使用zmq_bind()連接的節點稱之為服務端,它有著一個較為固定的網絡地址;使用zmq_connect()連接的節點稱為客戶端,其地址不固定。我們會有這樣的說法:綁定套接字至端點;連接套接字至端點。端點指的是某個廣為周知網絡地址。
ZMQ連接和傳統的TCP連接是有區別的,主要有:
* 使用多種協議,inproc(進程內)、ipc(進程間)、tcp、pgm(廣播)、epgm;
* 當客戶端使用zmq_connect()時連接就已經建立了,并不要求該端點已有某個服務使用zmq_bind()進行了綁定;
* 連接是異步的,并由一組消息隊列做緩沖;
* 連接會表現出某種消息模式,這是由創建連接的套接字類型決定的;
* 一個套接字可以有多個輸入和輸出連接;
* ZMQ沒有提供類似zmq_accept()的函數,因為當套接字綁定至端點時它就自動開始接受連接了;
* 應用程序無法直接和這些連接打交道,因為它們是被封裝在ZMQ底層的。
在很多架構中都使用了類似于C/S的架構。服務端組件式比較穩定的,而客戶端組件則較為動態,來去自如。所以說,服務端地址對客戶端而言往往是可見的,反之則不然。這樣一來,架構中應該將哪些組件作為服務端(使用zmq_bind()),哪些作為客戶端(使用zmq_connect()),就很明顯了。同時,這需要和你使用的套接字類型相聯系起來,我們下文會詳細講述。
讓我們試想一下,如果先打開了客戶端,后打開服務端,會發生什么?傳統網絡連接中,我們打開客戶端時一定會收到系統的報錯信息,但ZMQ讓我們能夠自由地啟動架構中的組件。當客戶端使用zmq_connect()連接至某個端點時,它就已經能夠使用該套接字發送消息了。如果這時,服務端啟動起來了,并使用zmq_bind()綁定至該端點,ZMQ將自動開始轉發消息。
服務端節點可以僅使用一個套接字就能綁定至多個端點。也就是說,它能夠使用不同的協議來建立連接:
```c
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
```
當然,你不能多次綁定至同一端點,這樣是會報錯的。
每當有客戶端節點使用zmq_connect()連接至上述某個端點時,服務端就會自動創建連接。ZMQ沒有對連接數量進行限制。此外,客戶端節點也可以使用一個套接字同時建立多個連接。
大多數情況下,哪個節點充當服務端,哪個作為客戶端,是網絡架構層面的內容,而非消息流問題。不過也有一些特殊情況(如失去連接后的消息重發),同一種套接字使用綁定和連接是會有一些不同的行為的。
所以說,當我們在設計架構時,應該遵循“服務端是穩定的,客戶端是靈活的“原則,這樣就不太會出錯。
套接字是有類型的,套接字類型定義了套接字的行為,它在發送和接收消息時的規則等。你可以將不同種類的套接字進行連接,如PUB-SUB組合,這種組合稱之為發布-訂閱模式,其他組合也會有相應的模式名稱,我們會在下文詳述。
正是因為套接字可以使用不同的方式進行連接,才構成了ZMQ最基本的消息隊列系統。我們還可以在此基礎之上建立更為復雜的裝置、路由機制等,下文會詳述。總的來說,ZMQ為你提供了一套組件,供你在網絡架構中拼裝和使用。
### 使用套接字傳遞數據
發送和接收消息使用的是zmq_send()和zmq_recv()這兩個函數。雖然函數名稱看起來很直白,但由于ZMQ的I/O模式和傳統的TCP協議有很大不同,因此還是需要花點時間去理解的。

讓我們看一看TCP套接字和ZMQ套接字之間在傳輸數據方面的區別:
* ZMQ套接字傳輸的是消息,而不是字節(TCP)或幀(UDP)。消息指的是一段指定長度的二進制數據塊,我們下文會講到消息,這種設計是為了性能優化而考慮的,所以可能會比較難以理解。
* ZMQ套接字在后臺進行I/O操作,也就是說無論是接收還是發送消息,它都會先傳送到一個本地的緩沖隊列,這個內存隊列的大小是可以配置的。
* ZMQ套接字可以和多個套接字進行連接(如果套接字類型允許的話)。TCP協議只能進行點對點的連接,而ZMQ則可以進行一對多(類似于無線廣播)、多對多(類似于郵局)、多對一(類似于信箱),當然也包括一對一的情況。
* ZMQ套接字可以發送消息給多個端點(扇出模型),或從多個端點中接收消息(扇入模型)

所以,向套接字寫入一個消息時可能會將消息發送給很多節點,相應的,套接字又會從所有已建立的連接中接收消息。zmq_recv()方法使用了公平隊列的算法來決定接收哪個連接的消息。
調用zmq_send()方法時其實并沒有真正將消息發送給套接字連接。消息會在一個內存隊列中保存下來,并由后臺的I/O線程異步地進行發送。如果不出意外情況,這一行為是非阻塞的。所以說,即便zmq_send()有返回值,并不能代表消息已經發送。當你在用zmq_msg_init_data()初始化消息后,你不能重用或是釋放這條消息,否則ZMQ的I/O線程會認為它在傳輸垃圾數據。這對初學者來講是一個常犯的錯誤,下文我們會講述如何正確地處理消息。
### 單播傳輸
ZMQ提供了一組單播傳輸協議(inporc, ipc, tcp),和兩個廣播協議(epgm, pgm)。廣播協議是比較高級的協議,我們會在以后講述。如果你不能回答我扇出比例會影響一對多的單播傳輸時,就先不要去學習廣播協議了吧。
一般而言我們會使用**tcp**作為傳輸協議,這種TCP連接是可以脫機運作的,它靈活、便攜、且足夠快速。為什么稱之為脫機,是因為ZMQ中的TCP連接不需要該端點已經有某個服務進行了綁定,客戶端和服務端可以隨時進行連接和綁定,這對應用程序而言都是透明的。
進程間協議,即**ipc**,和tcp的行為差不多,但已從網絡傳輸中抽象出來,不需要指定IP地址或者域名。這種協議很多時候會很方便,本指南中的很多示例都會使用這種協議。ZMQ中的ipc協議同樣可以是脫機的,但有一個缺點——無法在Windows操作系統上運作,這一點也許會在未來的ZMQ版本中修復。我們一般會在端點名稱的末尾附上.ipc的擴展名,在UNIX系統上,使用ipc協議還需要注意權限問題。你還需要保證所有的程序都能夠找到這個ipc端點。
進程內協議,即**inproc**,可以在同一個進程的不同線程之間進行消息傳輸,它比ipc或tcp要快得多。這種協議有一個要求,必須先綁定到端點,才能建立連接,也許未來也會修復。通常的做法是先啟動服務端線程,綁定至端點,后啟動客戶端線程,連接至端點。
### ZMQ不只是數據傳輸
經常有新人會問,如何使用ZMQ建立一項服務?我能使用ZMQ建立一個HTTP服務器嗎?
他們期望得到的回答是,我們用普通的套接字來傳輸HTTP請求和應答,那用ZMQ套接字也能夠完成這個任務,且能運行得更快、更好。
只可惜答案并不是這樣的。ZMQ不只是一個數據傳輸的工具,而是在現有通信協議之上建立起來的新架構。它的數據幀和現有的協議并不兼容,如下面是一個HTTP請求和ZMQ請求的對比,同樣使用的是TCP/IPC協議:

HTTP請求使用CR-LF(換行符)作為信息幀的間隔,而ZMQ則使用指定長度來定義幀:

所以說,你的確是可以用ZMQ來寫一個類似于HTTP協議的東西,但是這并不是HTTP。
不過,如果有人問我如何更好地使用ZMQ建立一個新的服務,我會給出一個不錯的答案,那就是:你可以自行設計一種通信協議,用ZMQ進行連接,使用不同的語言提供服務和擴展,可以在本地,亦可通過遠程傳輸。賽德?肖的[Mongrel2](http://www.mongrel2.org)網絡服務的架構就是一個很好的示例。
### I/O線程
我們提過ZMQ是通過后臺的I/O線程進行消息傳輸的。一個I/O線程已經足以處理多個套接字的數據傳輸要求,當然,那些極端的應用程序除外。這也就是我們在創建上下文時傳入的1所代表的意思:
```c
void *context = zmq_init (1);
```
ZMQ應用程序和傳統應用程序的區別之一就是你不需要為每個套接字都創建一個連接。單個ZMQ套接字可以處理所有的發送和接收任務。如,當你需要向一千個訂閱者發布消息時,使用一個套接字就可以了;當你需要向二十個服務進程分發任務時,使用一個套接字就可以了;當你需要從一千個網頁應用程序中獲取數據時,也是使用一個套接字就可以了。
這一特性可能會顛覆網絡應用程序的編寫步驟,傳統應用程序每個進程或線程會有一個遠程連接,它又只能處理一個套接字。ZMQ讓你打破這種結構,使用一個線程來完成所有工作,更易于擴展。
### 核心消息模式
ZMQ的套接字API中提供了多種消息模式。如果你熟悉企業級消息應用,那這些模式會看起來很熟悉。不過對于新手來說,ZMQ的套接字還是會讓人大吃一驚的。
讓我們回顧一下ZMQ會為你做些什么:它會將消息快速高效地發送給其他節點,這里的節點可以是線程、進程、或是其他計算機;ZMQ為應用程序提供了一套簡單的套接字API,不用考慮實際使用的協議類型(進程內、進程間、TPC、或廣播);當節點調動時,ZMQ會自動進行連接或重連;無論是發送消息還是接收消息,ZMQ都會先將消息放入隊列中,并保證進程不會因為內存溢出而崩潰,適時地將消息寫入磁盤;ZMQ會處理套接字異常;所有的I/O操作都在后臺進行;ZMQ不會產生死鎖。
但是,以上種種的前提是用戶能夠正確地使用消息模式,這種模式往往也體現出了ZMQ的智慧。消息模式將我們從實踐中獲取的經驗進行抽象和重組,用于解決之后遇到的所有問題。ZMQ的消息模式目前是編譯在類庫中的,不過未來的ZMQ版本可能會允許用戶自行制定消息模式。
ZMQ的消息模式是指不同類型套接字的組合。換句話說,要理解ZMQ的消息模式,你需要理解ZMQ的套接字類型,它們是如何一起工作的。這一部分是需要死記硬背的。
ZMQ的核心消息模式有:
* **請求-應答模式** 將一組服務端和一組客戶端相連,用于遠程過程調用或任務分發。
* **發布-訂閱模式** 將一組發布者和一組訂閱者相連,用于數據分發。
* **管道模式** 使用扇入或扇出的形式組裝多個節點,可以產生多個步驟或循環,用于構建并行處理架構。
我們在第一章中已經講述了這些模式,不過還有一種模式是為那些仍然認為ZMQ是類似TCP那樣點對點連接的人們準備的:
* **排他對接模式** 將兩個套接字一對一地連接起來,這種模式應用場景很少,我們會在本章最末尾看到一個示例。
zmq_socket()函數的說明頁中有對所有消息模式的說明,比較清楚,因此值得研讀幾次。我們會介紹每種消息模式的內容和應用場景。
以下是合法的套接字連接-綁定對(一端綁定、一端連接即可):
* PUB - SUB
* REQ - REP
* REQ - ROUTER
* DEALER - REP
* DEALER - ROUTER
* DEALER - DEALER
* ROUTER - ROUTER
* PUSH - PULL
* PAIR - PAIR
其他的組合模式會產生不可預知的結果,在將來的ZMQ版本中可能會直接返回錯誤。你也可以通過代碼去了解這些套接字類型的行為。
### 上層消息模式
上文中的四種核心消息模式是內建在ZMQ中的,他們是API的一部分,在ZMQ的C++核心類庫中實現,能夠保證正確地運行。如果有朝一日Linux內核將ZMQ采納了進來,那這些核心模式也肯定會包含其中。
在這些消息模式之上,我們會建立更為_上層的消息模式_。這種模式可以用任何語言編寫,他們不屬于核心類型的一部分,不隨ZMQ發行,只在你自己的應用程序中出現,或者在ZMQ社區中維護。
本指南的目的之一就是為你提供一些上層的消息模式,有簡單的(如何正確處理消息),也有復雜的(可靠的發布-訂閱模式)。
### 消息的使用方法
ZMQ的傳輸單位是消息,即一個二進制塊。你可以使用任意的序列化工具,如谷歌的Protocal Buffers、XDR、JSON等,將內容轉化成ZMQ消息。不過這種轉化工具最好是便捷和快速的,這個請自己衡量。
在內存中,ZMQ消息由zmq_msg_t結構表示(每種語言有特定的表示)。在C語言中使用ZMQ消息時需要注意以下幾點:
* 你需要創建和傳遞zmq_msg_t對象,而不是一組數據塊;
* 讀取消息時,先用zmq_msg_init()初始化一個空消息,再將其傳遞給zmq_recv()函數;
* 寫入消息時,先用zmq_msg_init_size()來創建消息(同時也已初始化了一塊內存區域),然后用memcpy()函數將信息拷貝到該對象中,最后傳給zmq_send()函數;
* 釋放消息(并不是銷毀)時,使用zmq_msg_close()函數,它會將對消息對象的引用刪除,最終由ZMQ將消息銷毀;
* 獲取消息內容時需使用zmq_msg_data()函數;若想知道消息的長度,可以使用zmq_msg_size()函數;
* 至于zmq_msg_move()、zmq_msg_copy()、zmq_msg_init_data()函數,在充分理解手冊中的說明之前,建議不好貿然使用。
以下是一段處理消息的典型代碼,如果之前的代碼你有看的話,那應該會感到熟悉。這段代碼其實是從zhelpers.h文件中抽出的:
```c
// 從套接字中獲取ZMQ字符串,并轉換為C語言字符串
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string [size] = 0;
return (string);
}
// 將C語言字符串轉換為ZMQ字符串,并發送給套接字
static int
s_send (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, 0);
assert (!rc);
zmq_msg_close (&message);
return (rc);
}
```
你可以對以上代碼進行擴展,讓其支持發送和接受任一長度的數據。
**需要注意的是,當你將一個消息對象傳遞給zmq_send()函數后,該對象的長度就會被清零,因此你無法發送同一個消息對象兩次,也無法獲得已發送消息的內容。**
如果你想發送同一個消息對象兩次,就需要在發送第一次前新建一個對象,使用zmq_msg_copy()函數進行拷貝。這個函數不會拷貝消息內容,只是拷貝引用。然后你就可以再次發送這個消息了(或者任意多次,只要進行了足夠的拷貝)。當消息最后一個引用被釋放時,消息對象就會被銷毀。
ZMQ支持多幀消息,即在一條消息中保存多個消息幀。這在實際應用中被廣泛使用,我們會在第三章進行講解。
關于消息,還有一些需要注意的地方:
* ZMQ的消息是作為一個整體來收發的,你不會只收到消息的一部分;
* ZMQ不會立即發送消息,而是有一定的延遲;
* 你可以發送0字節長度的消息,作為一種信號;
* 消息必須能夠在內存中保存,如果你想發送文件或超長的消息,就需要將他們切割成小塊,在獨立的消息中進行發送;
* 必須使用zmq_msg_close()函數來關閉消息,但在一些會在變量超出作用域時自動釋放消息對象的語言中除外。
再重復一句,不要貿然使用zmq_msg_init_data()函數。它是用于零拷貝,而且可能會造成麻煩。關于ZMQ還有太多東西需要你去學習,因此現在暫時不用去考慮如何削減幾微秒的開銷。
### 處理多個套接字
在之前的示例中,主程序的循環體內會做以下幾件事:
1. 等待套接字的消息;
1. 處理消息;
1. 返回第一步。
如果我們想要讀取多個套接字中的消息呢?最簡單的方法是將套接字連接到多個端點上,讓ZMQ使用公平隊列的機制來接受消息。如果不同端點上的套接字類型是一致的,那可以使用這種方法。但是,如果一個套接字的類型是PULL,另一個是PUB怎么辦?如果現在開始混用套接字類型,那將來就沒有可靠性可言了。
正確的方法應該是使用zmq_poll()函數。更好的方法是將zmq_poll()包裝成一個框架,編寫一個事件驅動的反應器,但這個就比較復雜了,我們這里暫不討論。
我們先不使用zmq_poll(),而用NOBLOCK(非阻塞)的方式來實現從多個套接字讀取消息的功能。下面將氣象信息服務和并行處理這兩個示例結合起來:
**msreader: Multiple socket reader in C**
```c
//
// 從多個套接字中獲取消息
// 本示例簡單地再循環中使用recv函數
//
#include "zhelpers.h"
int main (void)
{
// 準備上下文和套接字
void *context = zmq_init (1);
// 連接至任務分發器
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 連接至天氣服務
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// 處理從兩個套接字中接收到的消息
// 這里我們會優先處理從任務分發器接收到的消息
while (1) {
// 處理等待中的任務
int rc;
for (rc = 0; !rc; ) {
zmq_msg_t task;
zmq_msg_init (&task);
if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
// 處理任務
}
zmq_msg_close (&task);
}
// 處理等待中的氣象更新
for (rc = 0; !rc; ) {
zmq_msg_t update;
zmq_msg_init (&update);
if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {
// 處理氣象更新
}
zmq_msg_close (&update);
}
// 沒有消息,等待1毫秒
s_sleep (1);
}
// 程序不會運行到這里,但還是做正確的退出清理工作
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```
這種方式的缺點之一是,在收到第一條消息之前會有1毫秒的延遲,這在高壓力的程序中還是會構成問題的。此外,你還需要翻閱諸如nanosleep()的函數,不會造成循環次數的激增。
示例中將任務分發器的優先級提升了,你可以做一個改進,輪流處理消息,正如ZMQ內部做的公平隊列機制一樣。
下面,讓我們看看如何用zmq_poll()來實現同樣的功能:
**mspoller: Multiple socket poller in C**
```c
//
// 從多個套接字中接收消息
// 本例使用zmq_poll()函數
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 連接任務分發器
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 連接氣象更新服務
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// 初始化輪詢對象
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ subscriber, 0, ZMQ_POLLIN, 0 }
};
// 處理來自兩個套接字的消息
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// 處理任務
zmq_msg_close (&message);
}
if (items [1].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (subscriber, &message, 0);
// 處理氣象更新
zmq_msg_close (&message);
}
}
// 程序不會運行到這兒
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```
### 處理錯誤和ETERM信號
ZMQ的錯誤處理機制提倡的是快速崩潰。我們認為,一個進程對于自身內部的錯誤來說要越脆弱越好,而對外部的攻擊和錯誤要足夠健壯。舉個例子,活細胞會因檢測到自身問題而瓦解,但對外界的攻擊卻能極力抵抗。在ZMQ編程中,斷言用得是非常多的,如同細胞膜一樣。如果我們無法確定一個錯誤是來自于內部還是外部,那這就是一個設計缺陷了,需要修復。
在C語言中,斷言失敗會讓程序立即中止。其他語言中可以使用異常來做到。
當ZMQ檢測到來自外部的問題時,它會返回一個錯誤給調用程序。如果ZMQ不能從錯誤中恢復,那它是不會安靜地將消息丟棄的。某些情況下,ZMQ也會去斷言外部錯誤,這些可以被歸結為BUG。
到目前為止,我們很少看到C語言的示例中有對錯誤進行處理。**現實中的代碼應該對每一次的ZMQ函數調用作錯誤處理**。如果你不是使用C語言進行編程,可能那種語言的ZMQ類庫已經做了錯誤處理。但在C語言中,你需要自己動手。以下是一些常規的錯誤處理手段,從POSIX規范開始:
* 創建對象的方法如果失敗了會返回NULL;
* 其他方法執行成功時會返回0,失敗時會返回其他值(一般是-1);
* 錯誤代碼可以從變量errno中獲得,或者調用zmq_errno()函數;
* 錯誤消息可以調用zmq_strerror()函數獲得。
有兩種情況不應該被認為是錯誤:
* 當線程使用NOBLOCK方式調用zmq_recv()時,若沒有接收到消息,該方法會返回-1,并設置errno為EAGAIN;
* 當線程調用zmq_term()時,若其他線程正在進行阻塞式的處理,該函數會中止所有的處理,關閉套接字,并使得那些阻塞方法的返回值為-1,errno設置為ETERM。
遵循以上規則,你就可以在ZMQ程序中使用斷言了:
```c
void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc;
rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);
```
第一版的程序中我將函數調用直接放在了assert()函數里面,這樣做會有問題,因為一些優化程序會直接將程序中的assert()函數去除。
讓我們看看如何正確地關閉一個進程,我們用管道模式舉例。當我們在后臺開啟了一組worker時,我們需要在任務執行完畢后關閉它們。我們可以向這些worker發送自殺的消息,這項工作由結果收集器來完成會比較恰當。
如何將結果收集器和worker相連呢?PUSH-PULL套接字是單向的。ZMQ的原則是:如果需要解決一個新的問題,就該使用新的套接字。這里我們使用發布-訂閱模式來發送自殺的消息:
* 結果收集器創建PUB套接字,并連接至一個新的端點;
* worker將SUB套接字連接至這個端點;
* 當結果收集器檢測到任務執行完畢時,會通過PUB套接字發送自殺信號;
* worker收到自殺信號后便會中止。
這一過程不會添加太多的代碼:
```c
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
...
// Send kill signal to workers
zmq_msg_init_data (&message, "KILL", 5);
zmq_send (control, &message, 0);
zmq_msg_close (&message);
```

下面是worker進程的代碼,它會打開三個套接字:用于接收任務的PULL、用于發送結果的PUSH、以及用于接收自殺信號的SUB,使用zmq_poll()進行輪詢:
**taskwork2: Parallel task worker with kill signaling in C**
```c
//
// 管道模式 - worker 設計2
// 添加發布-訂閱消息流,用以接收自殺消息
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于接收消息的套接字
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 用戶發送消息的套接字
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// 用戶接收控制消息的套接字
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
// 處理接收到的任務或控制消息
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
// 處理消息
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// 工作
s_sleep (atoi ((char *) zmq_msg_data (&message)));
// 發送結果
zmq_msg_init (&message);
zmq_send (sender, &message, 0);
// 簡單的任務進圖指示
printf (".");
fflush (stdout);
zmq_msg_close (&message);
}
// 任何控制命令都表示自殺
if (items [1].revents & ZMQ_POLLIN)
break; // 退出循環
}
// 結束程序
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
zmq_term (context);
return 0;
}
```
下面是修改后的結果收集器代碼,在收集完結果后向所有worker發送自殺消息:
**tasksink2: Parallel task sink with kill signaling in C**
```c
//
// 管道模式 - 結構收集器 設計2
// 添加發布-訂閱消息流,用以向worker發送自殺信號
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于接收消息的套接字
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// 用以發送控制信息的套接字
void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
// 等待任務開始
char *string = s_recv (receiver);
free (string);
// 開始計時
int64_t start_time = s_clock ();
// 確認100個任務處理完畢
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
printf ("總執行時間: %d msec\n",
(int) (s_clock () - start_time));
// 發送自殺消息給worker
s_send (controller, "KILL");
// 結束
sleep (1); // 等待發送完畢
zmq_close (receiver);
zmq_close (controller);
zmq_term (context);
return 0;
}
```
### 處理中斷信號
現實環境中,當應用程序收到Ctrl-C或其他諸如ETERM的信號時需要能夠正確地清理和退出。默認情況下,這一信號會殺掉進程,意味著尚未發送的消息就此丟失,文件不能被正確地關閉等。
在C語言中我們是這樣處理消息的:
**interrupt: Handling Ctrl-C cleanly in C**
```c
//
// Shows how to handle Ctrl-C
//
#include <zmq.h>
#include <stdio.h>
#include <signal.h>
// ---------------------------------------------------------------------
// 消息處理
//
// 程序開始運行時調用s_catch_signals()函數;
// 在循環中判斷s_interrupted是否為1,是則跳出循環;
// 很適用于zmq_poll()。
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
s_interrupted = 1;
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
int main (void)
{
void *context = zmq_init (1);
void *socket = zmq_socket (context, ZMQ_REP);
zmq_bind (socket, "tcp://*:5555");
s_catch_signals ();
while (1) {
// 阻塞式的讀取會在收到信號時停止
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
if (s_interrupted) {
printf ("W: 收到中斷消息,程序中止...\n");
break;
}
}
zmq_close (socket);
zmq_term (context);
return 0;
}
```
這段程序使用s_catch_signals()函數來捕捉像Ctrl-C(SIGINT)和SIGTERM這樣的信號。收到任一信號后,該函數會將全局變量s_interrupted設置為1。你的程序并不會自動停止,需要顯式地做一些清理和退出工作。
* 在程序開始時調用s_catch_signals()函數,用來進行信號捕捉的設置;
* 如果程序在zmq_recv()、zmq_poll()、zmq_send()等函數中阻塞,當有信號傳來時,這些函數會返回EINTR;
* 像s_recv()這樣的函數會將這種中斷包裝為NULL返回;
* 所以,你的應用程序可以檢查是否有EINTR錯誤碼、或是NULL的返回、或者s_interrupted變量是否為1。
如果以下代碼就十分典型:
```c
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
char *message = s_recv (client);
if (!message)
break; // 按下了Ctrl-C
}
zmq_close (client);
```
如果你在設置s_catch_signals()之后沒有進行相應的處理,那么你的程序將對Ctrl-C和ETERM免疫。
### 檢測內存泄露
任何長時間運行的程序都應該妥善的管理內存,否則最終會發生內存溢出,導致程序崩潰。如果你所使用的編程序言會自動幫你完成內存管理,那就要恭喜你了。但若你使用類似C/C++之類的語言時,就需要自己動手進行內存管理了。下面會介紹一個名為valgrind的工具,可以用它來報告內存泄露的問題。
* 在Ubuntu或Debian操作系統上安裝valgrind:sudo apt-get install valgrind
* 缺省情況下,ZMQ會讓valgrind不停地報錯,想要屏蔽警告的話可以在編譯ZMQ時使用ZMQ_MAKE_VALGRIND_HAPPY宏選項:
```
$ cd zeromq2
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
```
* 應用程序應該正確地處理Ctrl-C,特別是對于長時間運行的程序(如隊列裝置),如果不這么做,valgrind會報告所有已分配的內存發生了錯誤。
* 使用-DDEBUG選項編譯程序,這樣可以讓valgrind告訴你具體是哪段代碼發生了內存溢出。
* 最后,使用如下方法運行valgrind:
```
valgrind --tool=memcheck --leak-check=full someprog
```
解決完所有的問題后,你會看到以下信息:
```
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
```
### 多幀消息
ZMQ消息可以包含多個幀,這在實際應用中非常常見,特別是那些有關“信封”的應用,我們下文會談到。我們這一節要講的是如何正確地收發多幀消息。
多幀消息的每一幀都是一個zmq_msg結構,也就是說,當你在收發含有五個幀的消息時,你需要處理五個zmq_msg結構。你可以將這些幀放入一個數據結構中,或者直接一個個地處理它們。
下面的代碼演示如何發送多幀消息:
```c
zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, 0);
```
然后我們看看如何接收并處理這些消息,這段代碼對單幀消息和多幀消息都適用:
```c
while (1) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
// 處理一幀消息
zmq_msg_close (&message);
int64_t more;
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
if (!more)
break; // 已到達最后一幀
}
```
關于多幀消息,你需要了解的還有:
* 在發送多幀消息時,只有當最后一幀提交發送了,整個消息才會被發送;
* 如果使用了zmq_poll()函數,當收到了消息的第一幀時,其它幀其實也已經收到了;
* 多幀消息是整體傳輸的,不會只收到一部分;
* 多幀消息的每一幀都是一個zmq_msg結構;
* 無論你是否檢查套接字的ZMQ_RCVMORE選項,你都會收到所有的消息;
* 發送時,ZMQ會將開始的消息幀緩存在內存中,直到收到最后一幀才會發送;
* 我們無法在發送了一部分消息后取消發送,只能關閉該套接字。
### 中間件和裝置
當網絡組件的數量較少時,所有節點都知道其它節點的存在。但隨著節點數量的增加,這種結構的成本也會上升。因此,我們需要將這些組件拆分成更小的模塊,使用一個中間件來連接它們。
這種結構在現實世界中是非常常見的,我們的社會和經濟體系中充滿了中間件的機制,用以降低復雜度,壓縮構建大型網絡的成本。中間件也會被稱為批發商、分包商、管理者等等。
ZMQ網絡也是一樣,如果規模不斷增長,就一定會需要中間件。ZMQ中,我們稱其為“裝置”。在構建ZMQ軟件的初期,我們會畫出幾個節點,然后將它們連接起來,不使用中間件:

隨后,我們對這個結構不斷地進行擴充,將裝置放到特定的位置,進一步增加節點數量:

ZMQ裝置沒有具體的設計規則,但一般會有一組“前端”端點和一組“后端”端點。裝置是無狀態的,因此可以被廣泛地部署在網絡中。你可以在進程中啟動一個線程來運行裝置,或者直接在一個進程中運行裝置。ZMQ內部也提供了基本的裝置實現可供使用。
ZMQ裝置可以用作路由和尋址、提供服務、隊列調度、以及其他你所能想到的事情。不同的消息模式需要用到不同類型的裝置來構建網絡。如,請求-應答模式中可以使用隊列裝置、抽象服務;發布-訂閱模式中則可使用流裝置、主題裝置等。
ZMQ裝置比起其他中間件的優勢在于,你可以將它放在網絡中任何一個地方,完成任何你想要的事情。
#### 發布-訂閱代理服務
我們經常會需要將發布-訂閱模式擴充到不同類型的網絡中。比如說,有一組訂閱者是在外網上的,我們想用廣播的方式發布消息給內網的訂閱者,而用TCP協議發送給外網訂閱者。
我們要做的就是寫一個簡單的代理服務裝置,在發布者和外網訂閱者之間搭起橋梁。這個裝置有兩個端點,一端連接內網上的發布者,另一端連接到外網上。它會從發布者處接收訂閱的消息,并轉發給外網上的訂閱者們。
**wuproxy: Weather update proxy in C**
```c
//
// 氣象信息代理服務裝置
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 訂閱氣象信息
void *frontend = zmq_socket (context, ZMQ_SUB);
zmq_connect (frontend, "tcp://192.168.55.210:5556");
// 轉發氣象信息
void *backend = zmq_socket (context, ZMQ_PUB);
zmq_bind (backend, "tcp://10.1.1.0:8100");
// 訂閱所有消息
zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
// 轉發消息
while (1) {
while (1) {
zmq_msg_t message;
int64_t more;
// 處理所有的消息幀
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // 到達最后一幀
}
}
// 程序不會運行到這里,但依然要正確地退出
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```
我們稱這個裝置為代理,因為它既是訂閱者,又是發布者。這就意味著,添加該裝置時不需要更改其他程序的代碼,只需讓外網訂閱者知道新的網絡地址即可。

可以注意到,這段程序能夠正確處理多幀消息,會將它完整的轉發給訂閱者。如果我們在發送時不指定ZMQ_SNDMORE選項,那么下游節點收到的消息就可能是破損的。編寫裝置時應該要保證能夠正確地處理多幀消息,否則會造成消息的丟失。
#### 請求-應答代理
下面讓我們在請求-應答模式中編寫一個小型的消息隊列代理裝置。
在Hello World客戶/服務模型中,一個客戶端和一個服務端進行通信。但在真實環境中,我們會需要讓多個客戶端和多個服務端進行通信。關鍵問題在于,服務端應該是無狀態的,所有的狀態都應該包含在一次請求中,或者存放其它介質中,如數據庫。
我們有兩種方式來連接多個客戶端和多個服務端。第一種是讓客戶端直接和多個服務端進行連接。客戶端套接字可以連接至多個服務端套接字,它所發送的請求會通過負載均衡的方式分發給服務端。比如說,有一個客戶端連接了三個服務端,A、B、C,客戶端產生了R1、R2、R3、R4四個請求,那么,R1和R4會由服務A處理,R2由B處理,R3由C處理:

這種設計的好處在于可以方便地添加客戶端,但若要添加服務端,那就得修改每個客戶端的配置。如果你有100個客戶端,需要添加三個服務端,那么這些客戶端都需要重新進行配置,讓其知道新服務端的存在。
這種方式肯定不是我們想要的。一個網絡結構中如果有太多固化的模塊就越不容易擴展。因此,我們需要有一個模塊位于客戶端和服務端之間,將所有的知識都匯聚到這個網絡拓撲結構中。理想狀態下,我們可以任意地增減客戶端或是服務端,不需要更改任何組件的配置。
下面就讓我們編寫這樣一個組件。這個代理會綁定到兩個端點,前端端點供客戶端連接,后端端點供服務端連接。它會使用zmq_poll()來輪詢這兩個套接字,接收消息并進行轉發。裝置中不會有隊列的存在,因為ZMQ已經自動在套接字中完成了。
在使用REQ和REP套接字時,其請求-應答的會話是嚴格同步。客戶端發送請求,服務端接收請求并發送應答,由客戶端接收。如果客戶端或服務端中的一個發生問題(如連續兩次發送請求),程序就會報錯。
但是,我們的代理裝置必須要是非阻塞式的,雖然可以使用zmq_poll()同時處理兩個套接字,但這里顯然不能使用REP和REQ套接字。
幸運的是,我們有DEALER和ROUTER套接字可以勝任這項工作,進行非阻塞的消息收發。DEALER過去被稱為XREQ,ROUTER被稱為XREP,但新的代碼中應盡量使用DEALER/ROUTER這種名稱。在第三章中你會看到如何用DEALER和ROUTER套接字構建不同類型的請求-應答模式。
下面就讓我們看看DEALER和ROUTER套接字是怎樣在裝置中工作的。
下方的簡圖描述了一個請求-應答模式,REQ和ROUTER通信,DEALER再和REP通信。ROUTER和DEALER之間我們則需要進行消息轉發:

請求-應答代理會將兩個套接字分別綁定到前端和后端,供客戶端和服務端套接字連接。在使用該裝置之前,還需要對客戶端和服務端的代碼進行調整。
** rrclient: Request-reply client in C **
```c
//
// Hello world 客戶端
// 連接REQ套接字至 tcp://localhost:5559 端點
// 發送Hello給服務端,等待World應答
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于和服務端通信的套接字
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5559");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send (requester, "Hello");
char *string = s_recv (requester);
printf ("收到應答 %d [%s]\n", request_nbr, string);
free (string);
}
zmq_close (requester);
zmq_term (context);
return 0;
}
```
下面是服務代碼:
**rrserver: Request-reply service in C**
```c
//
// Hello World 服務端
// 連接REP套接字至 tcp://*:5560 端點
// 接收Hello請求,返回World應答
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 用于何客戶端通信的套接字
void *responder = zmq_socket (context, ZMQ_REP);
zmq_connect (responder, "tcp://localhost:5560");
while (1) {
// 等待下一個請求
char *string = s_recv (responder);
printf ("Received request: [%s]\n", string);
free (string);
// 做一些“工作”
sleep (1);
// 返回應答信息
s_send (responder, "World");
}
// 程序不會運行到這里,不過還是做好清理工作
zmq_close (responder);
zmq_term (context);
return 0;
}
```
最后是代理程序,可以看到它是能夠處理多幀消息的:
**rrbroker: Request-reply broker in C**
```c
//
// 簡易請求-應答代理
//
#include "zhelpers.h"
int main (void)
{
// 準備上下文和套接字
void *context = zmq_init (1);
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (frontend, "tcp://*:5559");
zmq_bind (backend, "tcp://*:5560");
// 初始化輪詢集合
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
// 在套接字間轉發消息
while (1) {
zmq_msg_t message;
int64_t more; // 檢測多幀消息
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
while (1) {
// 處理所有消息幀
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // 最后一幀
}
}
if (items [1].revents & ZMQ_POLLIN) {
while (1) {
// 處理所有消息幀
zmq_msg_init (&message);
zmq_recv (backend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (frontend, &message, more? ZMQ_SNDMORE: 0);
zmq_msg_close (&message);
if (!more)
break; // 最后一幀
}
}
}
// 程序不會運行到這里,不過還是做好清理工作
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```
使用請求-應答代理可以讓你的C/S網絡結構更易于擴展:客戶端不知道服務端的存在,服務端不知道客戶端的存在。網絡中唯一穩定的組件是中間的代理裝置:

#### 內置裝置
ZMQ提供了一些內置的裝置,不過大多數人需要自己手動編寫這些裝置。內置裝置有:
* QUEUE,可用作請求-應答代理;
* FORWARDER,可用作發布-訂閱代理服務;
* STREAMER,可用作管道模式代理。
可以使用zmq_device()來啟動一個裝置,需要傳遞兩個套接字給它:
```c
zmq_device (ZMQ_QUEUE, frontend, backend);
```
啟動了QUEUE隊列就如同在網絡中加入了一個請求-應答代理,只需為其創建已綁定或連接的套接字即可。下面這段代碼是使用內置裝置的情形:
**msgqueue: Message queue broker in C**
```c
//
// 簡單消息隊列代理
// 功能和請求-應答代理相同,但使用了內置的裝置
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 客戶端套接字
void *frontend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "tcp://*:5559");
// 服務端套接字
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (backend, "tcp://*:5560");
// 啟動內置裝置
zmq_device (ZMQ_QUEUE, frontend, backend);
// 程序不會運行到這里
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```
內置裝置會恰當地處理錯誤,而我們手工實現的代理并沒有加入錯誤處理機制。所以說,當你能夠在程序中使用內置裝置的時候就盡量用吧。
可能你會像某些ZMQ開發者一樣提出這樣一個問題:如果我將其他類型的套接字傳入這些裝置中會發生什么?答案是:別這么做。你可以隨意傳入不同類型的套接字,但是執行結果會非常奇怪。所以,QUEUE裝置應使用ROUTER/DEALER套接字、FORWARDER應使用SUB/PUB、STREAMER應使用PULL/PUSH。
當你需要其他的套接字類型進行組合時,那就需要自己編寫裝置了。
### ZMQ多線程編程
使用ZMQ進行多線程編程(MT編程)將會是一種享受。在多線程中使用ZMQ套接字時,你不需要考慮額外的東西,讓它們自如地運作就好。
使用ZMQ進行多線程編程時,**不需要考慮互斥、鎖、或其他并發程序中要考慮的因素,你唯一要關心的僅僅是線程之間的消息**。
什么叫“完美”的多線程編程,指的是代碼易寫易讀,可以跨系統、跨語言地使用同一種技術,能夠在任意顆核心的計算機上運行,沒有狀態,沒有速度的瓶頸。
如果你有多年的多線程編程經驗,知道如何使用鎖、信號燈、臨界區等機制來使代碼運行得正確(尚未考慮快速),那你可能會很沮喪,因為ZMQ將改變這一切。三十多年來,并發式應用程序開發所總結的經驗是:不要共享狀態。這就好比兩個醉漢想要分享一杯啤酒,如果他們不是鐵哥們兒,那他們很快就會打起來。當有更多的醉漢加入時,情況就會更糟。多線程編程有時就像醉漢搶奪啤酒那樣糟糕。
進行多線程編程往往是痛苦的,當程序因為壓力過大而崩潰時,你會不知所然。有人寫過一篇《多線程代碼中的11個錯誤易發點》的文章,在大公司中廣為流傳,列舉其中的幾項:沒有進行同步、錯誤的粒度、讀寫分離、無鎖排序、鎖傳遞、優先級沖突等。
假設某一天的下午三點,當證券市場正交易得如火如荼的時候,突然之間,應用程序因為鎖的問題崩潰了,那將會是何等的場景?所以,作為程序員的我們,為解決那些復雜的多線程問題,只能用上更復雜的編程機制。
有人曾這樣比喻,那些多線程程序原本應作為大型公司的核心支柱,但往往又最容易出錯;那些想要通過網絡不斷進行延伸的產品,最后總以失敗告終。
如何用ZMQ進行多線程編程,以下是一些規則:
* 不要在不同的線程之間訪問同一份數據,如果要用到傳統編程中的互斥機制,那就有違ZMQ的思想了。唯一的例外是ZMQ上下文對象,它是線程安全的。
* 必須為進程創建ZMQ上下文,并將其傳遞給所有你需要使用inproc協議進行通信的線程;
* 你可以將線程作為單獨的任務來對待,使用自己的上下文,但是這些線程之間就不能使用inproc協議進行通信了。這樣做的好處是可以在日后方便地將程序拆分為不同的進程來運行。
* 不要在不同的線程之間傳遞套接字對象,這些對象不是線程安全的。從技術上來說,你是可以這樣做的,但是會用到互斥和鎖的機制,這會讓你的應用程序變得緩慢和脆弱。唯一合理的情形是,在某些語言的ZMQ類庫內部,需要使用垃圾回收機制,這時可能會進行套接字對象的傳遞。
當你需要在應用程序中使用兩個裝置時,可能會將套接字對象從一個線程傳遞給另一個線程,這樣做一開始可能會成功,但最后一定會隨機地發生錯誤。所以說,應在同一個線程中打開和關閉套接字。
如果你能遵循上面的規則,就會發現多線程程序可以很容易地拆分成多個進程。程序邏輯可以在線程、進程、或是計算機中運行,根據你的需求進行部署即可。
ZMQ使用的是系統原生的線程機制,而不是某種“綠色線程”。這樣做的好處是你不需要學習新的多線程編程API,而且可以和目標操作系統進行很好的結合。你可以使用類似英特爾的ThreadChecker工具來查看線程工作的情況。缺點在于,如果程序創建了太多的線程(如上千個),則可能導致操作系統負載過高。
下面我們舉一個實例,讓原來的Hello World服務變得更為強大。原來的服務是單線程的,如果請求較少,自然沒有問題。ZMQ的線程可以在一個核心上高速地運行,執行大量的工作。但是,如果有一萬次請求同時發送過來會怎么樣?因此,現實環境中,我們會啟動多個worker線程,他們會盡可能地接收客戶端請求,處理并返回應答。
當然,我們可以使用啟動多個worker進程的方式來實現,但是啟動一個進程總比啟動多個進程要來的方便且易于管理。而且,作為線程啟動的worker,所占用的帶寬會比較少,延遲也會較低。
以下是多線程版的Hello World服務:
**mtserver: Multithreaded service in C**
```c
//
// 多線程版Hello World服務
//
#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
// 連接至代理的套接字
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
char *string = s_recv (receiver);
printf ("Received request: [%s]\n", string);
free (string);
// 工作
sleep (1);
// 返回應答
s_send (receiver, "World");
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// 用于和client進行通信的套接字
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
// 用于和worker進行通信的套接字
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// 啟動一個worker池
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// 啟動隊列裝置
zmq_device (ZMQ_QUEUE, clients, workers);
// 程序不會運行到這里,但仍進行清理工作
zmq_close (clients);
zmq_close (workers);
zmq_term (context);
return 0;
}
```
所有的代碼應該都已經很熟悉了:
* 服務端啟動一組worker線程,每個worker創建一個REP套接字,并處理收到的請求。worker線程就像是一個單線程的服務,唯一的區別是使用了inproc而非tcp協議,以及綁定-連接的方向調換了。
* 服務端創建ROUTER套接字用以和client通信,因此提供了一個TCP協議的外部接口。
* 服務端創建DEALER套接字用以和worker通信,使用了內部接口(inproc)。
* 服務端啟動了QUEUE內部裝置,連接兩個端點上的套接字。QUEUE裝置會將收到的請求分發給連接上的worker,并將應答路由給請求方。
需要注意的是,在某些編程語言中,創建線程并不是特別方便,POSIX提供的類庫是pthreads,但Windows中就需要使用不同的API了。我們會在第三章中講述如何包裝一個多線程編程的API。
示例中的“工作”僅僅是1秒鐘的停留,我們可以在worker中進行任意的操作,包括與其他節點進行通信。消息的流向是這樣的:REQ-ROUTER-queue-DEALER-REP。

### 線程間的信號傳輸
當你剛開始使用ZMQ進行多線程編程時,你可能會問:要如何協調兩個線程的工作呢?可能會想要使用sleep()這樣的方法,或者使用諸如信號、互斥等機制。事實上,**你唯一要用的就是ZMQ本身**。回憶一下那個醉漢搶啤酒的例子吧。
下面的示例演示了三個線程之間需要如何進行同步:

我們使用PAIR套接字和inproc協議。
**mtrelay: Multithreaded relay in C**
```c
//
// 多線程同步
//
#include "zhelpers.h"
#include <pthread.h>
static void *
step1 (void *context) {
// 連接至步驟2,告知我已就緒
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step2");
printf ("步驟1就緒,正在通知步驟2……\n");
s_send (xmitter, "READY");
zmq_close (xmitter);
return NULL;
}
static void *
step2 (void *context) {
// 啟動步驟1前線綁定至inproc套接字
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step2");
pthread_t thread;
pthread_create (&thread, NULL, step1, context);
// 等待信號
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
// 連接至步驟3,告知我已就緒
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step3");
printf ("步驟2就緒,正在通知步驟3……\n");
s_send (xmitter, "READY");
zmq_close (xmitter);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// 啟動步驟2前線綁定至inproc套接字
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step3");
pthread_t thread;
pthread_create (&thread, NULL, step2, context);
// 等待信號
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
printf ("測試成功!\n");
zmq_term (context);
return 0;
}
```
這是一個ZMQ多線程編程的典型示例:
1. 兩個線程通過inproc協議進行通信,使用同一個上下文;
1. 父線程創建一個套接字,綁定至inproc://端點,然后再啟動子線程,將上下文對象傳遞給它;
1. 子線程創建第二個套接字,連接至inproc://端點,然后發送已就緒信號給父線程。
需要注意的是,這段代碼無法擴展到多個進程之間的協調。如果你使用inproc協議,只能建立結構非常緊密的應用程序。在延遲時間必須嚴格控制的情況下可以使用這種方法。對其他應用程序來說,每個線程使用同一個上下文,協議選用ipc或tcp。然后,你就可以自由地將應用程序拆分為多個進程甚至是多臺計算機了。
這是我們第一次使用PAIR套接字。為什么要使用PAIR?其他類型的套接字也可以使用,但都有一些缺點會影響到線程間的通信:
* 你可以讓信號發送方使用PUSH,接收方使用PULL,這看上去可能可以,但是需要注意的是,PUSH套接字發送消息時會進行負載均衡,如果你不小心開啟了兩個接收方,就會“丟失”一半的信號。而PAIR套接字建立的是一對一的連接,具有排他性。
* 可以讓發送方使用DEALER,接收方使用ROUTER。但是,ROUTER套接字會在消息的外層包裹一個來源地址,這樣一來原本零字節的信號就可能要成為一個多段消息了。如果你不在乎這個問題,并且不會重復讀取那個套接字,自然可以使用這種方法。但是,如果你想要使用這個套接字接收真正的數據,你就會發現ROUTER提供的消息是錯誤的。至于DEALER套接字,它同樣有負載均衡的機制,和PUSH套接字有相同的風險。
* 可以讓發送方使用PUB,接收方使用SUB。一來消息可以照原樣發送,二來PUB套接字不會進行負載均衡。但是,你需要對SUB套接字設置一個空的訂閱信息(用以接收所有消息);而且,如果SUB套接字沒有及時和PUB建立連接,消息很有可能會丟失。
綜上,使用PAIR套接字進行線程間的協調是最合適的。
### 節點協調
當你想要對節點進行協調時,PAIR套接字就不怎么合適了,這也是線程和節點之間的不同之處。一般來說,節點是來去自由的,而線程則較為穩定。使用PAIR套接字時,若遠程節點斷開連接后又進行重連,PAIR不會予以理會。
第二個區別在于,線程的數量一般是固定的,而節點數量則會經常變化。讓我們以氣象信息模型為基礎,看看要怎樣進行節點的協調,以保證客戶端不會丟失最開始的那些消息。
下面是程序運行邏輯:
* 發布者知道預期的訂閱者數量,這個數字可以任意指定;
* 發布者啟動后會先等待所有訂閱者進行連接,也就是節點協調。每個訂閱者會使用另一個套接字來告知發布者自己已就緒;
* 當所有訂閱者準備就緒后,發布者才開始發送消息。
這里我們會使用REQ-REP套接字來同步發布者和訂閱者。發布者的代碼如下:
**syncpub: Synchronized publisher in C**
```c
//
// 發布者 - 同步版
//
#include "zhelpers.h"
// 等待10個訂閱者連接
#define SUBSCRIBERS_EXPECTED 10
int main (void)
{
void *context = zmq_init (1);
// 用于和客戶端通信的套接字
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5561");
// 用于接收信號的套接字
void *syncservice = zmq_socket (context, ZMQ_REP);
zmq_bind (syncservice, "tcp://*:5562");
// 接收訂閱者的就緒信號
printf ("正在等待訂閱者就緒\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// - 等待就緒信息
char *string = s_recv (syncservice);
free (string);
// - 發送應答
s_send (syncservice, "");
subscribers++;
}
// 開始發送100萬條數據
printf ("正在廣播消息\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
s_send (publisher, "Rhubarb");
s_send (publisher, "END");
zmq_close (publisher);
zmq_close (syncservice);
zmq_term (context);
return 0;
}
```

以下是訂閱者的代碼:
**syncsub: Synchronized subscriber in C**
```c
//
// 訂閱者 - 同步版
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 一、連接SUB套接字
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5561");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
// ZMQ太快了,我們延遲一會兒……
sleep (1);
// 二、與發布者進行同步
void *syncclient = zmq_socket (context, ZMQ_REQ);
zmq_connect (syncclient, "tcp://localhost:5562");
// - 發送請求
s_send (syncclient, "");
// - 等待應答
char *string = s_recv (syncclient);
free (string);
// 三、處理消息
int update_nbr = 0;
while (1) {
char *string = s_recv (subscriber);
if (strcmp (string, "END") == 0) {
free (string);
break;
}
free (string);
update_nbr++;
}
printf ("收到 %d 條消息\n", update_nbr);
zmq_close (subscriber);
zmq_close (syncclient);
zmq_term (context);
return 0;
}
```
以下這段shell腳本會啟動10個訂閱者、1個發布者:
```sh
echo "正在啟動訂閱者..."
for a in 1 2 3 4 5 6 7 8 9 10; do
syncsub &
done
echo "正在啟動發布者..."
syncpub
```
結果如下:
```
正在啟動訂閱者...
正在啟動發布者...
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
收到 1000000 條消息
```
當REQ-REP請求完成時,我們仍無法保證SUB套接字已成功建立連接。除非使用inproc協議,否則對外連接的順序是不一定的。因此,示例程序中使用了sleep(1)的方式來進行處理,隨后再發送同步請求。
更可靠的模型可以是:
* 發布者打開PUB套接字,開始發送Hello消息(非數據);
* 訂閱者連接SUB套接字,當收到Hello消息后再使用REQ-REP套接字進行同步;
* 當發布者獲得所有訂閱者的同步消息后,才開始發送真正的數據。
### 零拷貝
第一章中我們曾提過零拷貝是很危險的,其實那是嚇唬你的。既然你已經讀到這里了,說明你已經具備了足夠的知識,能夠使用零拷貝。但需要記住,條條大路通地獄,過早地對程序進行優化其實是沒有必要的。簡單的說,如果你用不好零拷貝,那可能會讓程序架構變得更糟。
ZMQ提供的API可以讓你直接發送和接收消息,不用考慮緩存的問題。正因為消息是由ZMQ在后臺收發的,所以使用零拷貝需要一些額外的工作。
做零拷貝時,使用zmq_msg_init_data()函數創建一條消息,其內容指向某個已經分配好的內存區域,然后將該消息傳遞給zmq_send()函數。創建消息時,你還需要提供一個用于釋放消息內容的函數,ZMQ會在消息發送完畢時調用。下面是一個簡單的例子,我們假設已經分配好的內存區域為1000個字節:
```c
void my_free (void *data, void *hint) {
free (data);
}
// Send message from buffer, which we allocate and 0MQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);
```
在接收消息的時候是無法使用零拷貝的:ZMQ會將收到的消息放入一塊內存區域供你讀取,但不會將消息寫入程序指定的內存區域。
ZMQ的多段消息能夠很好地支持零拷貝。在傳統消息系統中,你需要將不同緩存中的內容保存到同一個緩存中,然后才能發送。但ZMQ會將來自不同內存區域的內容作為消息的一個幀進行發送。而且在ZMQ內部,一條消息會作為一個整體進行收發,因而非常高效。
### 瞬時套接字和持久套接字
在傳統網絡編程中,套接字是一個API對象,它們的生命周期不會長過程序的生命周期。但仔細打量一下套接字,它會占用一項特定的資源——緩存,這時ZMQ的開發者可能會問:是否有辦法在程序崩潰時讓這些套接字緩存得以保留,稍后能夠恢復?
這種特性應該會非常有用,雖然不能應對所有的危險,但至少可以挽回一部分損失,特別是多發布-訂閱模式來說。讓我們來討論一下。
這里有兩個套接字正在歡快地傳送著氣象信息:

如果接收方(SUB、PULL、REQ)指定了套接字標識,當它們斷開網絡時,發送方(PUB、PUSH、REP)會為它們緩存信息,直至達到閾值(HWM)。這里發送方不需要有套接字標識。
需要注意,ZMQ的套接字緩存對程序原來說是不可見的,正如TCP緩存一樣。
到目前為止,我們使用的套接字都是瞬時套接字。要將瞬時套接字轉化為持久套接字,需要為其設定一個套接字標識。所有的ZMQ套接字都會有一個標識,不過是由ZMQ自動生成的UUID。
在ZMQ內部,兩個套接字相連時會先交換各自的標識。如果發生對方沒有ID,則會自行生成一個用以標識對方:

但套接字也可以告知對方自己的標識,那當它們第二次連接時,就能知道對方的身份:
```
+-----------+
| |
| Sender |
| |
+-----------+
| Socket |
\-----------/
^ "Lucy! Nice to see you again..."
|
|
| "My name's Lucy"
/-----+-----\
| Socket |
+-----------+
| |
| Receiver |
| |
+-----------+
Figure # - Durable socket
```
下面這行代碼就可以為套接字設置標識,從而建立了一個持久的套接字:
```c
zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);
```
關于套接字標識還有幾點說明:
* 如果要為套接字設置標識,必須在連接或綁定至端點之前設置;
* 接收方會選擇使用套接字標識,正如cookie在HTTP網頁應用中的性質,是由服務器去選擇要使用哪個cookie的;
* 套接字標識是二進制字符串;以字節0開頭的套接字標識為ZMQ保留標識;
* 不用為多個套接字指定相同的標識,若套接字使用的標識已被占用,它將無法連接至其他套接字;
* 不要使用隨機的套接字標識,這樣會生成很多持久化套接字,最終讓節點崩潰;
* 如果你想獲取對方套接字的標識,只有ROUTER套接字會幫你自動完成這件事,使用其他套接字類型時,需要將標識作為消息的一幀發送過來;
* 說了以上這些,使用持久化套接字其實并不明智,因為它會讓發送者越來越混亂,讓架構變得脆弱。如果我們能重新設計ZMQ,很可能會去掉這種顯式聲明套接字標識的功能。
其他信息可以查看zmq_setsockopt()函數的ZMQ_IDENTITY一節。注意,該方法只能獲取程序中套接字的標識,而不能獲得對方套接字的標識。
### 發布-訂閱消息信封
我們簡單介紹了多幀消息,下面就來看看它的典型用法——消息信封。信封是指為消息注明來源地址,而不修改消息內容。
在發布-訂閱模式中,信封包含了訂閱信息,用以過濾掉不需要接收的消息。
如果你想要使用發布-訂閱信封,就需要自行生成和設置。這個動作是可選的,我們在之前的示例中也沒有使用到。在發布-訂閱模式中使用信封可能會比較麻煩,但在現實應用中還是很有必要的,畢竟信封和消息的確是兩塊不想干的數據。
這是發布-訂閱模式中一個帶有信封的消息:

我們回憶一下,發布-訂閱模式中,消息的接收是根據訂閱信息來的,也就是消息的前綴。將這個前綴放入單獨的消息幀,可以讓匹配變得非常明顯。因為不會有一個應用程序恰好只匹配了一部分數據。
下面是一個最簡的發布-訂閱消息信封示例。發布者會發送兩類消息:A和B,信封中指明了消息類型:
**psenvpub: Pub-sub envelope publisher in C**
```c
//
// 發布-訂閱消息信封 - 發布者
// s_sendmore()函數也是zhelpers.h提供的
//
#include "zhelpers.h"
int main (void)
{
// 準備上下文和PUB套接字
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5563");
while (1) {
// 發布兩條消息,A類型和B類型
s_sendmore (publisher, "A");
s_send (publisher, "We don't want to see this");
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
sleep (1);
}
// 正確退出
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
假設訂閱者只需要B類型的消息:
**psenvsub: Pub-sub envelope subscriber in C**
```c
//
// 發布-訂閱消息信封 - 訂閱者
//
#include "zhelpers.h"
int main (void)
{
// 準備上下文和SUB套接字
void *context = zmq_init (1);
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5563");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);
while (1) {
// 讀取消息信封
char *address = s_recv (subscriber);
// 讀取消息內容
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
free (address);
free (contents);
}
// 正確退出
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```
執行上面的程序時,訂閱者會打印如下信息:
```
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...
```
這個示例說明訂閱者會丟棄未訂閱的消息,且接收完整的多幀消息——你不會只獲得消息的一部分。
如果你訂閱了多個套接字,又想知道這些套接字的標識,從而通過另一個套接字來發送消息給它們(這個用例很常見),你可以讓發布者創建一條含有三幀的消息:

### (半)持久訂閱者和閾值(HWM)
所有的套接字類型都可以使用標識。如果你在使用PUB和SUB套接字,其中SUB套接字為自己聲明了標識,那么,當SUB斷開連接時,PUB會保留要發送給SUB的消息。
這種機制有好有壞。好的地方在于發布者會暫存這些消息,當訂閱者重連后進行發送;不好的地方在于這樣很容易讓發布者因內存溢出而崩潰。
**如果你在使用持久化的SUB套接字(即為SUB設置了套接字標識),那么你必須設法避免消息在發布者隊列中堆砌并溢出,應該使用閾值(HWM)來保護發布者套接字**。發布者的閾值會分別影響所有的訂閱者。
我們可以運行一個示例來證明這一點,用第一章中的wuclient和wuserver具體,在wuclient中進行套接字連接前加入這一行:
```c
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);
```
編譯并運行這兩段程序,一切看起來都很平常。但是觀察一下發布者的內存占用情況,可以看到當訂閱者逐個退出后,發布者的內存占用會逐漸上升。若此時你重啟訂閱者,會發現發布者的內存占用不再增長了,一旦訂閱者停止,就又會增長。很快地,它就會耗盡系統資源。
我們先來看看如何設置閾值,然后再看如何設置得正確。下面的發布者和訂閱者使用了上文提到的“節點協調”機制。發布者會每隔一秒發送一條消息,這時你可以中斷訂閱者,重新啟動它,看看會發生什么。
以下是發布者的代碼:
**durapub: Durable publisher in C**
```c
//
// 發布者 - 連接持久化的訂閱者
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 訂閱者會發送已就緒的消息
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// 使用該套接字發布消息
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5565");
// 等待同步消息
char *string = s_recv (sync);
free (string);
// 廣播10條消息,一秒一條
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char string [20];
sprintf (string, "Update %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "END");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
下面是訂閱者的代碼:
**durasub: Durable subscriber in C**
```c
//
// 持久化的訂閱者
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 連接SUB套接字
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
zmq_connect (subscriber, "tcp://localhost:5565");
// 發送同步消息
void *sync = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sync, "tcp://localhost:5564");
s_send (sync, "");
// 獲取更新,并按指令退出
while (1) {
char *string = s_recv (subscriber);
printf ("%s\n", string);
if (strcmp (string, "END") == 0) {
free (string);
break;
}
free (string);
}
zmq_close (sync);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```
運行以上代碼,在不同的窗口中先后打開發布者和訂閱者。當訂閱者獲取了一至兩條消息后按Ctrl-C中止,然后重新啟動,看看執行結果:
```
$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END
```
可以看到訂閱者的唯一區別是為套接字設置了標識,發布者就會將消息緩存起來,待重建連接后發送。設置套接字標識可以讓瞬時套接字轉變為持久套接字。實踐中,你需要小心地給套接字起名字,可以從配置文件中獲取,或者生成一個UUID并保存起來。
當我們為PUB套接字設置了閾值,發布者就會緩存指定數量的消息,轉而丟棄溢出的消息。讓我們將閾值設置為2,看看會發生什么:
```c
uint64_t hwm = 2;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
```
運行程序,中斷訂閱者后等待一段時間再重啟,可以看到結果如下:
```
$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END
```
看仔細了,發布者只為我們保存了兩條消息(2和3)。閾值使得ZMQ丟棄溢出隊列的消息。
簡而言之,如果你要使用持久化的訂閱者,就必須在發布者端設置閾值,否則可能造成服務器因內存溢出而崩潰。但是,還有另一種方法。ZMQ提供了名為交換區(swap)的機制,它是一個磁盤文件,用于存放從隊列中溢出的消息。啟動它很簡單:
```c
// 指定交換區大小,單位:字節。
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
```
我們可以將上面的方法綜合起來,編寫一個既能接受持久化套接字,又不至于內存溢出的發布者:
**durapub2: Durable but cynical publisher in C**
```c
//
// 發布者 - 連接持久化訂閱者
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 訂閱者會告知我們它已就緒
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// 使用該套接字發送消息
void *publisher = zmq_socket (context, ZMQ_PUB);
// 避免慢持久化訂閱者消息溢出的問題
uint64_t hwm = 1;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
// 設置交換區大小,供所有訂閱者使用
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
zmq_bind (publisher, "tcp://*:5565");
// 等待同步消息
char *string = s_recv (sync);
free (string);
// 發布10條消息,一秒一條
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char string [20];
sprintf (string, "Update %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "END");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
若在現實環境中將閾值設置為1,致使所有待發送的消息都保存到磁盤上,會大大降低處理速度。這里有一些典型的方法用以處理不同的訂閱者:
* **必須為PUB套接字設置閾值**,具體數字可以通過最大訂閱者數、可供隊列使用的最大內存區域、以及消息的平均大小來衡量。舉例來說,你預計會有5000個訂閱者,有1G的內存可供使用,消息大小在200個字節左右,那么,一個合理的閾值是1,000,000,000 / 200 / 5,000 = 1,000。
* 如果你不希望慢速或崩潰的訂閱者丟失消息,可以設置一個交換區,在高峰期的時候存放這些消息。交換區的大小可以根據訂閱者數、高峰消息比率、消息平均大小、暫存時間等來衡量。比如,你預計有5000個訂閱者,消息大小為200個字節左右,每秒會有10萬條消息。這樣,你每秒就需要100MB的磁盤空間來存放消息。加總起來,你會需要6GB的磁盤空間,而且必須足夠的快(這超出了本指南的講解范圍)。
關于持久化訂閱者:
* 數據可能會丟失,這要看消息發布的頻率、網絡緩存大小、通信協議等。持久化的訂閱者比起瞬時套接字要可靠一些,但也并不是完美的。
* 交換區文件是無法恢復的,所以當發布者或代理消亡時,交換區中的數據仍然會丟失。
關于閾值:
* 這個選項會同時影響套接字的發送和接收隊列。當然,PUB、PUSH不會有接收隊列,SUB、PULL、REQ、REP不會有發送隊列。而像DEALER、ROUTER、PAIR套接字時,他們既有發送隊列,又有接收隊列。
* 當套接字達到閾值時,ZMQ會發生阻塞,或直接丟棄消息。
* 使用inproc協議時,發送者和接受者共享同一個隊列緩存,所以說,真正的閾值是兩個套接字閾值之和。如果一方套接字沒有設置閾值,那么它就不會有緩存方面的限制。
### 這就是你想要的!
ZMQ就像是一盒積木,只要你有足夠的想象力,就可以用它組裝出任何造型的網絡架構。
這種高可擴、高彈性的架構一定會打開你的眼界。其實這并不是ZMQ原創的,早就有像[Erlang](http://www.erlang.org/)這樣的[基于流的編程語言](http://en.wikipedia.org/wiki/Flow-based_programming)已經能夠做到了,只是ZMQ提供了更為友善和易用的接口。
正如[Gonzo Diethelm](http://permalink.gmane.org/gmane.network.zeromq.devel/2145)所言:“我想用一句話來總結,‘如果ZMQ不存在,那它就應該被發明出來。’作為一個有著多年相關工作經驗的人,ZMQ太能引起我的共鳴了。我只能說,‘這就是我想要的!’”