上一章里討論了 job 的物理執行圖,也討論了流入 RDD 中的 records 是怎么被 compute() 后流到后續 RDD 的,同時也分析了 task 是怎么產生 result,以及 result 怎么被收集后計算出最終結果的。然而,我們還沒有討論**數據是怎么通過 ShuffleDependency 流向下一個 stage 的?**
## 對比 Hadoop MapReduce 和 Spark 的 Shuffle 過程
如果熟悉 Hadoop MapReduce 中的 shuffle 過程,可能會按照 MapReduce 的思路去想象 Spark 的 shuffle 過程。然而,它們之間有一些區別和聯系。
**從 high-level 的角度來看,兩者并沒有大的差別。**?都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以內存作緩沖區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好以后進行 reduce() (Spark 里可能是后續的一系列操作)。
**從 low-level 的角度來看,兩者差別不小。**?Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在于 combine/reduce() 可以處理大規模的數據,因為其輸入數據可以通過**外排**得到(mapper 對每段數據先做排序,reducer 的 shuffle 對排好序的每段數據做歸并)。目前的 Spark 默認選擇的是 hash-based,通常使用 HashMap 來對 shuffle 來的數據進行 aggregate,不會對數據進行提前排序。如果用戶需要經過排序的數據,那么需要自己調用類似 sortByKey() 的操作;如果你是Spark 1.1的用戶,可以將spark.shuffle.manager設置為sort,則會對數據進行排序。在Spark 1.2中,sort將作為默認的Shuffle實現。
**從實現角度來看,兩者也有不少差別。**?Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。每個階段各司其職,可以按照過程式的編程思想來逐一實現每個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蘊含在 transformation() 中。
如果我們將 map 端劃分數據、持久化數據的過程稱為 shuffle write,而將 reducer 讀入數據、aggregate 數據的過程稱為 shuffle read。那么在 Spark 中,**問題就變為怎么在 job 的邏輯或者物理執行圖中加入 shuffle write 和 shuffle read 的處理邏輯?以及兩個處理邏輯應該怎么高效實現?**
## Shuffle write
由于不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,并持久化。之所以要持久化,一方面是要減少內存存儲空間壓力,另一方面也是為了 fault-tolerance。
shuffle write 的任務很簡單,那么實現也很簡單:將 shuffle write 的處理邏輯加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,該 stage 的 final RDD 每輸出一個 record 就將其 partition 并持久化。圖示如下:

上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數為 2,可以同時運行兩個 task。每個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每個 task 包含 R 個緩沖區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩沖區被稱為 bucket,其大小為`spark.shuffle.file.buffer.kb`?,默認是 32KB(Spark 1.1 版本以前是 100KB)。
> 其實 bucket 是一個廣義的概念,代表 ShuffleMapTask 輸出結果經過 partition 后要存放的地方,這里為了細化數據存放位置和數據名稱,僅僅用 bucket 表示緩沖區。
ShuffleMapTask 的執行過程很簡單:先利用 pipeline 計算得到 finalRDD 中對應 partition 的 records。每得到一個 record 就將其送到對應的 bucket 里,具體是哪個 bucket 由`partitioner.partition(record.getKey()))`決定。每個 bucket 里面的數據會不斷被寫到本地磁盤上,形成一個 ShuffleBlockFile,或者簡稱?**FileSegment**。之后的 reducer 會去 fetch 屬于自己的 FileSegment,進入 shuffle read 階段。
這樣的實現很簡單,但有幾個問題:
1. **產生的 FileSegment 過多。**每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數據文件。
2. **緩沖區占用內存空間大。**每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 M_R 個 bucket。雖然一個 ShuffleMapTask 結束后,對應的緩沖區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores?_R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內存空間也就達到了`cores * R * 32 KB`。對于 8 核 1000 個 reducer 來說,占用內存就是 256MB。
目前來看,第二個問題還沒有好的方法解決,因為寫磁盤終究是要開緩沖區的,緩沖區太小會影響 IO 速度。但第一個問題有一些方法去解決,下面介紹已經在 Spark 里面實現的 FileConsolidation 方法。先上圖:

可以明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為?**FileSegment**。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數降為 cores * R。FileConsolidation 功能可以通過`spark.shuffle.consolidateFiles=true`來開啟。
## Shuffle read
先看一張包含 ShuffleDependency 的物理執行圖,來自 reduceByKey:

很自然地,要計算 ShuffleRDD 中的數據,必須先把 MapPartitionsRDD 中的數據 fetch 過來。那么問題就來了:
* 在什么時候 fetch,parent stage 中的一個 ShuffleMapTask 執行完還是等全部 ShuffleMapTasks 執行完?
* 邊 fetch 邊處理還是一次性 fetch 完再處理?
* fetch 來的數據存放到哪里?
* 怎么獲得要 fetch 的數據的存放位置?
解決問題:
* **在什么時候 fetch?**當 parent stage 的所有 ShuffleMapTasks 結束后再 fetch。理論上講,一個 ShuffleMapTask 結束后就可以 fetch,但是為了迎合 stage 的概念(即一個 stage 如果其 parent stages 沒有執行完,自己是不能被提交執行的),還是選擇全部 ShuffleMapTasks 執行完再去 fetch。因為 fetch 來的 FileSegments 要先在內存做緩沖,所以一次 fetch 的 FileSegments 總大小不能太大。Spark 規定這個緩沖界限不能超過?`spark.reducer.maxMbInFlight`,這里用?**softBuffer**?表示,默認大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。
* **邊 fetch 邊處理還是一次性 fetch 完再處理?**邊 fetch 邊處理。本質上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進行處理,只是 combine() 處理的是部分數據。MapReduce 為了讓進入 reduce() 的 records 有序,必須等到全部數據都 shuffle-sort 后再開始 reduce()。因為 Spark 不要求 shuffle 后的數據全局有序,因此沒必要等到全部數據 shuffle 完成后再處理。**那么如何實現邊 shuffle 邊處理,而且流入的 records 是無序的?**答案是使用可以 aggregate 的數據結構,比如 HashMap。每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 \?record,直接將其放進 HashMap 里面。如果該 HashMap 已經存在相應的 Key,那么直接進行 aggregate 也就是?`func(hashMap.get(Key), Value)`,比如上面 WordCount 例子中的 func 就是?`hashMap.get(Key) + Value`,并將 func 的結果重新 put(key) 到 HashMap 中去。這個 func 功能上相當于 reduce(),但實際處理數據的方式與 MapReduce reduce() 有差別,差別相當于下面兩段程序的差別。
~~~
// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}
~~~
MapReduce 可以在 process 函數里面可以定義任何數據結構,也可以將部分或全部的 values 都 cache 后再進行處理,非常靈活。而 Spark 中的 func 的輸入參數是固定的,一個是上一個 record 的處理結果,另一個是當前讀入的 record,它們經過 func 處理后的結果被下一個 record 處理時使用。因此一些算法比如求平均數,在 process 里面很好實現,直接`sum(values)/values.length`,而在 Spark 中 func 可以實現`sum(values)`,但不好實現`/values.length`。更多的 func 將會在下面的章節細致分析。
* **fetch 來的數據存放到哪里?**剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區,經過處理后的數據放在內存 + 磁盤上。這里我們主要討論處理后的數據,可以靈活設置這些數據是“只用內存”還是“內存+磁盤”。如果`spark.shuffle.spill = false`就只用內存。內存使用的是`AppendOnlyMap`?,類似 Java 的`HashMap`,內存+磁盤使用的是`ExternalAppendOnlyMap`,如果內存空間不足時,`ExternalAppendOnlyMap`可以將 \?records 進行 sort 后 spill 到磁盤上,等到需要它們的時候再進行歸并,后面會詳解。**使用“內存+磁盤”的一個主要問題就是如何在兩者之間取得平衡?**在 Hadoop MapReduce 中,默認將 reducer 的 70% 的內存空間用于存放 shuffle 來的數據,等到這個空間利用率達到 66% 的時候就開始 merge-combine()-spill。在 Spark 中,也適用同樣的策略,一旦 ExternalAppendOnlyMap 達到一個閾值就開始 spill,具體細節下面會討論。
* **怎么獲得要 fetch 的數據的存放位置?**在上一章討論物理執行圖中的 stage 劃分的時候,我們強調 “一個 ShuffleMapStage 形成后,會將該 stage 最后一個 final RDD 注冊到`MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)`,這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數據的位置”。因此,reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數據位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給 MapOutputTrackerMaster。
至此,我們已經討論了 shuffle write 和 shuffle read 設計的核心思想、算法及某些實現。接下來,我們深入一些細節來討論。
## 典型 transformation() 的 shuffle read
### 1\. reduceByKey(func)
上面初步介紹了 reduceByKey() 是如何實現邊 fetch 邊 reduce() 的。需要注意的是雖然 Example(WordCount) 中給出了各個 RDD 的內容,但一個 partition 里面的 records 并不是同時存在的。比如在 ShuffledRDD 中,每 fetch 來一個 record 就立即進入了 func 進行處理。MapPartitionsRDD 中的數據是 func 在全部 records 上的處理結果。從 record 粒度上來看,reduce() 可以表示如下:

可以看到,fetch 來的 records 被逐個 aggreagte 到 HashMap 中,等到所有 records 都進入 HashMap,就得到最后的處理結果。唯一要求是 func 必須是 commulative 的(參見上面的 Spark 的 reduce() 的代碼)。
ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操作。
為了減少數據傳輸量,MapReduce 可以在 map 端先進行 combine(),其實在 Spark 也可以實現,只需要將上圖 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也進行一次即可,比如 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。
**對比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():**
* map 端的區別:map() 沒有區別。對于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進行 combine()。
* reduce 端區別:MapReduce 的 shuffle 階段先 fetch 數據,數據量到達一定規模后 combine(),再將剩余數據 merge-sort 后 reduce(),reduce() 非常靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執行 func),因此要求 func 符合 commulative 的特性。
**從內存利用上來對比:**
* map 端區別:MapReduce 需要開一個大型環形緩沖區來暫存和排序 map() 的部分輸出結果,但 combine() 不需要額外空間(除非用戶自己定義)。 Spark 需要 HashMap 內存數據結構來進行 combine(),同時輸出 records 到磁盤上時也需要一個小的 buffer(bucket)。
* reduce 端區別:MapReduce 需要一部分內存空間來存儲 shuffle 過來的數據,combine() 和 reduce() 不需要額外空間,因為它們的輸入數據分段有序,只需歸并一下就可以得到。在 Spark 中,fetch 時需要 softBuffer,處理數據時如果只使用內存,那么需要 HashMap 來持有處理后的結果。如果使用內存+磁盤,那么在 HashMap 存放一部分處理后的數據。
### 2\. groupByKey(numPartitions)

與 reduceByKey() 流程一樣,只是 func 變成?`result = result ++ record.value`,功能是將每個 key 對應的所有 values 鏈接在一起。result 來自 hashMap.get(record.key),計算后的 result 會再次被 put 到 hashMap 中。與 reduceByKey() 的區別就是 groupByKey() 沒有 map 端的 combine()。對于 groupByKey() 來說 map 端的 combine() 只是減少了重復 Key 占用的空間,如果 key 重復率不高,沒必要 combine(),否則,最好能夠 combine()。
### 3\. distinct(numPartitions)

與 reduceByKey() 流程一樣,只是 func 變成?`result = result == null? record.value : result`,如果 HashMap 中沒有該 record 就將其放入,否則舍棄。與 reduceByKey() 相同,在map 端存在 combine()。
### 4\. cogroup(otherRDD, numPartitions)

