## General logical plan

典型的 Job 邏輯執行圖如上所示,經過下面四個步驟可以得到最終執行結果:
* 從數據源(可以是本地 file,內存數據結構, HDFS,HBase 等)讀取數據創建最初的 RDD。上一章例子中的 parallelize() 相當于 createRDD()。
* 對 RDD 進行一系列的 transformation() 操作,每一個 transformation() 會產生一個或多個包含不同類型 T 的 RDD[T]。T 可以是 Scala 里面的基本類型或數據結構,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等復雜類型(因為難以在復雜類型上定義 partition 函數)。
* 對最后的 final RDD 進行 action() 操作,每個 partition 計算后產生結果 result。
* 將 result 回送到 driver 端,進行最后的 f(list[result]) 計算。例子中的 count() 實際包含了action() 和 sum() 兩步計算。
> RDD 可以被 cache 到內存或者 checkpoint 到磁盤上。RDD 中的 partition 個數不固定,通常由用戶設定。RDD 和 RDD 之間 partition 的依賴關系可以不是 1 對 1,如上圖既有 1 對 1 關系,也有多對多的關系。
## 邏輯執行圖的生成
了解了 Job 的邏輯執行圖后,寫程序時候會在腦中形成類似上面的數據依賴圖。然而,實際生成的 RDD 個數往往比我們想想的個數多。
要解決邏輯執行圖生成問題,實際需要解決:
* 如何產生 RDD,應該產生哪些 RDD?
* 如何建立 RDD 之間的依賴關系?
### 1\. 如何產生 RDD,應該產生哪些 RDD?
解決這個問題的初步想法是讓每一個 transformation() 方法返回(new)一個 RDD。事實也基本如此,只是某些 transformation() 比較復雜,會包含多個子 transformation(),因而會生成多個 RDD。這就是_實際 RDD 個數比我們想象的多一些_?的原因。
**如何計算每個 RDD 中的數據?**邏輯執行圖實際上是 computing chain,那么 transformation() 的計算邏輯在哪里被 perform?每個 RDD 里有 compute() 方法,負責接收來自上一個 RDD 或者數據源的 input records,perform transformation() 的計算邏輯,然后輸出 records。
產生哪些 RDD 與 transformation() 的計算邏輯有關,下面討論一些典型的?[transformation()](http://spark.apache.org/docs/latest/programming-guide.html#transformations)?及其創建的 RDD。官網上已經解釋了每個 transformation 的含義。iterator(split) 的意思是 foreach record in the partition。這里空了很多,是因為那些 transformation() 較為復雜,會產生多個 RDD,具體會在下一節圖示出來。
| Transformation | Generated RDDs | Compute() |
| --- | --- | --- |
| **map**(func) | MappedRDD | iterator(split).map(f) |
| **filter**(func) | FilteredRDD | iterator(split).filter(f) |
| **flatMap**(func) | FlatMappedRDD | iterator(split).flatMap(f) |
| **mapPartitions**(func) | MapPartitionsRDD | f(iterator(split)) | |
| **mapPartitionsWithIndex**(func) | MapPartitionsRDD | f(split.index, iterator(split)) | |
| **sample**(withReplacement, fraction, seed) | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split)) |
| **pipe**(command, [envVars]) | PipedRDD | |
| **union**(otherDataset) | | |
| **intersection**(otherDataset) | | |
| **distinct**([numTasks])) | | |
| **groupByKey**([numTasks]) | | |
| **reduceByKey**(func, [numTasks]) | | |
| **sortByKey**([ascending], [numTasks]) | | |
| **join**(otherDataset, [numTasks]) | | |
| **cogroup**(otherDataset, [numTasks]) | | |
| **cartesian**(otherDataset) | | |
| **coalesce**(numPartitions) | | |
| **repartition**(numPartitions) | |
### 2\. 如何建立 RDD 之間的聯系?
RDD 之間的數據依賴問題實際包括三部分:
* RDD 本身的依賴關系。要生成的 RDD(以后用 RDD x 表示)是依賴一個 parent RDD,還是多個 parent RDDs?
* RDD x 中會有多少個 partition ?
* RDD x 與其 parent RDDs 中 partition 之間是什么依賴關系?是依賴 parent RDD 中一個還是多個 partition?
第一個問題可以很自然的解決,比如`x = rdda.transformation(rddb)`?(e.g., x = a.join(b)) 就表示 RDD x 同時依賴于 RDD a 和 RDD b。
第二個問題中的 partition 個數一般由用戶指定,不指定的話一般取`max(numPartitions[parent RDD 1], .., numPartitions[parent RDD n])`。
第三個問題比較復雜。需要考慮這個 transformation() 的語義,不同的 transformation() 的依賴關系不同。比如 map() 是 1:1,而 groupByKey() 邏輯執行圖中的 ShuffledRDD 中的每個 partition 依賴于 parent RDD 中所有的 partition,還有更復雜的情況。
再次考慮第三個問題,RDD x 中每個 partition 可以依賴于 parent RDD 中一個或者多個 partition。而且這個依賴可以是完全依賴或者部分依賴。部分依賴指的是 parent RDD 中某 partition 中一部分數據與 RDD x 中的一個 parttion 相關,另一部分數據與 RDD x 中的另一個 partition 相關。下圖展示了完全依賴和部分依賴。

