# 原則35:理解 PLINQ 并行算法的實現
**By D.S.Qiu**
**尊重他人的勞動,支持原創,轉載請注明出處:[http://dsqiu.iteye.com](http://dsqiu.iteye.com)**
在這個原則我希望我能說并行編程現在和在你循環添加 AsParall() 一樣簡單。雖然不是,但是 PLINQ 使得更容易利用多核并且程序仍然是正確的。創建多和程序絕不是微不足道,但 PLINQ 使得它更簡單。
你已經理解訪問數據必須是同步的。你仍需要衡量下聲明在 ParallEnumerable 的并行和串行版本的方法的影響。LINQ 涉及的一些方法很容易就可以并行執行。其他要強制串行訪問序列的每個元素——或者,至少,需要完整的序列(比如 Sort )。讓我們通過一些使用 PLINQ 的例子學習是在怎么工作的,并且那些地方還存在缺陷。這節的所有例子和討論都是對 Object 使用 LINQ 。標簽甚至可以叫做 “ Enumerable ” 而不是“ Queryable ”。 PLINQ 不會幫助你 SQL 的 LINQ ,或者是實體框架的算法的并行執行。這不是一個真正的限制特性,因為這些的實現利用并行數據庫引擎來執行并行查詢。
這個例子是很簡單的查詢,使用語法方法調用對前150個計算 n! :
```
var nums = data.Where(m => m < 150). Select(n => Factorial(n));
```
你在查詢第一個函數調用增加 AsParallel() 使得查詢是并行的:
```
var numsParallel = data.AsParallel().Where(m => m < 150).Select(n => Factorial(n));
```
當然,你可以使用查詢語法做類似的工作。
```
var nums = from n in data where n < 150 select Factorial(n);
```
并行的版本依賴于 data 序列增加的 AsParallel() :
```
var numsParallel = from n in data.AsParallel() where n < 150 select Factorial(n);
```
這個結果是調用方法的版本是一樣的。
第一個例子很簡單,但通過你調用的方法 PLINQ.AsParallel() 選擇并行執行任何查詢表達式說了幾個重要的概念。一旦你調用 AsParallel() ,在多核就是使用 多線程 Thread.AsParallel() 分成子序列操作進行并且返回 IParallelEnumerable() 而不是 IEnumerable() 。 PLINQ 就是 IParallelEnumerable 實現的擴展方法集。它們大多數都和擴展 IEnumerable 的 Enumerable 類的擴展方法有一樣的簽名。IParallelEnumerable 只是簡單替換 Enumerable 的參數和返回值。這樣做的優勢是 PLINQ 遵循所有 LINQ 遵循的模式。這使得 PLINQ 非常容易學習。你對 LINQ 的任何掌握,通常,都可以應用于 PLINQ 。
當然,這也不是那么簡單。初始查詢很容就能應用 PLINQ 。它沒有任何共享數據。結果的次序沒有任何影響。這就是為什么代碼運行獲得的加速直接和機器的核心數成正比。為了幫助 PLINQ 獲得最好的性能,IParallelEnumerable 有很多控制并行任務類庫函數。
每個并行查詢都是從分區步驟開始。 PLINQ 需要對輸入元素進行分區并分配它們一些任務去執行查詢。分區是 PLINQ 最重要的一方面,所以理解 PLINQ 不同的方法, PLINQ 決定使用哪個方法,和每個方法都是如何工作的是非常重要的。首先,分區不能花太多時間。如果 PLINQ 類庫花費太多時間在分區上,那么留給處理數據的時間就太少了。 PLINQ 會根據輸入源和創建的查詢類型決定使用不同的分區算法。最簡單分區算法是范圍分區。范圍分區會根據任務數量劃分輸入序列并將每部分分配給一個任務。例如,1000個元素的序列運行在四核的機器上會被分為每250的四部分。范圍分區只有當查詢數據源支持索引序列和報告了序列的長度才會被使用。也就是說范圍分區被限制在像 List<T> 數組,和其他支持 IList<T> 的序列的查詢數據源。當查詢數據源支持這些操作范圍分區經常被使用。
第二個分區選擇是塊分區。這個算法給每個任務分配一個輸入元素塊,并且這會需要更多功夫。內部的塊算法會與時俱進,所以我不會深入涉及當前的實現。你會認為塊的開始會很小,因為輸入源可能很小。這樣就可以防止一個任務處理整個小序列。你也會認為隨著工作的持續,塊的大小會變大。這可以最小化線程的開銷和有助于最大化吞吐量。塊也有可能根據查詢委托和 where 子句過濾的元素花費的時間調整大小。這樣的目的是所有的任務能在接近的時間完成,也就是最大化整體的吞吐量。
另外兩個分區方案會優化某些查詢操作。第一個是帶分區。帶分區是范圍分區的特例,它優化處理序列開始的元素。每個工作線程都跳過前面N項然后處理后面的M項。在處理M項之后,工作線程會接著跳過下N項。帶算法很容易理解,假設帶寬為1個元素。在四個工作任務的情況下,第一個任務獲得項的下標為0,4,8,12等等,第二個任務獲得項下標為1,5,9,13等等。整個查詢過程,帶算法避免任何線程內部的同步實現 TakeWhile() 和 SkipWhile() 。同時,每個工作線程移到下一個元素只需要很簡單算術運算。
最后的算法是哈希分區。哈希分區是針對 Join , GroupJoin ,GroupBy ,Distinct ,Except ,Union ,和 Intersect 操作的查詢設計的特殊目標的算法。這些耗時的操作,特定的分區算法可以使得這些查詢更大的并行。Hash 算法保證所有產生的相同的哈希碼被同一個任務處理。這就最小化任務之間的處理這些操作的交流。
除了分區算法, PLINQ 還是用三個不同的算法并行你的代碼的任務: Pipelining , Stop & Go 以及 Inverted Enumeration 。 Pipelining 是默認的,所以我首先進行解釋。 Pipelining 算法,一個線程處理枚舉( foreach 或者 查詢序列)。多個線程用來處理處理每個元素的查詢。每個請求的新元素,都會在其他線程被處理。 PLINQ 在 Pipelining 模式使用的線程數經常是核心的數量(對于大多基于 CPU 查詢)。在我的階乘的例子,它在我的雙核機器上使用兩個線程。第一個元素被檢索并在一個線程處理。緊接著第二個元素會被請求并在第二個線程處理。然后,當這兩個有一個完成,第三個元素就會被請求,并且查詢表達式會在這個線程處理。整個序列查詢的執行過程中,兩個線程都忙于元素的查詢。機器愈多核,更多元素會被并行處理。
例如,在16核的機器上,前16項會在16不同線程上立即處理(推測運行在16個不同核心上)。我已經進行簡化了,有一個線程處理枚舉,這就說明通常 Pipelining 創建(核心數+1)的線程。在大多數情況下,枚舉線程會等很長時間,所以創建額外線程是有意義的。
Stop & Go 算法意味著枚舉的線程會加入到其他線程運行查詢表達式。當你需要立即執行 ToList() 或 ToArray() 和任何時候 PLINQ 排序之前需要所有結果集等查詢時,這個算法會使用。下面兩個查詢都使用 Stop & Go 算法:
```
var stopAndGoArray = (from n in data.AsParallel()
where n < 150
select Factorial(n)).ToArray();
var stopAndGoList = (from n in data.AsParallel()
where n < 150
select Factorial(n)).ToList();
```
使用 Stop & Go 算法處理會占用更多內存并獲得更好的性能。然而,注意我在進行任何查詢表達式之前已經構造整個查詢。你要構成整個查詢而不是處理每部分都使用 Stop & Go 算法然后在將最后的結果組合起來進行其他查詢。這往往會引起引起線程開銷而降低性能。像一個組合操作一樣處理整個查詢表達式總是一個更好的選擇。
并行任務庫使用的最后一個算法是倒計數算法。 Inverted Enumeration 不會產生任何結果集。而是每個查詢的結果執行某些行為。在我前面的例子章,我向控制臺輸出階乘計算的結果:
```
var numsParallel = from n in data.AsParallel()
where n < 150
select Factorial(n);
foreach (var item in numsParallel)
Console.WriteLine(item);
```
Object 的 LINQ (非并行)被認為是非常懶的。也就是只有請求了才會產生值。當你處理查詢結果時,可以選擇并行執行模型。這就是為什么你會需要 Inverted Enumeration 模型:
```
var nums2 = from n in data.AsParallel()
where n < 150
select Factorial(n);
nums2.ForAll(item => Console.WriteLine(item));
```
Inverted Enumeration 比 Stop & Go 方法使用更少內存。同時,它可以在結果集上并行操作。注意你在 ForAll() 查詢仍需要使用 AsParallel() 。 ForAll() 比在 Stop & Go 模型占用更少內存。在某些情況,依賴于查詢表達式的結果集上的行為的工作量,通常 Inverted Enumeration 會并枚舉方法更快。
所有的 LINQ 查詢的執行都很懶。你創建的查詢,這些查詢只有在請求查詢結果的項時才執行。對于 Object 的 LINQ 有改進。Object 的 LINQ 在你需求元素時才對每個元素執行查詢。 PINQ 進行不同的工作。它們的模型跟 SQL 的 LINQ ,或者實體框架很接近。在這些模型,當你請求第一個元素,整個結果序列都會產生。 PLINQ 跟這些模型很接近,但又不全相同。如果你對 PLINQ 怎樣執行查詢有誤解,你會使用超過必須的資源,這樣你實際的并行查詢運行在多核機器上比 Object 的 LINQ 更慢。
為了演示一些區別,我針對給出簡單的查詢。我會給你展示增加的 AsParallel() 怎樣改變執行模型。兩個模型都是有效的。 LINQ 關注的它的結果,而不是它們是如何產生的。你將看到兩個模型都產生一樣的結果。如果你算法對查詢自己有因為區別就會顯現出來。
下面的查詢用來演示區別:
```
var answers = from n in Enumerable.Range(0, 300)
where n.SomeTest()
select n.SomeProjection();
```
我會輸出顯示 SomeTest() 和 SomeProjection() 方法的調用:
```
public static bool SomeTest(this int inputValue)
{
Console.WriteLine("testing element: {0}", inputValue);
return inputValue % 10 == 0;
}
public static string SomeProjection(this int input)
{
Console.WriteLine("projecting an element: {0}", input);
return string.Format("Delivered {0} at {1}",input.ToString(), DateTime.Now.ToLongTimeString());
}
```
最后,用一個簡單的循環,我使用 IEnumerator<string> 成員對結果進行遍歷,可以看到不同行為的發生。這就對序列的產生(并行)和枚舉(在枚舉循環)顯示的更清晰。在生成的代碼,我更喜歡不同的實現。
```
var iter = answers.GetEnumerator();
Console.WriteLine("About to start iterating");
while (iter.MoveNext())
{
Console.WriteLine("called MoveNext");
Console.WriteLine(iter.Current);
}
```
使用標準的 Object 的 LINQ 實現,你會看到像下面的輸出:
```
About to start iterating
testing element: 0
projecting an element: 0
called MoveNext
Delivered 0 at 1:46:08 PM
testing element: 1
testing element: 2
testing element: 3
testing element: 4
testing element: 5
testing element: 6
testing element: 7
testing element: 8
testing element: 9
testing element: 10
projecting an element: 10
called MoveNext
Delivered 10 at 1:46:08 PM
testing element: 11
testing element: 12
testing element: 13
testing element: 14
testing element: 15
testing element: 16
testing element: 17
testing element: 18
testing element: 19
testing element: 20
projecting an element: 20
called MoveNext
Delivered 20 at 1:46:08 PM
testing element: 21
testing element: 22
testing element: 23
testing element: 24
testing element: 25
testing element: 26
testing element: 27
testing element: 28
testing element: 29
testing element: 30
projecting an element: 30
testing element: 10
projecting an element: 10
called MoveNext
Delivered 10 at 1:46:08 PM
```
查詢知道第一次枚舉的 MoveNext() 的調用開始執行。第一次 MoveNext() 的調用查詢了足夠的元素檢索出第一個結果序列上的元素(恰好是查詢的第一個元素)。第二個 MoveNext() 處理輸入序列的元素知道下一個輸出元素的產生。使用 Object 的 LINQ ,每次調用 MoveNext 執行的查詢直到下一個輸出元素的產生。
要是你將查詢改為并行查詢,規則就會改變:
```
var answers = from n in ParallelEnumerable.Range(0, 300)
where n.SomeTest()
select n.SomeProjection();
```
這個查詢的輸出看起來非常的不一樣。下面是運行一次的采樣(每次運行可能有些不同):
```
About to start iterating
testing element: 150
projecting an element: 150
testing element: 0
testing element: 151
projecting an element: 0
testing element: 1
testing element: 2
testing element: 3
testing element: 4
testing element: 5
testing element: 6
testing element: 7
testing element: 8
testing element: 9
testing element: 10
projecting an element: 10
testing element: 11
testing element: 12
testing element: 13
testing element: 14
testing element: 15
testing element: 16
testing element: 17
testing element: 18
testing element: 19
testing element: 152
testing element: 153
testing element: 154
testing element: 155
testing element: 156
testing element: 157
testing element: 20
... Lots more here elided ...
testing element: 286
testing element: 287
testing element: 288
testing element: 289
testing element: 290
Delivered 130 at 1:50:39 PM
called MoveNext
Delivered 140 at 1:50:39 PM
projecting an element: 290
testing element: 291
testing element: 292
testing element: 293
testing element: 294
testing element: 295
testing element: 296
testing element: 297
testing element: 298
testing element: 299
called MoveNext
Delivered 150 at 1:50:39 PM
called MoveNext
Delivered 160 at 1:50:39 PM
called MoveNext
Delivered 170 at 1:50:39 PM
called MoveNext
Delivered 180 at 1:50:39 PM
called MoveNext
Delivered 190 at 1:50:39 PM
called MoveNext
Delivered 200 at 1:50:39 PM
called MoveNext
Delivered 210 at 1:50:39 PM
called MoveNext
Delivered 220 at 1:50:39 PM
called MoveNext
Delivered 230 at 1:50:39 PM
called MoveNext
Delivered 240 at 1:50:39 PM
called MoveNext
Delivered 250 at 1:50:39 PM
called MoveNext
Delivered 260 at 1:50:39 PM
called MoveNext
Delivered 270 at 1:50:39 PM
called MoveNext
Delivered 280 at 1:50:39 PM
called MoveNext
Delivered 290 at 1:50:39 PM
```
注意到多大的改變了吧。第一次 MoveNext() 的調用使得 PLINQ 啟動所有參與的線程產生結果。這過程產生更少(在這個例子,幾乎所有的)結果對象。后續每次 MoveNext() 的調用都是抓取下來的項都是已經產生好的。你不能斷定具體的輸入元素什么時候會被處理。你只)知道的是當你請求查詢的第一個元素是查詢就開始執行(在幾個線程
上)。
PLINQ 的方法理解查詢語法的行為并影響查詢的執行。假設你修改查詢使用 Skip() 和 Take() 選擇第二頁的結果:
```
var answers = (from n in ParallelEnumerable.Range(0, 300)
where n.SomeTest()
select n.SomeProjection()).
Skip(20).Take(20);
```
這個查詢的執行結果和 Object 的 LINQ 的是相同的。這是因為 PLINQ 知道產生20個元素比300個更快。(我已經簡化了,但是 PLINQ 的 Skip() 和 Take() 的實現更傾向于一個連續的算法而不是其他算法)。
你可以對查詢再修改一點,而且 PLINQ 還是使用并行執行模型產生所有元素。只是增加 orderby 子句:
```
var answers = (from n in ParallelEnumerable.Range(0, 300)
where n.SomeTest()
orderby n.ToString().Length
select n.SomeProjection()).
Skip(20).Take(20);
```
orderby 的 lambda 參數一定不能被編譯器優化的表達式(這就是為什么上面我使用 n.ToString().Length 而不是 n )。現在,查詢引擎必須在排序之前產生所有輸出序列的元素。一旦元素被排序之后 Skip() 和 Take() 方法才知道哪些元素會被返回。當然在多核機器上使用多線程產生所有輸出比順序進行更快。 PINQ 也知道這點,所以它會啟動多個線程來創建輸出。
PLINQ 嘗試創建你的寫的查詢的最好實現,產生你需要的結果的花費最少工作和最少時間。有時 PLINQ 查詢會和你期望的不一樣的方式執行。有時,它會表現的項 Object 的 LINQ ,請求輸出序列的下一項才執行代碼產生它。有時,它會的行為更像 SQL 的 LINQ 或實體框架即請求第一個就會產生所有。有時它的行為更像兩者的混合。你應該確保你不要引入 LINQ 查詢的任何副作用。那些在 PLINQ 執行模型是不可靠的。你構建查詢是需要確保你考慮大部分的底層技術。這就需要你理解它們的工作是怎樣的不同。
并行算法被 Amdahl 法則限制:使用多處理器的程序被限制在程序的連續執行部分。 ParallelEnumerable 的擴展方法對于這個規則毫無例外。很多方法都并行執行,但是有些因為它們的性質會影響并行的程度。顯然 OrderBy 和 ThenBy 需要在任務之間進行協調。 Skip , SkipWhile ,Take 和 TakeWhile 會影響并行程度。并行任務運行在不同的核心上完成的順序可能不同。你可以使用 AsOrdered() 和 AsUnordered() 指示 PLINQ 是結果序列否受次序的影響。
有時你自己的算法會有副作用并不能并行。你可以使用擴展方法 ParallelEnumerable.AsSequential() 將并行序列解釋為 IEnumerable 并強制順序執行。
最后, ParallelEnumerable 包含允許你控制 PLINQ 執行并行查詢的方法。你可以使用 WithExecutionMode() 建議并行執行,即使那會選擇高開銷的算法。默認情況下, PLINQ 會并行構造那些有并行需求的地方。你可以使用 WithDegreeOfParallelism() 建議在你的算法使用的線程數量。通常, PLINQ會分配和當前機器處理器數量一樣多的線程。你可以使用 WithMergeOptions() 請求改變 PLINQ 查詢過程中控制的的緩存結果。通常,PLINQ 會緩存每個線程的結果直到它們被消費者線程使用。你可以請求不緩存而是理解使用結果。你也可以請求權緩存,這會增加性能的高時延。默認的,自動緩存,很好的權衡時延和性能。緩存只是一個提示,而不是需求。PLINQ 可能忽略你的請求。
我沒有給出這些設置的具體指導,因為最優方法高度依賴于你的算法。然而,你有這些設置可以改變,你可以在不同的目標機器上做實驗查看是否會有利于你的算法。如果你沒有多個不同的機器去實驗,我建議你使用默認值。
PLINQ 使得并行計算比之前更簡單。因為這些的增加時間會越來越重要;并行計算隨著司空見慣的臺式機和比較的核心數增加而越來越重要。這還不是很容易。設計糟糕的算法可能在并行上看不到性能的提高。你的任務就是反復查看這些算法并找出哪些是能并行的。嘗試實現那些算法的并行版本。測試結果。和性能更好的算法一起工作。意識到一些算法很難實現并行,就讓它們串行。
小結:
原創帖占位,未完待續!
工作中還沒有怎么用到過 LINQ 和 PLINQ ,從本原則可以知道 PLINQ 可以實現并行,但是依賴于你的算法,所以既要對 PLINQ 有理解,也要能很好的設計你的算法才是最好的設計!
終于翻譯完了(這個原則竟然有12頁),太累了,堅持,堅持,再堅持!
歡迎各種不爽,各種噴,寫這個純屬個人愛好,秉持”分享“之德!
有關本書的其他章節翻譯請[點擊查看](/category/297763),轉載請注明出處,尊重原創!
如果您對D.S.Qiu有任何建議或意見可以在文章后面評論,或者發郵件(gd.s.qiu@gmail.com)交流,您的鼓勵和支持是我前進的動力,希望能有更多更好的分享。
轉載請在**文首**注明出處:[http://dsqiu.iteye.com/blog/2088663](/blog/2088663)
更多精彩請關注D.S.Qiu的博客和微博(ID:靜水逐風)
- 第一章 C# 語言習慣
- 原則1:使用 屬性(Poperty)代替可直接訪問的數據成員(Data Member)
- 原則2:偏愛 readonly 而不是 const
- 原則3:選擇 is 或 as 而不是強制類型轉換
- 原則4:使用條件特性(conditional attribute)代替 #if
- 原則5:總是提供 ToString()
- 原則6:理解幾個不同相等概念的關系
- 原則7:明白 GetHashCode() 的陷阱
- 原則8:優先考慮查詢語法(query syntax)而不是循環結構
- 原則9:在你的 API 中避免轉換操作
- 原則10:使用默認參數減少函數的重載
- 原則11:理解小函數的魅力
- 第二章 .NET 資源管理
- 原則12:選擇變量初始化語法(initializer)而不是賦值語句
- 原則13:使用恰當的方式對靜態成員進行初始化
- 原則14:減少重復的初始化邏輯
- 原則15:使用 using 和 try/finally 清理資源
- 原則16:避免創建不需要的對象
- 原則17:實現標準的 Dispose 模式
- 原則17:實現標準的 Dispose 模式
- 原則18:值類型和引用類型的區別
- 原則19:確保0是值類型的一個有效狀態
- 原則20:更傾向于使用不可變原子值類型
- 第三章 用 C# 表達設計
- 原則21:限制你的類型的可見性
- 原則22:選擇定義并實現接口,而不是基類
- 原則23:理解接口方法和虛函數的區別
- 原則24:使用委托來表達回調
- 原則25:實現通知的事件模式
- 原則26:避免返回類的內部對象的引用
- 原則27:總是使你的類型可序列化
- 原則28:創建大粒度的網絡服務 APIs
- 原則29:讓接口支持協變和逆變
- 第四章 和框架一起工作
- 原則30:選擇重載而不是事件處理器
- 原則31:用 IComparable&lt;T&gt; 和 IComparer&lt;T&gt; 實現排序關系
- 原則32:避免 ICloneable
- 原則33:只有基類更新處理才使用 new 修飾符
- 原則34:避免定義在基類的方法的重寫
- 原則35:理解 PLINQ 并行算法的實現
- 原則36:理解 I/O 受限制(Bound)操作 PLINQ 的使用
- 原則37:構造并行算法的異常考量
- 第五章 雜項討論
- 原則38:理解動態(Dynamic)的利與弊
- 原則39:使用動態對泛型類型參數的運行時類型的利用
- 原則40:使用動態接收匿名類型參數