CoGroupedRDD 可能有 0 個、1 個或者多個 ShuffleDependency。但并不是要為每一個 ShuffleDependency 建立一個 HashMap,而是所有的 Dependency 共用一個 HashMap。與 reduceByKey() 不同的是,HashMap 在 CoGroupedRDD 的 compute() 中建立,而不是在 mapPartitionsWithContext() 中建立。
粗線表示的 task 首先 new 出一個 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的個數與參與 cogroup 的 RDD 個數相同。func 的邏輯是這樣的:每當從 RDD a 中 shuffle 過來一個 \?record 就將其添加到 hashmap.get(Key) 對應的 Array 中的第一個 ArrayBuffer() 中,每當從 RDD b 中 shuffle 過來一個 record,就將其添加到對應的 Array 中的第二個 ArrayBuffer()。
CoGroupedRDD => MappedValuesRDD 對應 mapValues() 操作,就是將 [ArrayBuffer(), ArrayBuffer()] 變成 [Iterable[V], Iterable[W]]。
### 5\. intersection(otherRDD) 和 join(otherRDD, numPartitions)

這兩個操作中均使用了 cogroup,所以 shuffle 的處理方式與 cogroup 一樣。
### 6\. sortByKey(ascending, numPartitions)

sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯與 reduceByKey() 不太一樣,沒有使用 HashMap 和 func 來處理 fetch 過來的 records。
sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯是:將 shuffle 過來的一個個 record 存放到一個 Array 里,然后按照 Key 來對 Array 中的 records 進行 sort。
### 7\. coalesce(numPartitions, shuffle = true)

coalesce() 雖然有 ShuffleDependency,但不需要對 shuffle 過來的 records 進行 aggregate,所以沒有建立 HashMap。每 shuffle 一個 record,就直接流向 CoalescedRDD,進而流向 MappedRDD 中。
## Shuffle read 中的 HashMap
HashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數據結構。Spark 設計了兩種:一種是全內存的 AppendOnlyMap,另一種是內存+磁盤的 ExternalAppendOnlyMap。下面我們來分析一下**兩者特性及內存使用情況**。
### 1\. AppendOnlyMap
AppendOnlyMap 的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是類似 HashMap,但沒有`remove(key)`方法。其實現原理很簡單,開一個大 Object 數組,藍色部分存儲 Key,白色部分存儲 Value。如下圖:

當要 put(K, V) 時,先 hash(K) 找存放位置,**如果存放位置已經被占用,就使用 Quadratic probing 探測方法來找下一個空閑位置**。對于圖中的 K6 來說,第三次查找找到 K4 后面的空閑位置,放進去即可。get(K6) 的時候類似,找三次找到 K6,取出緊挨著的 V6,與先來的 value 做 func,結果重新放到 V6 的位置。
迭代 AppendOnlyMap 中的元素的時候,從前到后掃描輸出。
如果 Array 的利用率達到 70%,那么就擴張一倍,并對所有 key 進行 rehash 后,重新排列每個 key 的位置。
AppendOnlyMap 還有一個?`destructiveSortedIterator(): Iterator[(K, V)]`?方法,可以返回 Array 中排序后的 (K, V) pairs。實現方法很簡單:先將所有 (K, V) pairs compact 到 Array 的前端,并使得每個 (K, V) 占一個位置(原來占兩個),之后直接調用 Array.sort() 排序,不過這樣做會破壞數組(key 的位置變化了)。
### 2\. ExternalAppendOnlyMap

