# 云計算設計模式(四)——消費者的競爭模式
允許多個并發用戶處理在同一個通訊通道接收的消息。這種模式使系統能夠同時處理多個郵件,以優化吞吐量,提高可擴展性和可用性,以及平衡工作負載。
## 背景和問題
在云中運行的應用程序,可以預計,以處理大量的請求。而不是過程的每個請求同步地,一個常用的方法是通過一個消息傳送系統到該異步地處理它們的另一服務(消費者服務),以通過他們的應用程序。這種策略有助于確保在應用程序的業務邏輯沒有被阻塞,而正在處理的請求。
請求的數量可以隨著時間的原因有很多顯著變化。突然一陣在用戶活動或聚集的請求,來自多個租戶未來可能會導致不可預測的工作負載。在高峰時間的系統可能需要處理許多每秒數百個請求,而在其他時間的數量可能是非常小的。此外,該工作的性質進行的處理這些請求可能是高度可變的。使用消費者服務的單個實例,可能會導致該實例成為充斥請求或消息傳送系統可通過消息從應用程序來的流入被重載。為了處理這種波動的負載,該系統可以運行消費者服務的多個實例。然而這些消費者必須協調,以確保每個消息只傳送給一個單個消費者。工作量也需要跨消費者被負載平衡,以防止一個實例成為瓶頸。
## 解決方案
使用消息隊列來實現應用和消費者服務的實例之間的通信信道。在消息隊列中的形式應用帖請求,以及消費者的服務實例從隊列中接收消息并對其進行處理。這種方法使消費者的服務實例的同一池中從應用程序的任何實例處理消息。圖 1 示出了該架構。