前三個是完全依賴,RDD x 中的 partition 與 parent RDD 中的 partition/partitions 完全相關。最后一個是部分依賴,RDD x 中的 partition 只與 parent RDD 中的 partition 一部分數據相關,另一部分數據與 RDD x 中的其他 partition 相關。
在 Spark 中,完全依賴被稱為 NarrowDependency,部分依賴被稱為 ShuffleDependency。其實 ShuffleDependency 跟 MapReduce 中 shuffle 的數據依賴相同(mapper 將其 output 進行 partition,然后每個 reducer 會將所有 mapper 輸出中屬于自己的 partition 通過 HTTP fetch 得到)。
* 第一種 1:1 的情況被稱為 OneToOneDependency。
* 第二種 N:1 的情況被稱為 N:1 NarrowDependency。
* 第三種 N:N 的情況被稱為 N:N NarrowDependency。不屬于前兩種情況的完全依賴都屬于這個類別。
* 第四種被稱為 ShuffleDependency。
對于 NarrowDependency,具體 RDD x 中的 partitoin i 依賴 parrent RDD 中一個 partition 還是多個 partitions,是由 RDD x 中的?`getParents(partition i)`?決定(下圖中某些例子會詳細介紹)。還有一種 RangeDependency 的完全依賴,不過該依賴目前只在 UnionRDD 中使用,下面會介紹。
所以,總結下來 partition 之間的依賴關系如下:
* NarrowDependency (**使用黑色實線或黑色虛線箭頭表示**)
* OneToOneDependency (1:1)
* NarrowDependency (N:1)
* NarrowDependency (N:N)
* RangeDependency (只在 UnionRDD 中使用)
* ShuffleDependency (**使用紅色箭頭表示**)
之所以要劃分 NarrowDependency 和 ShuffleDependency 是為了生成物理執行圖,下一章會具體介紹。
> 需要注意的是第三種 NarrowDependency (N:N) 很少在兩個 RDD 之間出現。因為如果 parent RDD 中的 partition 同時被 child RDD 中多個 partitions 依賴,那么最后生成的依賴圖往往與 ShuffleDependency 一樣。只是對于 parent RDD 中的 partition 來說一個是完全依賴,一個是部分依賴,而箭頭數沒有少。所以 Spark 定義的 NarrowDependency 其實是 “each partition of the parent RDD is used by at most one partition of the child RDD“,也就是只有 OneToOneDependency (1:1) 和 NarrowDependency (N:1) 兩種情況。但是,自己設計的奇葩 RDD 確實可以呈現出 NarrowDependency (N:N) 的情況。這里描述的比較亂,其實看懂下面的幾個典型的 RDD 依賴即可。
**如何計算得到 RDD x 中的數據(records)?**下圖展示了 OneToOneDependency 的數據依賴,雖然 partition 和 partition 之間是 1:1,但不代表計算 records 的時候也是讀一個 record 計算一個 record。 下圖右邊上下兩個 pattern 之間的差別類似于下面兩個程序的差別:

code1 of iter.f()
~~~
int[] array = {1, 2, 3, 4, 5}
for(int i = 0; i < array.length; i++)
f(array[i])
~~~
code2 of f(iter)
~~~
int[] array = {1, 2, 3, 4, 5}
f(array)
~~~
### 3\. 給出一些典型的 transformation() 的計算過程及數據依賴圖
**1) union(otherRDD)**

union() 將兩個 RDD 簡單合并在一起,不改變 partition 里面的數據。RangeDependency 實際上也是 1:1,只是為了訪問 union() 后的 RDD 中的 partition 方便,保留了原始 RDD 的 range 邊界。
**2) groupByKey(numPartitions)**

上一章已經介紹了 groupByKey 的數據依賴,這里算是_溫故而知新_?吧。
groupByKey() 只需要將 Key 相同的 records 聚合在一起,一個簡單的 shuffle 過程就可以完成。ShuffledRDD 中的 compute() 只負責將屬于每個 partition 的數據 fetch 過來,之后使用 mapPartitions() 操作(前面的 OneToOneDependency 展示過)進行 aggregate,生成 MapPartitionsRDD,到這里 groupByKey() 已經結束。最后為了統一返回值接口,將 value 中的 ArrayBuffer[] 數據結構抽象化成 Iterable[]。
> groupByKey() 沒有在 map 端進行 combine,因為 map 端 combine 只會省掉 partition 里面重復 key 占用的空間,當重復 key 特別多時,可以考慮開啟 combine。
>
> 這里的 ArrayBuffer 實際上應該是 CompactBuffer,An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
ParallelCollectionRDD 是最基礎的 RDD,直接從 local 數據結構 create 出的 RDD 屬于這個類型,比如
~~~
val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)
~~~
生成的 pairs 就是 ParallelCollectionRDD。
**2) reduceyByKey(func, numPartitions)**

reduceyByKey() 相當于傳統的 MapReduce,整個數據流也與 Hadoop 中的數據流基本一樣。reduceyByKey() 默認在 map 端開啟 combine(),因此在 shuffle 之前先通過 mapPartitions 操作進行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再進行 reduce(通過 aggregate + mapPartitions() 操作來實現)得到 MapPartitionsRDD。
**3) distinct(numPartitions)**

distinct() 功能是 deduplicate RDD 中的所有的重復數據。由于重復數據可能分散在不同的 partition 里面,因此需要 shuffle 來進行 aggregate 后再去重。然而,shuffle 要求數據類型是?`<K, V>`。如果原始數據只有 Key(比如例子中 record 只有一個整數),那么需要補充成?`<K, null>`。這個補充過程由 map() 操作完成,生成 MappedRDD。然后調用上面的 reduceByKey() 來進行 shuffle,在 map 端進行 combine,然后 reduce 進一步去重,生成 MapPartitionsRDD。最后,將?`<K, null>`?還原成 K,仍然由 map() 完成,生成 MappedRDD。藍色的部分就是調用的 reduceByKey()。
**4) cogroup(otherRDD, numPartitions)**

與 groupByKey() 不同,cogroup() 要 aggregate 兩個或兩個以上的 RDD。**那么 CoGroupedRDD 與 RDD a 和 RDD b 的關系都必須是 ShuffleDependency 么?是否存在 OneToOneDependency?**
首先要明確的是 CoGroupedRDD 存在幾個 partition 可以由用戶直接設定,與 RDD a 和 RDD b 無關。然而,如果 CoGroupedRDD 中 partition 個數與 RDD a/b 中的 partition 個數不一樣,那么不可能存在 1:1 的關系。
再次,cogroup() 的計算結果放在 CoGroupedRDD 中哪個 partition 是由用戶設置的 partitioner 確定的(默認是 HashPartitioner)。那么可以推出:即使 RDD a/b 中的 partition 個數與 CoGroupedRDD 中的一樣,如果 RDD a/b 中的 partitioner 與 CoGroupedRDD 中的不一樣,也不可能存在 1:1 的關系。比如,在上圖的 example 里面,RDD a 是 RangePartitioner,b 是 HashPartitioner,CoGroupedRDD 也是 RangePartitioner 且 partition 個數與 a 的相同。那么很自然地,a 中的每個 partition 中 records 可以直接送到 CoGroupedRDD 中對應的 partition。RDD b 中的 records 必須再次進行劃分與 shuffle 后才能進入對應的 partition。
最后,經過上面分析,**對于兩個或兩個以上的 RDD 聚合,當且僅當聚合后的 RDD 中 partitioner 類別及 partition 個數與前面的 RDD 都相同,才會與前面的 RDD 構成 1:1 的關系。否則,只能是 ShuffleDependency。**這個算法對應的代碼可以在`CoGroupedRDD.getDependencies()`?中找到,雖然比較難理解。
> Spark 代碼中如何表示 CoGroupedRDD 中的 partition 依賴于多個 parent RDDs 中的 partitions?
>
> 首先,將 CoGroupedRDD 依賴的所有 RDD 放進數組 rdds[RDD] 中。再次,foreach i,如果 CoGroupedRDD 和 rdds(i) 對應的 RDD 是 OneToOneDependency 關系,那么 Dependecy[i] = new OneToOneDependency(rdd),否則 = new ShuffleDependency(rdd)。最后,返回與每個 parent RDD 的依賴關系數組 deps[Dependency]。
>
> Dependency 類中的 getParents(partition id) 負責給出某個 partition 按照該 dependency 所依賴的 parent RDD 中的 partitions: List[Int]。
>
> getPartitions() 負責給出 RDD 中有多少個 partition,以及每個 partition 如何序列化。
**5) intersection(otherRDD)**