相比 AppendOnlyMap,ExternalAppendOnlyMap 的實現略復雜,但邏輯其實很簡單,類似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 過程:
ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。**如果 AppendOnlyMap 快被裝滿時檢查一下內存剩余空間是否可以夠擴展,夠就直接在內存中擴展,不夠就 sort 一下 AppendOnlyMap,將其內部所有 records 都 spill 到磁盤上。**圖中 spill 了 4 次,每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進行 aggregate,并不是與所有的 records 進行 aggregate(一些 records 已經被 spill 到磁盤上了)。因此當需要 aggregate 的最終結果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進行全局 merge-aggregate。
**全局 merge-aggregate 的流程也很簡單:**先將 AppendOnlyMap 中的 records 進行 sort,形成 sortedMap。然后利用 DestructiveSortedIterator 和 DiskMapIterator 分別從 sortedMap 和各個 spilledMap 讀出一部分數據(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key),所以圖中第一個 spilledMap 只讀出前三個 records 進入 StreamBuffer。mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經存在于 mergeBuffers 中的 StreamBuffer 進行 merge-combine,第一個被放入 mergeBuffers 的 StreamBuffer 被稱為 minBuffer,那么 minKey 就是 minBuffer 中第一個 record 的 key。當 merge-combine 的時候,與 minKey 相同的 records 被 aggregate 一起,然后輸出。整個 merge-combine 在 mergeBuffers 中結束后,StreamBuffer 剩余的 records 隨著 StreamBuffer 重新進入 mergeHeap。一旦某個 StreamBuffer 在 merge-combine 后變為空(里面的 records 都被輸出了),那么會使用 DestructiveSortedIterator 或 DiskMapIterator 重新裝填 hash(key) 相同的 records,然后再重新進入 mergeHeap。
整個 insert-merge-aggregate 的過程有三點需要進一步探討一下:
* 內存剩余空間檢測
與 Hadoop MapReduce 規定 reducer 中 70% 的空間可用于 shuffle-sort 類似,Spark 也規定 executor 中?`spark.shuffle.memoryFraction * spark.shuffle.safetyFraction`?的空間(默認是`0.3 * 0.8`)可用于 ExternalOnlyAppendMap。**Spark 略保守是不是?更保守的是這 24% 的空間不是完全用于一個 ExternalOnlyAppendMap 的,而是由在 executor 上同時運行的所有 reducer 共享的。**為此,exectuor 專門持有一個?`ShuffleMemroyMap: HashMap[threadId, occupiedMemory]`?來監控每個 reducer 中 ExternalOnlyAppendMap 占用的內存量。每當 AppendOnlyMap 要擴展時,都會計算**ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的內存 + 擴展后的內存**?是會否會大于內存限制,大于就會將 AppendOnlyMap spill 到磁盤。有一點需要注意的是前 1000 個 records 進入 AppendOnlyMap 的時候不會啟動**是否要 spill**?的檢查,需要擴展時就直接在內存中擴展。
* AppendOnlyMap 大小估計
為了獲知 AppendOnlyMap 占用的內存空間,可以在每次擴展時都將 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但這樣做非常耗時。所以 Spark 設計了粗略的估算算法,算法時間復雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小變化及一共 insert 的 records 的個數來估算大小,具體見?`SizeTrackingAppendOnlyMap`?和`SizeEstimator`。
* Spill 過程
與 shuffle write 一樣,在 spill records 到磁盤上的時候,會建立一個 buffer 緩沖區,大小仍為`spark.shuffle.file.buffer.kb`?,默認是 32KB。另外,由于 serializer 也會分配緩沖區用于序列化和反序列化,所以如果一次 serialize 的 records 過多的話緩沖區會變得很大。Spark 限制每次 serialize 的 records 個數為?`spark.shuffle.spill.batchSize`,默認是 10000。
## Discussion
通過本章的介紹可以發現,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加靈活,會根據不同的 transformation() 的語義去設計不同的 shuffle-aggregate 策略,再加上不同的內存數據結構來混搭出合理的執行流程。
這章主要討論了 Spark 是怎么在不排序 records 的情況下完成 shuffle write 和 shuffle read,以及怎么將 shuffle 過程融入 RDD computing chain 中的。附帶討論了內存與磁盤的平衡以及與 Hadoop MapReduce shuffle 的異同。下一章將從部署圖以及進程通信角度來描述 job 執行的整個流程,也會涉及 shuffle write 和 shuffle read 中的數據位置獲取問題。
另外,Jerry Shao 寫的?[詳細探究Spark的shuffle實現](http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/)?很贊,里面還介紹了 shuffle 過程在 Spark 中的進化史。目前 sort-based 的 shuffle 也在實現當中,stay tuned。