圖1 - 使用消息隊列分發工作提高到一個服務的實例
該解決方案具有以下優點:
- 它使固有的負載調平系統,可以處理由應用程序實例發送請求量很大的變化。隊列充當應用程序實例和消費者服務實例,這有助于最大限度地減少對應用程序和服務實例(所描述的基于隊列的負載調平模式)的可用性和響應性的影響之間的緩沖區。處理的消息,需要一些將被執行時,不會妨礙同時由消費者服務的其他實例所處理的其它消息長期運行的處理。
- 它提高了可靠性。如果一個生產者直接與消費者,而不是使用這種模式進行通信,但不監視消費者,有一個高概率的消息可能丟失或失敗,如果消費者無法進行處理。在這種模式的消息不被發送給一個特定的服務實例,一個失敗服務實例不會阻塞一個生產者和消息可以通過任何加工服務實例進行處理。
- 它不需要復雜的協調的消費者之間,或在生產者和消費者的實例。消息隊列確保每個消息傳遞至少一次。
- 它是可擴展的。該系統能夠動態地增加或減少消費者服務的實例的數目的消息量是波動的。
- 它可以提高彈性,如果消息隊列提供事務讀取操作。如果消費者服務實例能夠讀取和處理該消息作為一個事務操作的一部分,并且如果這種消費服務實例隨后發生故障時,這種模式可以確保該消息將被返回到隊列中被拾起并處理通過的另一個實例消費者服務。
## 問題和注意事項
在決定如何實現這個模式時,請考慮以下幾點:
- 留言訂購。其中消費者服務實例接收消息的順序是無法保證的,并且不一定反映了所創建的消息中的順序。設計系統,以確保信息的處理是冪等的,因為這將有助于消除該消息的處理順序上的任何依賴。有關冪等的詳細信息,請參閱喬納森·奧利弗的博客冪等模式??。
**注意**
微軟 Azure 服務總線隊列可以通過使用消息會先入先出消息的順序工具保證。欲了解更多信息,請參閱消息傳遞模式 MSDN 上使用會話。
- 設計服務的永續性。如果系統被設計為檢測和重新啟動失敗的服務實例中,可能有必要執行由服務??實例執行作為冪等操作,以最小化被檢索和處理一次以上的單個消息的影響的處理。
- 檢測有害消息。格式不正確的消息,或者需要訪問不可用的資源的任務,可能會造成服務實例失敗。該系統應避免這樣的消息被返回到隊列,而是捕獲和別處存儲這些信息的詳細信息,以便可以在需要進行分析。
- 處理結果。服務實例處理一個消息從生成該消息的應用程序邏輯完全分離,并且它們未必能夠直接進行通信。如果服務實例生成必須傳回給應用程序邏輯結果,該信息必須被存儲在一個位置,都可以訪問兩個和系統必須提供某種指示時的處理已經完成,以防止應用邏輯從檢索數據不全。
**注意**
如果您正在使用 Azure 的工作進程可能能夠通過使用專用的郵件回復隊列回傳結果的應用程序邏輯。應用邏輯必須能夠將這些結果與原來的消息關聯起來。這種情況下進行了更詳細的異步消息的引物進行說明。
- 擴展的信息系統。在一個大型的解決方案,一個消息隊列可以是不堪重負的消息的數量,并成為系統中的瓶頸。在這種情況下,考慮分割該消息系統直接從特定制造商的信息到一個特定的隊列,或使用負載平衡,以跨多個消息隊列分發消息。
- 郵件系統的可靠性保障。一個可靠的消息傳送系統,需要保證的是,一旦應用程序放入隊列的消息,它也不會丟失。這是確保所有郵件傳遞至少一次重要的。
## 當使用這個模式
使用這種模式時:
- 工作量為一個應用程序被分成可異步運行任務。
- 任務是獨立的,可以并行地運行。
- 工作容積變化很大,需要一個可擴展的解決方案。
- 該解決方案必須提供高可用性,并且如果處理一個任務失敗必須是有彈性的。
這種模式可能不適合時:
- 它是不容易的應用程序的工作負荷分離成離散的任務,或有任務之間的依賴程度高。
- 任務必須同步進行,而應用邏輯必須等待任務完成后再繼續。
- 任務必須以特定的順序來執行。
### Note
有些郵件系統支持會話,使生產者對消息進行分組在一起,并確保它們都被同一個接收者處理。這個機制可以與優先消息使用(如果它們支持)來實現消息排序的一種形式,在順序從生產者傳送消息到單個消費者。
### 例子
Azure 提供存儲隊列和服務總線隊列,可作為一個合適的機制來實現這種模式。應用邏輯可以發布消息到一個隊列,而消費者實現為在一個或多個角色的任務可以檢索從這個隊列中的消息并進行處理。對于彈性,一個服務總線隊列使得消費者使用 PeekLock 模式,當它從隊列檢索消息。這種模式實際上不是刪除消息,而只是從其他消費者隱藏它。當處理完它原來的用戶可以刪除該郵件。如果消費者要失敗,偷看鎖將超時,消息將再次變得可見,讓消費者又找回它。
### Note
有關使用 Azure 的服務總線隊列的詳細信息,請參閱服務總線隊列,主題和 MSDN 上的訂閱。有關使用 Azure 存儲隊列的信息,請參閱如何 MSDN 上使用隊列存儲服務。
從可供下載的例子 CompetingConsumers 解決方案的 QueueManager 類下面的代碼顯示了本指南說明了如何通過在網絡或輔助角色開始的事件處理程序使用 QueueClient 實例中創建一個隊列。
~~~
private string queueName = ...;
private string connectionString = ...;
...
?
public async Task Start()
{
// Check if the queue already exists.
var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
if (!manager.QueueExists(this.queueName))
{
var queueDescription = new QueueDescription(this.queueName);
?
// Set the maximum delivery count for messages in the queue. A message
// is automatically dead-lettered after this number of deliveries. The
// default value for dead letter count is 10.
queueDescription.MaxDeliveryCount = 3;
?
await manager.CreateQueueAsync(queueDescription);
}
...
?
// Create the queue client. By default the PeekLock method is used.
this.client = QueueClient.CreateFromConnectionString(
this.connectionString, this.queueName);
}
~~~
下面的代碼片段顯示了一個應用程序如何創建和發送一批消息隊列。
~~~
public async Task SendMessagesAsync()
{
// Simulate sending a batch of messages to the queue.
var messages = new List<BrokeredMessage>();
?
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
messages.Add(message);
}
await this.client.SendBatchAsync(messages);
}
~~~
下面的代碼顯示了如何消費服務實例可以從隊列中下一個事件驅動的方式接收消息。該 processMessageTask 參數的 ReceiveMessages 法為代表,它引用在收到消息時運行的代碼。此代碼是異步運行。
~~~
private ManualResetEvent pauseProcessingEvent;
...
?
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
// Set up the options for the message pump.
var options = new OnMessageOptions();
?
// When AutoComplete is disabled it is necessary to manually
// complete or abandon the messages and handle any errors.
options.AutoComplete = false;
options.MaxConcurrentCalls = 10;
options.ExceptionReceived += this.OptionsOnExceptionReceived;
?
// Use of the Service Bus OnMessage message pump.
// The OnMessage method must be called once, otherwise an exception will occur.
this.client.OnMessageAsync(
async (msg) =>
{
// Will block the current thread if Stop is called.
this.pauseProcessingEvent.WaitOne();
?
// Execute processing task here.
await processMessageTask(msg);
},
options);
}
...
?
private void OptionsOnExceptionReceived(object sender,
ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
...
}
~~~
需要注意的是自動縮放的功能,例如可在天青,可用于啟動和停止的角色實例的隊列長度的波動。欲了解更多信息,請參閱自動縮放指導。另外,沒有必要維持角色實例和工人之間的一對一的對應過程,單個角色實例可以實現多個工作進程。欲了解更多信息,請參閱計算資源整合模式。
- 前言
- (一)—— 緩存預留模式
- (二)—— 斷路器模式
- (三)—— 補償交易模式
- (四)——消費者的競爭模式
- (五)——計算資源整合模式
- (六)——命令和查詢職責分離(CQRS)模式
- (七)——事件獲取模式
- (八)——外部配置存儲模式
- (九)—— 聯合身份模式
- (十)——守門員模式
- (十一)—— 健康端點監控模式
- (十二)—— 索引表模式
- (十三)——領導人選舉模式
- (十四)——實體化視圖模式
- (十五)—— 管道和過濾器模式
- (十六)——優先級隊列模式
- (十七)—— 基于隊列的負載均衡模式
- (十八)—— 重試模式
- (十九)——運行重構模式
- (二十)—— 調度程序代理管理者模式
- (二十一)——Sharding 分片模式
- (二十二)——靜態內容托管模式
- (二十三)——Throttling 節流模式
- (二十四)—— 仆人鍵模式