intersection() 功能是抽取出 RDD a 和 RDD b 中的公共數據。先使用 map() 將 RDD[T] 轉變成 RDD[(T, null)],這里的 T 只要不是 Array 等集合類型即可。接著,進行 a.cogroup(b),藍色部分與前面的 cogroup() 一樣。之后再使用 filter() 過濾掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 為空的 records,得到 FilteredRDD。最后,使用 keys() 只保留 key 即可,得到 MappedRDD。
6)?**join(otherRDD, numPartitions)**

join() 將兩個 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一起。與 intersection() 類似,首先進行 cogroup(),得到`<K, (Iterable[V1], Iterable[V2])>`類型的 MappedValuesRDD,然后對 Iterable[V1] 和 Iterable[V2] 做笛卡爾集,并將集合 flat() 化。
這里給出了兩個 example,第一個 example 的 RDD 1 和 RDD 2 使用 RangePartitioner 劃分,而 CoGroupedRDD 使用 HashPartitioner,與 RDD 1/2 都不一樣,因此是 ShuffleDependency。第二個 example 中, RDD 1 事先使用 HashPartitioner 對其 key 進行劃分,得到三個 partition,與 CoGroupedRDD 使用的 HashPartitioner(3) 一致,因此數據依賴是 1:1。如果 RDD 2 事先也使用 HashPartitioner 對其 key 進行劃分,得到三個 partition,那么 join() 就不存在 ShuffleDependency 了,這個 join() 也就變成了 hashjoin()。
**7) sortByKey(ascending, numPartitions)**

sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的數據依賴很簡單,先使用 shuffle 將 records 聚集在一起(放到對應的 partition 里面),然后將 partition 內的所有 records 按 key 排序,最后得到的 MapPartitionsRDD 中的 records 就有序了。
> 目前 sortByKey() 先使用 Array 來保存 partition 中所有的 records,再排序。
**8) cartesian(otherRDD)**

Cartesian 對兩個 RDD 做笛卡爾集,生成的 CartesianRDD 中 partition 個數 = partitionNum(RDD a) * partitionNum(RDD b)。
這里的依賴關系與前面的不太一樣,CartesianRDD 中每個partition 依賴兩個 parent RDD,而且其中每個 partition 完全依賴 RDD a 中一個 partition,同時又完全依賴 RDD b 中另一個 partition。這里沒有紅色箭頭,因為所有依賴都是 NarrowDependency。
> CartesianRDD.getDependencies() 返回 rdds[RDD a, RDD b]。CartesianRDD 中的 partiton i 依賴于 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。
**9) coalesce(numPartitions, shuffle = false)**

