# [.NET領域驅動設計實戰系列]專題八:DDD案例:網上書店分布式消息隊列和分布式緩存的實現
## 一、引言
在上一專題中,商家發貨和用戶確認收貨功能引入了消息隊列來實現的,引入消息隊列的好處可以保證消息的順序處理,并且具有良好的可擴展性。但是上一專題消息隊列是基于內存中隊列對象來實現,這樣實現有一個弊端,就是一旦服務重啟或出現故障時,此時消息隊列中的消息會丟失,并且也記錄不了日志。所以就會出現,商家發貨成功后,用戶并沒有收到郵件通知,并且也沒有日志讓我們發現是否發送了郵件通知。為了解決這個問題,就需要引入一種可恢復的消息隊列。目前有很多開源的消息隊列都支持可恢復的,例如TibcoEms.net等。然而,微軟的MSMQ也是支持這種特性的。并且MSMQ還支持分布式部署,關于MSMQ更多內容可以參考:[http://www.cnblogs.com/zhili/p/MSMQ.html](http://www.cnblogs.com/zhili/p/MSMQ.html)
在本專題中將介紹為網上書店案例引入分布式消息隊列和分布式緩存的實現。
## 二、分布式消息隊列的實現
MSMQ的實現原理是:消息的發送者把自己想要發送的信息放入一個容器,然后把它保存到一個系統公用空間的消息隊列中,本地或異地的消息接收程序再從該隊列中取出發給它的消息進行處理。所以,即使服務器突然重啟,消息也會存在于系統公用空間的消息隊列中,待服務器重新啟動后,可以繼續接受消息進行處理,從而解決上一專題存在的問題。另外,上一專題的消息隊列只能被用在當前服務器中,而MSMQ支持分布式部署,不同機器都可以對MSMQ進行接收消息來處理,此時MSMQ起到一個中間件的作用。
在為網上書店引入分布式消息隊列之前,讓我們先理一下實現思路:
* 上一專題中把發貨事件和收貨事件發布到EventBus中,而此時需要用MsmqEventBus來替代EventBus。而MsmqEventBus的實現就很簡單了,完全可以參考EventBus來實現,只是此時消息并不是進入Queue對象中,而是通過[MessageQueue](https://msdn.microsoft.com/zh-cn/library/System.Messaging.MessageQueue(v=vs.110).aspx%20)對象發送到系統的消息隊列中。
* 而Commit方法即從系統的消息隊列中出隊來獲得消息。再獲得消息的處理器時,與上一專題的實現有點不同,因為把事件對象發送到消息隊列時,需要先把事件對象先序列化為Message對象再放入消息隊列中,而出隊的也是消息對象,而不是上一專題中的發貨事件對象。所以此時需要把出隊的消息對象反序列化為對應的事件對象。
有了上面的實現思路,接下來讓我們一起看看MsmqEventBus的具體實現代碼吧。
```
public class MsmqEventBus : DisposableObject, IEventBus
{
public void Publish<TMessage>(TMessage message) where TMessage : class, IEvent
{
// 將消息放入Message中Body屬性進行序列化發送到消息隊列中
var msmqMessage = new Message(message) { Formatter = new XmlMessageFormatter(new[] { message.GetType() }), Label = message.GetType().ToString()};
_messageQueue.Send(msmqMessage);
_committed = false;
}
public void Publish<TMessage>(IEnumerable<TMessage> messages) where TMessage : class, IEvent
{
messages.ToList().ForEach(m =>
{
_messageQueue.Send(m);
_committed = false;
});
}
public void Commit()
{
if (this._useInternalTransaction)
{
using (var transaction = new MessageQueueTransaction())
{
try
{
transaction.Begin();
var message = _messageQueue.Receive();
if (message != null)
{
message.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
var evntType = ConvertStringToType(message.Body.ToString());
var method = _publishMethod.MakeGenericMethod(evntType);
var evnt = Activator.CreateInstance(evntType);
method.Invoke(_aggregator, new object[] { evnt });
transaction.Commit();
}
}
catch
{
transaction.Abort();
throw;
}
}
}
else
{
// 從msmq消息隊列中出隊,此時獲得的對象是消息對象
var message = _messageQueue.Receive();
if (message != null)
{
// 指定反序列化的對象,由于我們之前把對應的事件類型保存在MessageQueue中的Label屬性
// 所以此時可以通過Label屬性來獲得目標序列化類型
message.Formatter = new XmlMessageFormatter(new[] { ConvertStringToType(message.Label) });
// 這樣message.Body獲得就是對應的事件對象,后面的處理邏輯就和EventBus一樣了
var evntType =message.Body.GetType();
var method = _publishMethod.MakeGenericMethod(evntType);
method.Invoke(_aggregator, new object[] { message.Body });
}
}
_committed = true;
}
}
```
結合上面代碼的注釋和前面實現思路的介紹,相信理解MsmqEventBus應該沒什么問題了。接下來,我們需要在配置文件中指定EventBus為MsmqEventBus類,另外**需要在你本地專有隊列中創建"OnlineStoreQueue"隊列來接受消息**。具體的配置文件修改為:
```
<!--Event Bus-->
<!--<register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events" mapTo="OnlineStore.Events.Bus.EventBus, OnlineStore.Events">
<lifetime type="singleton" />
</register>-->
<!--注入MsmqEventBus-->
<register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events"
mapTo="OnlineStore.Events.Bus.MsmqEventBus, OnlineStore.Events">
<lifetime type="singleton" />
<constructor>
<param name="path" value=".\Private$\OnlineStoreQueue" />
</constructor>
</register>
</container>
```
到此,分布式消息隊列的實現就完成了,具體分布式消息隊列的實現效果和上一專題使用EventBus的效果是一樣的,這里就不再貼圖了,大家可以自行下載源碼查看。
## 三、緩存的實現
在實際開發過程中,緩存的實現是必不可少的,對于已經查詢過的數據可以直接從緩存中進行讀取返回給調用者,利用緩存不但可以加快響應速度,還能減輕數據庫服務器的壓力。在大型電子商務網站中,緩存的實現更是必不可少的功能。然而緩存的實現也有兩種,一種是分布式緩存,另一種本地緩存。在大型網站中,更多實現的是分布式緩存,對于一些少用戶的企業系統,可能才會使用到本地緩存。所以在本專題中,將在網上書店案例中對這兩種緩存分別進行實現。
## 3.1 本地緩存的實現
首先,我們來介紹本地緩存的實現。由于這里需要實現兩種緩存,根據面向接口編程原則,我們自然首先需要定義一個緩存接口,然后這兩種具體緩存都需要實現該接口。針對緩存接口,無非是緩存數據的添加,移除,更新等操作,所以緩存接口的定義如下所示:
```
// 緩存接口的定義
public interface ICacheProvider
{
/// <summary>
/// 向緩存中添加一個對象
/// </summary>
/// <param name="key">緩存的鍵值</param>
/// <param name="valueKey">緩存值的鍵值</param>
/// <param name="value">緩存的對象</param>
void Add(string key, string valueKey, object value);
void Update(string key, string valueKey, object value);
object Get(string key, string valueKey);
void Remove(string key);
bool Exists(string key);
bool Exists(string key, string valueKey);
}
```
在介紹本地緩存的實現之前,讓我們先來思考下本地緩存的實現思路——就是在本地緩存類中定義一個字典對象,添加緩存就是往該字典插入鍵值對而已,其中key就是緩存數據對應的鍵值,value就是真正的緩存數據,如果緩存在字典中存在的話,就直接根據鍵值查找出緩存數據進行返回。
然而網上書店的本地緩存是基于Enterprise Library Caching庫來實現的,其實現思路和我之前介紹的思路也是一樣的,只不過此時字典對象不需要我們在類中定義,此時直接用Enterprise Library Caching庫中定義的就好。有了上面的分析,本地緩存的實現理解起來也就不那么難了,具體本地緩存的實現代碼如下所示:
```
// 表示基于Microsoft Patterns & Practices - Enterprise Library Caching Application Block的緩存機制的實現
// 該類簡單理解為對Enterprise Library Caching中的CacheManager封裝
// 該緩存實現不支持分布式緩存,更多信息參考:
// http://stackoverflow.com/questions/7799664/enterpriselibrary-caching-in-load-balance
public class EntLibCacheProvider : ICacheProvider
{
// 獲得CacheManager實例,該實例的注冊通過cachingConfiguration進行注冊進去的,具體看配置文件
private readonly ICacheManager _cacheManager = CacheFactory.GetCacheManager();
#region ICahceProvider
public void Add(string key, string valueKey, object value)
{
Dictionary<string, object> dict = null;
if (_cacheManager.Contains(key))
{
dict = (Dictionary<string, object>) _cacheManager[key];
dict[valueKey] = value;
}
else
{
dict = new Dictionary<string, object> { { valueKey, value }};
}
_cacheManager.Add(key, dict);
}
public void Update(string key, string valueKey, object value)
{
Add(key, valueKey, value);
}
public object Get(string key, string valueKey)
{
if (!_cacheManager.Contains(key)) return null;
var dict = (Dictionary<string, object>)_cacheManager[key];
if (dict != null && dict.ContainsKey(valueKey))
return dict[valueKey];
else
return null;
}
// 從緩存中移除對象
public void Remove(string key)
{
_cacheManager.Remove(key);
}
// 判斷指定的鍵值的緩存是否存在
public bool Exists(string key)
{
return _cacheManager.Contains(key);
}
// 判斷指定的鍵值和緩存鍵值的緩存是否存在
public bool Exists(string key, string valueKey)
{
return _cacheManager.Contains(key) &&
((Dictionary<string, object>)_cacheManager[key]).ContainsKey(valueKey);
}
#endregion
}
```
到此,網上書店案例中本地緩存的實現就完成了。由于本地緩存不支持分布式部署,所有的緩存都存在于單獨緩存服務器中,然而,針對一些大型網站來說,這樣的實現并不適合,因為在大型網站中,需要通過多個緩存服務進行集群,需要使得緩存均勻分布在集群中的緩存服務器中。此時就需要引入分布式緩存的實現。下面讓我們具體看看分布式緩存如何在該案例中實現。
## 3.2 分布式緩存的實現
分布式緩存可以通過具體的算法把緩存均勻地分布在集群中緩存服務器中,從而用戶請求的不同數據可以路由到對應的緩存服務器中進行添加、更新或獲得。分布式緩存的實現有很多種方式,可以利用Memcached和Redis開源庫來實現。然而,微軟的Windows Azure也提供了分布式緩存的實現,本案例中分布式緩存就是基于Windows Azure的。在對分布式緩存實現之前,需要先下載對應的dll,然后再在項目中進行引用。需要下載的dll已經包含在項目根目錄下的libs文件夾下,具體需要下載的程序集截圖如下所示:

然后在基礎設施層引入這些程序集,之前就可以去實現基于Windows Azure的分布式緩存了。具體的實現代碼如下所示:
```
// 分布式緩存,該類是對微軟分布式緩存服務的封裝
// 在該案例中沒用用到該緩存,但是提供在這里讓大家明白微軟的分布式緩存實現,并不是只有memcached和Redis
// Redis參考:http://www.cnblogs.com/ceecy/p/3279407.html 和 http://blog.csdn.net/suifeng3051/article/details/23739295
// 關于微軟分布式緩存更多介紹參考:http://www.cnblogs.com/shanyou/archive/2010/06/29/AppFabricCaching.html
// 和http://www.cnblogs.com/mlj322/archive/2010/04/05/1704624.html
public class AppfabricCacheProvider : ICacheProvider
{
private readonly DataCacheFactory _factory = new DataCacheFactory();
private readonly DataCache _cache;
public AppfabricCacheProvider()
{
this._cache = _factory.GetDefaultCache();
}
#region ICacheProvider Members
public void Add(string key, string valueKey, object value)
{
// DataCache中不包含Contain方法,所有用Get方法來判斷對應的key值是否在緩存中存在
var val = (Dictionary<string, object>)_cache.Get(key);
if (val == null)
{
val = new Dictionary<string, object> {{ valueKey, value}};
_cache.Add(key, val);
}
else
{
if (!val.ContainsKey(valueKey))
val.Add(valueKey, value);
else
val[valueKey] = value;
_cache.Put(key, val);
}
}
public void Update(string key, string valueKey, object value)
{
Add(key, valueKey, value);
}
public object Get(string key, string valueKey)
{
return Exists(key, valueKey) ? ((Dictionary<string, object>)_cache.Get(key))[valueKey] : null;
}
public void Remove(string key)
{
_cache.Remove(key);
}
public bool Exists(string key)
{
return _cache.Get(key) != null;
}
public bool Exists(string key, string valueKey)
{
var val = _cache.Get(key);
if (val == null)
return false;
return ((Dictionary<string, object>)val).ContainsKey(valueKey);
}
#endregion
}
```
通過上面的步驟,分布式緩存的實現就完成了。其實,分布式緩存和本地緩存不同之處就在于:分布式緩存支持對應的算法可以把緩存存放在不同的服務器上,而本地緩存只能存在于本地,而不能跨機器分布。所以對于大型網站,分布式緩存才是最好的選擇,由于分布式緩存的實現和部署,無疑會增加開發和維護成本,所以對于一些小型系統(指定是單數據庫服務器系統),可以考慮使用本地緩存。
在本案例中,由于本人沒有Windows Azure環境,所以對于分布式緩存的實現也不能進行測試,所以本案例中使用的還是本地緩存。要使緩存生效,還需要對配置文件進行修改。具體配置文件修改為:
```
<unity xmlns="http://schemas.microsoft.com/practices/2010/unity">
<sectionExtension type="Microsoft.Practices.Unity.InterceptionExtension.Configuration.InterceptionConfigurationExtension, Microsoft.Practices.Unity.Interception.Configuration" />
<container>
<extension type="Interception" />
** <!--Cache Provider-->
<register type="OnlineStore.Infrastructure.Caching.ICacheProvider, OnlineStore.Infrastructure" mapTo="OnlineStore.Infrastructure.Caching.EntLibCacheProvider, OnlineStore.Infrastructure" />**
<!--........-->
</container>
</unity>
```
其實,通過上面的配置之后,緩存還是不能生效的,因為我們一般把緩存放在獲得數據方法之前進行調用,在用戶對獲得數據方法調用之前,首先從緩存中進行查找,如果存在,則直接返回緩存中的數據給調用者就可以了,如果不存在再調用獲得數據方法從數據庫中讀取,讀取成功后添加到緩存中再返回給調用者。既然要在方法調用前來查找緩存,從中你是否想到了什么呢?不錯,就是面向切面編程,即AOP。所以要讓緩存生效,在該案例中還需要支持AOP。至于AOP的支持,我將會在下一專題進行介紹。
## 四、總結
到這里,本專題的內容就結束了,正如前面所說的,在下一專題,我將在網上書店案例中引入對AOP的支持。
本專題所有源碼下載地址:[https://github.com/lizhi5753186/OnlineStore_Second/](https://github.com/lizhi5753186/OnlineStore_Second/)
- C# 基礎知識系列
- C# 基礎知識系列 專題一:深入解析委托——C#中為什么要引入委托
- C# 基礎知識系列 專題二:委托的本質論
- C# 基礎知識系列 專題三:如何用委托包裝多個方法——委托鏈
- C# 基礎知識系列 專題四:事件揭秘
- C# 基礎知識系列 專題五:當點擊按鈕時觸發Click事件背后發生的事情
- C# 基礎知識系列 專題六:泛型基礎篇——為什么引入泛型
- C# 基礎知識系列 專題七: 泛型深入理解(一)
- C# 基礎知識系列 專題八: 深入理解泛型(二)
- C# 基礎知識系列 專題九: 深入理解泛型可變性
- C#基礎知識系列 專題十:全面解析可空類型
- C# 基礎知識系列 專題十一:匿名方法解析
- C#基礎知識系列 專題十二:迭代器
- C#基礎知識 專題十三:全面解析對象集合初始化器、匿名類型和隱式類型
- C# 基礎知識系列 專題十四:深入理解Lambda表達式
- C# 基礎知識系列 專題十五:全面解析擴展方法
- C# 基礎知識系列 專題十六:Linq介紹
- C#基礎知識系列 專題十七:深入理解動態類型
- 你必須知道的異步編程 C# 5.0 新特性——Async和Await使異步編程更簡單
- 全面解析C#中參數傳遞
- C#基礎知識系列 全面解析C#中靜態與非靜態
- C# 基礎知識系列 C#中易混淆的知識點
- C#進階系列
- C#進階系列 專題一:深入解析深拷貝和淺拷貝
- C#進階系列 專題二:你知道Dictionary查找速度為什么快嗎?
- C# 開發技巧系列
- C# 開發技巧系列 使用C#操作Word和Excel程序
- C# 開發技巧系列 使用C#操作幻燈片
- C# 開發技巧系列 如何動態設置屏幕分辨率
- C# 開發技巧系列 C#如何實現圖片查看器
- C# 開發技巧 如何防止程序多次運行
- C# 開發技巧 實現屬于自己的截圖工具
- C# 開發技巧 如何使不符合要求的元素等于離它最近的一個元素
- C# 線程處理系列
- C# 線程處理系列 專題一:線程基礎
- C# 線程處理系列 專題二:線程池中的工作者線程
- C# 線程處理系列 專題三:線程池中的I/O線程
- C# 線程處理系列 專題四:線程同步
- C# 線程處理系列 專題五:線程同步——事件構造
- C# 線程處理系列 專題六:線程同步——信號量和互斥體
- C# 多線程處理系列專題七——對多線程的補充
- C#網絡編程系列
- C# 網絡編程系列 專題一:網絡協議簡介
- C# 網絡編程系列 專題二:HTTP協議詳解
- C# 網絡編程系列 專題三:自定義Web服務器
- C# 網絡編程系列 專題四:自定義Web瀏覽器
- C# 網絡編程系列 專題五:TCP編程
- C# 網絡編程系列 專題六:UDP編程
- C# 網絡編程系列 專題七:UDP編程補充——UDP廣播程序的實現
- C# 網絡編程系列 專題八:P2P編程
- C# 網絡編程系列 專題九:實現類似QQ的即時通信程序
- C# 網絡編程系列 專題十:實現簡單的郵件收發器
- C# 網絡編程系列 專題十一:實現一個基于FTP協議的程序——文件上傳下載器
- C# 網絡編程系列 專題十二:實現一個簡單的FTP服務器
- C# 互操作性入門系列
- C# 互操作性入門系列(一):C#中互操作性介紹
- C# 互操作性入門系列(二):使用平臺調用調用Win32 函數
- C# 互操作性入門系列(三):平臺調用中的數據封送處理
- C# 互操作性入門系列(四):在C# 中調用COM組件
- CLR
- 談談: String 和StringBuilder區別和選擇
- 談談:程序集加載和反射
- 利用反射獲得委托和事件以及創建委托實例和添加事件處理程序
- 談談:.Net中的序列化和反序列化
- C#設計模式
- UML類圖符號 各種關系說明以及舉例
- C#設計模式(1)——單例模式
- C#設計模式(2)——簡單工廠模式
- C#設計模式(3)——工廠方法模式
- C#設計模式(4)——抽象工廠模式
- C#設計模式(5)——建造者模式(Builder Pattern)
- C#設計模式(6)——原型模式(Prototype Pattern)
- C#設計模式(7)——適配器模式(Adapter Pattern)
- C#設計模式(8)——橋接模式(Bridge Pattern)
- C#設計模式(9)——裝飾者模式(Decorator Pattern)
- C#設計模式(10)——組合模式(Composite Pattern)
- C#設計模式(11)——外觀模式(Facade Pattern)
- C#設計模式(12)——享元模式(Flyweight Pattern)
- C#設計模式(13)——代理模式(Proxy Pattern)
- C#設計模式(14)——模板方法模式(Template Method)
- C#設計模式(15)——命令模式(Command Pattern)
- C#設計模式(16)——迭代器模式(Iterator Pattern)
- C#設計模式(17)——觀察者模式(Observer Pattern)
- C#設計模式(18)——中介者模式(Mediator Pattern)
- C#設計模式(19)——狀態者模式(State Pattern)
- C#設計模式(20)——策略者模式(Stragety Pattern)
- C#設計模式(21)——責任鏈模式
- C#設計模式(22)——訪問者模式(Vistor Pattern)
- C#設計模式(23)——備忘錄模式(Memento Pattern)
- C#設計模式總結
- WPF快速入門系列
- WPF快速入門系列(1)——WPF布局概覽
- WPF快速入門系列(2)——深入解析依賴屬性
- WPF快速入門系列(3)——深入解析WPF事件機制
- WPF快速入門系列(4)——深入解析WPF綁定
- WPF快速入門系列(5)——深入解析WPF命令
- WPF快速入門系列(6)——WPF資源和樣式
- WPF快速入門系列(7)——深入解析WPF模板
- WPF快速入門系列(8)——MVVM快速入門
- WPF快速入門系列(9)——WPF任務管理工具實現
- ASP.NET 開發
- ASP.NET 開發必備知識點(1):如何讓Asp.net網站運行在自定義的Web服務器上
- ASP.NET 開發必備知識點(2):那些年追過的ASP.NET權限管理
- ASP.NET中實現回調
- 跟我一起學WCF
- 跟我一起學WCF(1)——MSMQ消息隊列
- 跟我一起學WCF(2)——利用.NET Remoting技術開發分布式應用
- 跟我一起學WCF(3)——利用Web Services開發分布式應用
- 跟我一起學WCF(3)——利用Web Services開發分布式應用
- 跟我一起學WCF(4)——第一個WCF程序
- 跟我一起學WCF(5)——深入解析服務契約 上篇
- 跟我一起學WCF(6)——深入解析服務契約 下篇
- 跟我一起學WCF(7)——WCF數據契約與序列化詳解
- 跟我一起學WCF(8)——WCF中Session、實例管理詳解
- 跟我一起學WCF(9)——WCF回調操作的實現
- 跟我一起學WCF(10)——WCF中事務處理
- 跟我一起學WCF(11)——WCF中隊列服務詳解
- 跟我一起學WCF(12)——WCF中Rest服務入門
- 跟我一起學WCF(13)——WCF系列總結
- .NET領域驅動設計實戰系列
- .NET領域驅動設計實戰系列 專題一:前期準備之EF CodeFirst
- .NET領域驅動設計實戰系列 專題二:結合領域驅動設計的面向服務架構來搭建網上書店
- .NET領域驅動設計實戰系列 專題三:前期準備之規約模式(Specification Pattern)
- .NET領域驅動設計實戰系列 專題四:前期準備之工作單元模式(Unit Of Work)
- .NET領域驅動設計實戰系列 專題五:網上書店規約模式、工作單元模式的引入以及購物車的實現
- .NET領域驅動設計實戰系列 專題六:DDD實踐案例:網上書店訂單功能的實現
- .NET領域驅動設計實戰系列 專題七:DDD實踐案例:引入事件驅動與中間件機制來實現后臺管理功能
- .NET領域驅動設計實戰系列 專題八:DDD案例:網上書店分布式消息隊列和分布式緩存的實現
- .NET領域驅動設計實戰系列 專題九:DDD案例:網上書店AOP和站點地圖的實現
- .NET領域驅動設計實戰系列 專題十:DDD擴展內容:全面剖析CQRS模式實現
- .NET領域驅動設計實戰系列 專題十一:.NET 領域驅動設計實戰系列總結