# 原則36:理解 I/O 受限制(Bound)操作 PLINQ 的使用
**By D.S.Qiu**
**尊重他人的勞動,支持原創,轉載請注明出處:[http://dsqiu.iteye.com](http://dsqiu.iteye.com)**
Parallel Task 庫看起來是 CPU 受限制操作進行優化。雖然這是類庫的一個核心任務,但是它在 I/O 受限制操作也做的很好。實際上, Parallel Task 庫的處理 I/O 受限制操作的設計更多是默認實現的。它會根據你線程的繁忙程度更新分配線程的數量。更多阻塞的線程(等待 I/O 操作)會導致線程分配更多線程完成手頭的任務。
和其他并行擴展一樣,你可以使用方法調用,或者 LINQ 查詢語法進入并行執行模型。I/O 受限制操作和 CPU 受限制操作的并行執行表現會有些不同。你經常需要比核心更多的線程,因為 I/O 受限制操作花費更多時間等待它們的外部事件。PLINQ 也為這個習慣提供了框架。
下面這段代碼是從一系列網站下載數據:
```
foreach (var url in urls)
{
var result = new WebClient().DownloadData(url);
UseResult(result);
}
```
DownloadData() 的調用發起了 web 的同步請求而且知道所有數據被檢索下來。這個算法花費太多時間等待。你可以使用并行 for 循環改變為并行模型:
```
Parallel.ForEach(urls, url =>
{
var result = new WebClient().DownloadData(url);
UseResult(result);
});
```
Parallel.ForEach() 會選擇進入并行處理模型。這個版本花的時間比順序進行版本少多了。實際上,在我的雙核機器上,加速和 url 的集合的元素個數成正比。線程花更多時間在等待,所以 Parallel Task 庫會創建更多線程。
你可以使用 PLINQ 和查詢語法產生一樣的結果:
```
var results = from url in urls.AsParallel()
select new WebClient().DownloadData(url);
results.ForAll(result => UseResult(result));
```
PLINQ 和 Parallel Task 庫支持的 Parallel.ForEach() 會有一點不同。PLINQ 使用固定的線程數量,然而 AsParallel() 會更加吞吐量的增加和減少活動線程的數量。你可以使用 ParallelEnumeralbe.WithDegreeOfParallelism() 控制線程的數量(查看原則35)。但是 Parallel.ForEach() 會為你管理。 Parallel.ForEach() 當 I/O 受限制和 CPU 受限制操作混合時表現的更好。 Parrallel.ForEach() 會基于當前加載管理活動線程的數量。當更多線程阻塞等待 I/O 操作,它就會創建更多的線程以增加吞吐量。當工作線程增多,它就會讓活動的線程數量減少以最小化上下文的切換。
上面演示的代碼不是真正的異步。它只是充分利用多線程來并行執行些任務。但是圍繞這個程序它會等待所有 web 請求完成才繼續做其他工作。 Parallel Task 庫提供其他基本的異步模式的實現。其中常見的模式之一就是啟動多個 I/O 受限制任務并且知道這些結果返回才執行一些操作。完美地,我更喜歡像這樣寫:
```
urls.RunAsync(
url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
```
這里使用 startDownload() 方法開始每個 URL 的下載。隨著每個下載完成,finsishDownload() 會被執行。一旦所有下載完成, RunAsync() 就完成。 Parallel Task 庫完成這個自然做了很多工作,所以我們仔細檢查下。最好開始的地方是 RunAsync 方法:
```
public static void RunAsync<T, TResult>( this IEnumerable<T> taskParms, Func<T, Task<TResult>> taskStarter, Action<Task<TResult>> taskFinisher)
{
taskParms.Select(parm => taskStarter(parm)).AsParallel().ForAll(t => t.ContinueWith(t2 => taskFinisher(t2)));
}
```
這個方法為每個輸入參數創建一個任務。Select() 方法返回任務序列。下一步,你使用 AsParallel() 并行處理結果。對于每個單一的任務,你會調用后續處理方法。 Task<T> 類表示一個任務,并包含這個任務輸入和輸出值的屬性。 ContinueWith() 是 Task<T> 的其中一個方法。它會在任務完成后調用,允許你對已完成的任務進行處理。在 RunAsync 方法中,它傳入 Task 對象參數調用 taskFinisher 。ForAll() 使用 Inverted Enumeration 算法,它會阻塞知道所有任務完成。
我們深入探討這個模式,理解開始下載方法和匯報下載完成的方法。 finishDownload 方法很簡單,我在完成時輸出:
```
private static void finishDownload(string url, byte[] bytes)
{
Console.WriteLine("Read {0} bytes from {1}", bytes.Length, url);
}
```
StartDownload 比 Parallel Task 庫的接口復雜一些。具體的類型用來幫助支持 Task 接口。我更想抽象出來,但是針對具體的任務的處理不同類型會有點區別。實際上, Parallel Task 庫為 .NET BCL 在此版本之前的很多不同異步模式之上提出了一套通用的接口。
```
private static Task<byte[]> startDownload(string url)
{
var tcs = new TaskCompletionSource<byte[]>(url);
var wc = new WebClient();
wc.DownloadDataCompleted += (sender, e) =>
{
if (e.UserState == tcs)
{
if (e.Cancelled)
tcs.TrySetCanceled();
else if (e.Error != null)
tcs.TrySetException(e.Error);
else
tcs.TrySetResult(e.Result);
}
};
wc.DownloadDataAsync(new Uri(url), tcs);
return tcs.Task;
}
```
這個方法混合了 Task 代碼和從 URL 下載的代碼,所以我們必須非常認真地梳理一遍。首先,它為這個任務創建一個 TaskCompletionSource 對象。TaskCompeletionSource 對象使得任務的創建和完成分離了。這里是非常重要的,因為你使用 WebClient 類的異步方法創建這個任務。TaskCompletionSource 的參數是這個任務的返回的結果。
WebClient 類使用基于事件的異步模式(EAP)。這意味著你像一個事件注冊處理器,當這個異步操作完成時,事件就會被觸發。當事件觸發 startDownload() 吧任務完成信息存儲在 TaskCompletionSource 中。TaskSheduler 選擇一個任務并開始下載。這個方法返回的 Task 對象內嵌在 TaskCompletionSource 中,這樣當任務完成時事件結果就會被處理。
在這些工作后,web 下載另一個線程異步開始。當下載完成, DownloadDataCompleted 事件就會被觸發。事件處理會設置 TaskCompletionSource 的完成狀態。在 TaskCompletionSouce 嵌入 Task 對象表示它已經完成。
現在,任務會調用 ContinueWith() ,報告下載的結果。花了點功夫解開這些細節,但是在接解開過一次后,這個模式就不會那么難理解。
上面展示的例子就是底層使用的基于事件異步模式的正確表達習慣。 .NET 庫的其他領域使用異步編程模型( APM) 模式。在這個模式,一些操作 Foo 你調用 BeginFoo() ,會返回一個 IAsyncResult 對象。一旦操作完成,你可以調用 EndFoo() ,傳入參數就是這個 IAsyncResult 對象。Parallel Task 庫你可以使用 Task<TResult>.Factory.FromAsync() 方法實現這個模式。
底層的原理和我下載 web 數據的版本是類似的。區別在于你提供了不同的委托去匹配使用的異步方法來創建任務。
Parallel Task 庫提供一系列方法,使得 I/O 受限制操作和 CPU 受限制一樣工作。使用 Task 類,你可以對 I/O 受限制操作或混合了 I/O 和 CPU 受限制操作支持各種異步模式。并行任務還是不那么簡單,但是 Parallel Task 庫和 PLINQ 比之前的庫提供更好的語言層次對異步編程的支持。隨著我們程序會更多訪問不同機器的數據和更多線程等待遠程機器的響應,這會變得更重要。
小結:
一般的多線程都是指的是 CPU 受限制的操作進行并行優化,對于 I/O 也同樣存在現在,要等到外部事件響應,PLINQ 和 Parallel Task 庫對 I/O 受現在操作也像 CPU 受限制一樣提供支持。
歡迎各種不爽,各種噴,寫這個純屬個人愛好,秉持”分享“之德!
有關本書的其他章節翻譯請[點擊查看](/category/297763),轉載請注明出處,尊重原創!
如果您對D.S.Qiu有任何建議或意見可以在文章后面評論,或者發郵件(gd.s.qiu@gmail.com)交流,您的鼓勵和支持是我前進的動力,希望能有更多更好的分享。
轉載請在**文首**注明出處:[http://dsqiu.iteye.com/blog/2088750](/blog/2088750)
更多精彩請關注D.S.Qiu的博客和微博(ID:靜水逐風)
- 第一章 C# 語言習慣
- 原則1:使用 屬性(Poperty)代替可直接訪問的數據成員(Data Member)
- 原則2:偏愛 readonly 而不是 const
- 原則3:選擇 is 或 as 而不是強制類型轉換
- 原則4:使用條件特性(conditional attribute)代替 #if
- 原則5:總是提供 ToString()
- 原則6:理解幾個不同相等概念的關系
- 原則7:明白 GetHashCode() 的陷阱
- 原則8:優先考慮查詢語法(query syntax)而不是循環結構
- 原則9:在你的 API 中避免轉換操作
- 原則10:使用默認參數減少函數的重載
- 原則11:理解小函數的魅力
- 第二章 .NET 資源管理
- 原則12:選擇變量初始化語法(initializer)而不是賦值語句
- 原則13:使用恰當的方式對靜態成員進行初始化
- 原則14:減少重復的初始化邏輯
- 原則15:使用 using 和 try/finally 清理資源
- 原則16:避免創建不需要的對象
- 原則17:實現標準的 Dispose 模式
- 原則17:實現標準的 Dispose 模式
- 原則18:值類型和引用類型的區別
- 原則19:確保0是值類型的一個有效狀態
- 原則20:更傾向于使用不可變原子值類型
- 第三章 用 C# 表達設計
- 原則21:限制你的類型的可見性
- 原則22:選擇定義并實現接口,而不是基類
- 原則23:理解接口方法和虛函數的區別
- 原則24:使用委托來表達回調
- 原則25:實現通知的事件模式
- 原則26:避免返回類的內部對象的引用
- 原則27:總是使你的類型可序列化
- 原則28:創建大粒度的網絡服務 APIs
- 原則29:讓接口支持協變和逆變
- 第四章 和框架一起工作
- 原則30:選擇重載而不是事件處理器
- 原則31:用 IComparable&lt;T&gt; 和 IComparer&lt;T&gt; 實現排序關系
- 原則32:避免 ICloneable
- 原則33:只有基類更新處理才使用 new 修飾符
- 原則34:避免定義在基類的方法的重寫
- 原則35:理解 PLINQ 并行算法的實現
- 原則36:理解 I/O 受限制(Bound)操作 PLINQ 的使用
- 原則37:構造并行算法的異常考量
- 第五章 雜項討論
- 原則38:理解動態(Dynamic)的利與弊
- 原則39:使用動態對泛型類型參數的運行時類型的利用
- 原則40:使用動態接收匿名類型參數