# 原則37:構造并行算法的異常考量
**By D.S.Qiu**
**尊重他人的勞動,支持原創,轉載請注明出處:[http://dsqiu.iteye.com](http://dsqiu.iteye.com)**
前面兩個原則幸福地忽略了任何子線程運行出錯的可能性。這顯然不是現實世界所進行的。異常會在你的子線程發生,你不得不轉向去收拾殘局。當然,后臺線程的異常在某種程度上增加復雜性。異常不能繼續調用線程邊界的函數棧。而是,如果在線程啟動方法出現異常,這個線程就會終止。沒有任何方式調用線程檢索錯誤,或者對異常做任何事。更重要的是,如果出現異常你的并行算法就必須支持回滾,你不得不理解異常出現的副作用并且能從異常恢復過來。每個算法都有不同需求,所以在并行的環境中沒有通用的答案處理異常。我只能提供的指導就是:對于特定的應用你可以使用最好的策略。
我們從上個原則的異步下載開始說。最簡單策略就是沒有副作用,并且從所有的 web 主機上持續下載不用考慮其中一個會失敗。并行操作使用新的 AggregateException 類處理并行操作的異常。 AggregateException 的 InnerException 屬性包含所有在并行操作可能產生的異常。在這個過程有好幾個方式處理這個異常。首先,我會演示一個更普遍的情況,怎么在外部處理子任務產生的錯誤。
在上一個原則的 RunAsync() 方法在多個并行操作使用。這意味你要捕獲 AggregateException 的 InnerException 集合的異常。越多并行操作,嵌套就越深。因為并行操作有不同的構成,你應該防止多次復制在異常集合的元素異常。我修改 RunAsync() 以處理可能的錯誤:
```
try
{
urls.RunAsync( url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
}
catch (AggregateException problems)
{
ReportAggregateError(problems);
}
private static void ReportAggregateError(AggregateException aggregate)
{
foreach (var exception in aggregate.InnerExceptions)
if (exception is AggregateException)
ReportAggregateError( exception as AggregateException);
else
Console.WriteLine(exception.Message);
}
```
ReportAggregateError 輸出所有不是 AggregateException 自身異常的信息。當然,這會掩蓋所有異常,不管你是否有沒有預料到。這是相當危險的。相反,你需要處理你可以從中恢復的異常,或者重拋出其他異常。
這里有很多集合遞歸,所以有一個試用的函數式很有意義的。泛型方法必須知道哪些異常類你要處理,哪些異常你是不期望的并且你要處理那些你想要處理的異常。你需要為這個方法確定要處理的異常類型和處理異常的代碼。這是簡單的類型和 Action<T> lambda 表達式的字典。并且,如果處理沒有處理 InnerException 集合的每個異常,清楚哪些異常出現異常。這說明你要重新拋出原來的異常。下面是新的 Runsync 代碼:
```
try
{
urls.RunAsync(url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
}
catch (AggregateException problems)
{
var handlers = new Dictionary<Type, Action<Exception>>();
handlers.Add(typeof(WebException),ex => Console.WriteLine(ex.Message));
if (!HandleAggregateError(problems, handlers))
throw;
}
```
HandleAggregateError 方法遞歸查看每個異常。如果異常是預料的,處理器會被調用。否則, HandleAggregateError 返回 false ,說明這類異常不能被正確處理:
```
private static bool HandleAggregateError(AggregateException aggregate, Dictionary<Type, Action<Exception>> exceptionHandlers)
{
foreach (var exception in aggregate.InnerExceptions)
if (exception is AggregateException)
return HandleAggregateError( exception as AggregateException, exceptionHandlers);
else if (exceptionHandlers.ContainsKey( exception.GetType()))
{
exceptionHandlers[exception.GetType()] (exception);
}
else
return false;
return true;
}
```
這點代碼看著有些密集,但是并不難。當它傳入一個 AggregateException ,它會對子列遞歸評估。當遍歷到任何其他異常,它會查詢字典。如果處理器 Action<> 已經被注冊,就會調用這個處理器。如果沒有,就會理解返回 false ,即發現一個不能處理的異常。
你會奇怪為什么當沒有注冊處理器拋出的是元素的 AggregateException 而不是單一的異常。拋出集合中的單一異常會丟失重要信息。 InnerException 可能包含很多異常。可能會有多個異常是沒有預料到的。你必須返回這個集合而避免丟失太多信息。很多情況,AggregateException 的 InnerException 集合只有一個異常。然而,你不能那樣寫代碼因為當你想要額外的信息,它卻不在那。
當然,這會感覺有一點丑陋。還有沒有更好的防止異常出現使得任務離開運行的后臺工作的辦法。在幾乎所有情況,這是更好的。修改代碼使得正在運行的后臺任務確保沒有異常能停止這個后臺任務。當你使用 TaskCompletionSource<> 類,就說明你沒有調用 TrySetException() ,而是確保每個任何調用 TrySetResult() 表示完成。這就有了下面對 startDownload() 的修改。但是,正如和我前面說的,你不能只是捕獲每個異常。你應該只捕獲可以從中恢復的異常。在這個例子中,你可以從 WebException 恢復,這個異常出現因為遠程主機不可訪問。其他異常類型可能表明更嚴重的問題。那些會持續產生的異常會終止所有處理。 startDownload 方法有了下面的修改:
```
private static Task<byte[]> startDownload(string url)
{
var tcs = new TaskCompletionSource<byte[]>(url);
var wc = new WebClient();
wc.DownloadDataCompleted += (sender, e) =>
{
if (e.UserState == tcs)
{
if (e.Cancelled)
tcs.TrySetCanceled();
else if (e.Error != null)
{
if (e.Error is WebException)
tcs.TrySetResult(new byte[0]);
else
tcs.TrySetException(e.Error);
}
else
tcs.TrySetResult(e.Result);
}
};
wc.DownloadDataAsync(new Uri(url), tcs);
return tcs.Task;
}
```
WebException 的返回說明0字節數組讀取,而且所有其他異常會通過正常的通道拋出。對的,也就是說當 AggregateException 被拋出仍可以繼續對正在發生進行處理。很可能你只是把它們當做致命錯誤,而你的后臺任務可以繼續處理其他錯誤。但是你需要理解所有不同類型的異常。
當然,如果你使用 LINQ 語法,后臺任務的錯誤有引起其他問題。記得原則35我描述了三條和并行算法的區別。在所有情況下,使用 PLINQ 和正常的懶評估會有些變化,而這些變化是你在 PLINQ 算法處理異常時必須考慮的。請記住,通常,一個查詢只有在其他代碼請求這個查詢產生的項時才執行。這當然不是 PLINQ 的工作。后臺線程運行產生結果,而且另一個任務組合最后的結果序列。它不能立即評估。查詢的結果不是立即產生的。然而,后臺線程只要調用運行就會開始產生結果。現在,你意味著必須改變異常處理的代碼。在典型的 LINQ 查詢,你可以將使用查詢結果的代碼放在 try/catch 塊內。這不需要包裹定義 LINQ 查詢表達式的代碼:
```
var nums = from n in data
where n < 150
select Factorial(n);
try
{
foreach (var item in nums)
Console.WriteLine(item);
}
catch (InvalidOperationException inv)
{
// elided
}
```
一旦涉及 PLINQ ,你必須在 tyr/cathc 塊中封閉查詢的定義。而且,當然,記住如果你使用 PLINQ ,你必須捕獲 AggregateException 而不是無論你原來期望的是什么異常。不管你使用 Pipelining ,Stop&Go ,或者 Inverted Enumeration 算法都是正確的。
異常在任何算法中都是復雜的。并行任務引起更多并發癥。 Parallel Task 庫使用 AggregateExceptoion 類持有并行算法排除的所有異常。只要有一個后臺線程拋出一個異常,其他后端操作都會被停止。你最好的計劃是確保在你的并行任務執行代碼時沒有任何異常拋出。即使這,其他你沒有期望的異常也有可能在某些地方拋出。這意味著你必須處理 AggregateException 以控制線程初始化所有后臺工作。
小結:
異常,無論什么時候都要考慮進來,好吧,還沒有用到 LINQ 和 PLINQ (工作沒有這個需求),暫且沒有深入的感受。
跑步去,加油,加油!
歡迎各種不爽,各種噴,寫這個純屬個人愛好,秉持”分享“之德!
有關本書的其他章節翻譯請[點擊查看](/category/297763),轉載請注明出處,尊重原創!
如果您對D.S.Qiu有任何建議或意見可以在文章后面評論,或者發郵件(gd.s.qiu@gmail.com)交流,您的鼓勵和支持是我前進的動力,希望能有更多更好的分享。
轉載請在**文首**注明出處:[http://dsqiu.iteye.com/blog/2088794](/blog/2088794)
更多精彩請關注D.S.Qiu的博客和微博(ID:靜水逐風)
- 第一章 C# 語言習慣
- 原則1:使用 屬性(Poperty)代替可直接訪問的數據成員(Data Member)
- 原則2:偏愛 readonly 而不是 const
- 原則3:選擇 is 或 as 而不是強制類型轉換
- 原則4:使用條件特性(conditional attribute)代替 #if
- 原則5:總是提供 ToString()
- 原則6:理解幾個不同相等概念的關系
- 原則7:明白 GetHashCode() 的陷阱
- 原則8:優先考慮查詢語法(query syntax)而不是循環結構
- 原則9:在你的 API 中避免轉換操作
- 原則10:使用默認參數減少函數的重載
- 原則11:理解小函數的魅力
- 第二章 .NET 資源管理
- 原則12:選擇變量初始化語法(initializer)而不是賦值語句
- 原則13:使用恰當的方式對靜態成員進行初始化
- 原則14:減少重復的初始化邏輯
- 原則15:使用 using 和 try/finally 清理資源
- 原則16:避免創建不需要的對象
- 原則17:實現標準的 Dispose 模式
- 原則17:實現標準的 Dispose 模式
- 原則18:值類型和引用類型的區別
- 原則19:確保0是值類型的一個有效狀態
- 原則20:更傾向于使用不可變原子值類型
- 第三章 用 C# 表達設計
- 原則21:限制你的類型的可見性
- 原則22:選擇定義并實現接口,而不是基類
- 原則23:理解接口方法和虛函數的區別
- 原則24:使用委托來表達回調
- 原則25:實現通知的事件模式
- 原則26:避免返回類的內部對象的引用
- 原則27:總是使你的類型可序列化
- 原則28:創建大粒度的網絡服務 APIs
- 原則29:讓接口支持協變和逆變
- 第四章 和框架一起工作
- 原則30:選擇重載而不是事件處理器
- 原則31:用 IComparable&lt;T&gt; 和 IComparer&lt;T&gt; 實現排序關系
- 原則32:避免 ICloneable
- 原則33:只有基類更新處理才使用 new 修飾符
- 原則34:避免定義在基類的方法的重寫
- 原則35:理解 PLINQ 并行算法的實現
- 原則36:理解 I/O 受限制(Bound)操作 PLINQ 的使用
- 原則37:構造并行算法的異常考量
- 第五章 雜項討論
- 原則38:理解動態(Dynamic)的利與弊
- 原則39:使用動態對泛型類型參數的運行時類型的利用
- 原則40:使用動態接收匿名類型參數