coalesce() 可以將 parent RDD 的 partition 個數進行調整,比如從 5 個減少到 3 個,或者從 5 個增加到 10 個。需要注意的是當 shuffle = false 的時候,是不能增加 partition 個數的(不能從 5 個變為 10 個)。
coalesce() 的核心問題是**如何確立 CoalescedRDD 中 partition 和其 parent RDD 中 partition 的關系。**
* coalesce(shuffle = false) 時,由于不能進行 shuffle,**問題變為 parent RDD 中哪些partition 可以合并在一起。**合并因素除了要考慮 partition 中元素個數外,還要考慮 locality 及 balance 的問題。因此,Spark 設計了一個非常復雜的算法來解決該問題(算法部分我還沒有深究)。注意`Example: a.coalesce(3, shuffle = false)`展示了 N:1 的 NarrowDependency。
* coalesce(shuffle = true) 時,**由于可以進行 shuffle,問題變為如何將 RDD 中所有 records 平均劃分到 N 個 partition 中。**很簡單,在每個 partition 中,給每個 record 附加一個 key,key 遞增,這樣經過 hash(key) 后,key 可以被平均分配到不同的 partition 中,類似 Round-robin 算法。在第二個例子中,RDD a 中的每個元素,先被加上了遞增的 key(如 MapPartitionsRDD 第二個 partition 中 (1, 3) 中的 1)。在每個 partition 中,第一個元素 (Key, Value) 中的 key 由?`(new Random(index)).nextInt(numPartitions)`?計算得到,index 是該 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 個數。接下來元素的 key 是遞增的,然后 shuffle 后的 ShuffledRDD 可以得到均分的 records,然后經過復雜算法來建立 ShuffledRDD 和 CoalescedRDD 之間的數據聯系,最后過濾掉 key,得到 coalesce 后的結果 MappedRDD。
**10) repartition(numPartitions)**
等價于 coalesce(numPartitions, shuffle = true)
## Primitive transformation()
**combineByKey()**
**分析了這么多 RDD 的邏輯執行圖,它們之間有沒有共同之處?如果有,是怎么被設計和實現的?**
仔細分析 RDD 的邏輯執行圖會發現,ShuffleDependency 左邊的 RDD 中的 record 要求是 \?型的,經過 ShuffleDependency 后,包含相同 key 的 records 會被 aggregate 到一起,然后在 aggregated 的 records 上執行不同的計算邏輯。實際執行時(后面的章節會具體談到)很多 transformation() 如 groupByKey(),reduceByKey() 是邊 aggregate 數據邊執行計算邏輯的,因此共同之處就是?**aggregate 同時 compute()**。Spark 使用 combineByKey() 來實現這個 aggregate + compute() 的基礎操作。
combineByKey() 的定義如下:
~~~
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
~~~
其中主要有三個參數 createCombiner,mergeValue 和 mergeCombiners。簡單解釋下這三個函數及 combineByKey() 的意義,注意它們的類型:
假設一組具有相同 K 的 \?records 正在一個個流向 combineByKey(),createCombiner 將第一個 record 的 value 初始化為 c (比如,c = value),然后從第二個 record 開始,來一個 record 就使用 mergeValue(c, record.value) 來更新 c,比如想要對這些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到結果 c。假設還有一組 records(key 與前面那組的 key 均相同)一個個到來,combineByKey() 使用前面的方法不斷計算得到 c'。現在如果要求這兩組 records 總的 combineByKey() 后的結果,那么可以使用 final c = mergeCombiners(c, c') 來計算。
## Discussion
至此,我們討論了如何生成 job 的邏輯執行圖,這些圖也是 Spark 看似簡單的 API 背后的復雜計算邏輯及數據依賴關系。
整個 job 會產生哪些 RDD 由 transformation() 語義決定。一些 transformation(), 比如 cogroup() 會被很多其他操作用到。
RDD 本身的依賴關系由 transformation() 生成的每一個 RDD 本身語義決定。如 CoGroupedRDD 依賴于所有參加 cogroup() 的 RDDs。
RDD 中 partition 依賴關系分為 NarrowDependency 和 ShuffleDependency。前者是完全依賴,后者是部分依賴。NarrowDependency 里面又包含多種情況,只有前后兩個 RDD 的 partition 個數以及 partitioner 都一樣,才會出現 NarrowDependency。
從數據處理邏輯的角度來看,MapReduce 相當于 Spark 中的 map() + reduceByKey(),但嚴格來講 MapReduce 中的 reduce() 要比 reduceByKey() 的功能強大些,詳細差別會在 Shuffle details 一章中繼續討論。