在 Overview 里我們初步介紹了 DAG 型的物理執行圖,里面包含 stages 和 tasks。這一章主要解決的問題是:
**給定 job 的邏輯執行圖,如何生成物理執行圖(也就是 stages 和 tasks)?**
## 一個復雜 job 的邏輯執行圖

代碼貼在本章最后。**給定這樣一個復雜數據依賴圖,如何合理劃分 stage,并確定 task 的類型和個數?**
一個直觀想法是將前后關聯的 RDDs 組成一個 stage,每個箭頭生成一個 task。對于兩個 RDD 聚合成一個 RDD 的情況,這三個 RDD 組成一個 stage。這樣雖然可以解決問題,但顯然效率不高。除了效率問題,這個想法還有一個更嚴重的問題:**大量中間數據需要存儲**。對于 task 來說,其執行結果要么要存到磁盤,要么存到內存,或者兩者皆有。如果每個箭頭都是 task 的話,每個 RDD 里面的數據都需要存起來,占用空間可想而知。
仔細觀察一下邏輯執行圖會發現:在每個 RDD 中,每個 partition 是獨立的,也就是說在 RDD 內部,每個 partition 的數據依賴各自不會相互干擾。因此,一個大膽的想法是將整個流程圖看成一個 stage,為最后一個 finalRDD 中的每個 partition 分配一個 task。圖示如下:

所有的粗箭頭組合成第一個 task,該 task 計算結束后順便將 CoGroupedRDD 中已經計算得到的第二個和第三個 partition 存起來。之后第二個 task(細實線)只需計算兩步,第三個 task(細虛線)也只需要計算兩步,最后得到結果。
這個想法有兩個不靠譜的地方:
* 第一個 task 太大,碰到 ShuffleDependency 后,不得不計算 shuffle 依賴的 RDDs 的所有 partitions,而且都在這一個 task 里面計算。
* 需要設計巧妙的算法來判斷哪個 RDD 中的哪些 partition 需要 cache。而且 cache 會占用存儲空間。
雖然這是個不靠譜的想法,但有一個可取之處,即?**pipeline 思想:數據用的時候再算,而且數據是流到要計算的位置的**。比如在第一個 task 中,從 FlatMappedValuesRDD 中的 partition 向前推算,只計算要用的(依賴的) RDDs 及 partitions。在第二個 task 中,從 CoGroupedRDD 到 FlatMappedValuesRDD 計算過程中,不需要存儲中間結果(MappedValuesRDD 中 partition 的全部數據)。
更進一步,從 record 粒度來講,如下圖中,第一個 pattern 中先算 g(f(record1)),然后原始的 record1 和 f(record1) 都可以丟掉,然后再算 g(f(record2)),丟掉中間結果,最后算 g(f(record3))。對于第二個 pattern 中的 g,record1 進入 g 后,理論上可以丟掉(除非被手動 cache)。其他 pattern 同理。

回到 stage 和 task 的劃分問題,上面不靠譜想法的主要問題是碰到 ShuffleDependency 后無法進行 pipeline。那么只要在 ShuffleDependency 處斷開,就只剩 NarrowDependency,而 NarrowDependency chain 是可以進行 pipeline 的。按照此思想,上面 ComplexJob 的劃分圖如下:

所以劃分算法就是:**從后往前推算,遇到 ShuffleDependency 就斷開,遇到 NarrowDependency 就將其加入該 stage。每個 stage 里面 task 的數目由該 stage 最后一個 RDD 中的 partition 個數決定。**
粗箭頭表示 task。因為是從后往前推算,因此最后一個 stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents。**如果 stage 最后要產生 result,那么該 stage 里面的 task 都是 ResultTask,否則都是 ShuffleMapTask。**之所以稱為 ShuffleMapTask 是因為其計算結果需要 shuffle 到下一個 stage,本質上相當于 MapReduce 中的 mapper。ResultTask 相當于 MapReduce 中的 reducer(如果需要從 parent stage 那里 shuffle 數據),也相當于普通 mapper(如果該 stage 沒有 parent stage)。
還有一個問題:算法中提到 NarrowDependency chain 可以 pipeline,可是這里的?**ComplexJob 只展示了 OneToOneDependency 和 RangeDependency 的 pipeline,普通 NarrowDependency 如何 pipeline?**
回想上一章里面 cartesian(otherRDD) 里面復雜的 NarrowDependency,圖示如下:

經過算法劃分后結果如下:

圖中粗箭頭展示了第一個 ResultTask,其他的 task 依此類推。由于該 stage 的 task 直接輸出 result,所以這個圖包含 6 個 ResultTasks。與 OneToOneDependency 不同的是這里每個 ResultTask 需要計算 3 個 RDD,讀取兩個 data block,而整個讀取和計算這三個 RDD 的過程在一個 task 里面完成。當計算 CartesianRDD 中的 partition 時,需要從兩個 RDD 獲取 records,由于都在一個 task 里面,不需要 shuffle。這個圖說明:**不管是 1:1 還是 N:1 的 NarrowDependency,只要是 NarrowDependency chain,就可以進行 pipeline,生成的 task 個數與該 stage 最后一個 RDD 的 partition 個數相同。**
## 物理圖的執行
生成了 stage 和 task 以后,下一個問題就是?**task 如何執行來生成最后的 result?**
回到 ComplexJob 的物理執行圖,如果按照 MapReduce 的邏輯,從前到后執行,map() 產生中間數據 map outpus,經過 partition 后放到本地磁盤。再經過 shuffle-sort-aggregate 后生成 reduce inputs,最后 reduce() 執行得到 result。執行流程如下:

整個執行流程沒有問題,但不能直接套用在 Spark 的物理執行圖上,因為 MapReduce 的流程圖簡單、固定,而且沒有 pipeline。
回想 pipeline 的思想是?**數據用的時候再算,而且數據是流到要計算的位置的**。Result 產生的地方的就是要計算的位置,要確定 “需要計算的數據”,我們可以從后往前推,需要哪個 partition 就計算哪個 partition,如果 partition 里面沒有數據,就繼續向前推,形成 computing chain。這樣推下去,結果就是:需要首先計算出每個 stage 最左邊的 RDD 中的某些 partition。
**對于沒有 parent stage 的 stage,該 stage 最左邊的 RDD 是可以立即計算的,而且每計算出一個 record 后便可以流入 f 或 g(見前面圖中的 patterns)。**如果 f 中的 record 關系是 1:1 的,那么 f(record1) 計算結果可以立即順著 computing chain 流入 g 中。如果 f 的 record 關系是 N:1,record1 進入 f() 后也可以被回收。總結一下,computing chain 從后到前建立,而實際計算出的數據從前到后流動,而且計算出的第一個 record 流動到不能再流動后,再計算下一個 record。這樣,雖然是要計算后續 RDD 的 partition 中的 records,但并不是要求當前 RDD 的 partition 中所有 records 計算得到后再整體向后流動。
對于有 parent stage 的 stage,先等著所有 parent stages 中 final RDD 中數據計算好,然后經過 shuffle 后,問題就又回到了計算 “沒有 parent stage 的 stage”。
> 代碼實現:每個 RDD 包含的 getDependency() 負責確立 RDD 的數據依賴,compute() 方法負責接收 parent RDDs 或者 data block 流入的 records,進行計算,然后輸出 record。經常可以在 RDD 中看到這樣的代碼`firstParent[T].iterator(split, context).map(f)`。firstParent 表示該 RDD 依賴的第一個 parent RDD,iterator() 表示 parentRDD 中的 records 是一個一個流入該 RDD 的,map(f) 表示每流入一個 recod 就對其進行 f(record) 操作,輸出 record。為了統一接口,這段 compute() 仍然返回一個 iterator,來迭代 map(f) 輸出的 records。
總結一下:**整個 computing chain 根據數據依賴關系自后向前建立,遇到 ShuffleDependency 后形成 stage。在每個 stage 中,每個 RDD 中的 compute() 調用 parentRDD.iter() 來將 parent RDDs 中的 records 一個個 fetch 過來。**
如果要自己設計一個 RDD,那么需要注意的是 compute() 只負責定義 parent RDDs => output records 的計算邏輯,具體依賴哪些 parent RDDs 由?`getDependency()`?定義,具體依賴 parent RDD 中的哪些 partitions 由?`dependency.getParents()`?定義。
例如,在 CartesianRDD 中,
~~~
// RDD x = (RDD a).cartesian(RDD b)
// 定義 RDD x 應該包含多少個 partition,每個 partition 是什么類型
override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
// 定義 RDD x 中的每個 partition 怎么計算得到
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
// s1 表示 RDD x 中的 partition 依賴 RDD a 中的 partitions(這里只依賴一個)
// s2 表示 RDD x 中的 partition 依賴 RDD b 中的 partitions(這里只依賴一個)
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
// 定義 RDD x 中的 partition i 依賴于哪些 RDD 中的哪些 partitions
//
// 這里 RDD x 依賴于 RDD a,同時依賴于 RDD b,都是 NarrowDependency
// 對于第一個依賴,RDD x 中的 partition i 依賴于 RDD a 中的
// 第 List(i / numPartitionsInRdd2) 個 partition
// 對于第二個依賴,RDD x 中的 partition i 依賴于 RDD b 中的
// 第 List(id % numPartitionsInRdd2) 個 partition
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
~~~
## 生成 job
前面介紹了邏輯和物理執行圖的生成原理,那么,**怎么觸發 job 的生成?已經介紹了 task,那么 job 是什么?**
下表列出了可以觸發執行圖生成的典型?[action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是?`processPartition()`,定義如何計算 partition 中的 records 得到 result。第三列是?`resultHandler()`,定義如何對從各個 partition 收集來的 results 進行計算來得到最終結果。
| Action | finalRDD(records) => result | compute(results) |
| --- | --- | --- |
| reduce(func) | (record1, record2) => result, (result, record i) => result | (result1, result 2) => result, (result, result i) => result |
| collect() | Array[records] => result | Array[result] |
| count() | count(records) => result | sum(result) |
| foreach(f) | f(records) => result | Array[result] |
| take(n) | record (i result | Array[result] |
| first() | record 1 => result | Array[result] |
| takeSample() | selected records => result | Array[result] |
| takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) |
| saveAsHadoopFile(path) | records => write(records) | null |
| countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) |
用戶的 driver 程序中一旦出現 action(),就會生成一個 job,比如?`foreach()`?會調用`sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))`,向 DAGScheduler 提交 job。如果 driver 程序后面還有 action(),那么其他 action() 也會生成 job 提交。所以,driver 有多少個 action(),就會生成多少個 job。這就是 Spark 稱 driver 程序為 application(可能包含多個 job)而不是 job 的原因。
每一個 job 包含 n 個 stage,最后一個 stage 產生 result。比如,第一章的 GroupByTest 例子中存在兩個 job,一共產生了兩組 result。在提交 job 過程中,DAGScheduler 會首先劃分 stage,然后先提交**無 parent stage 的 stages**,并在提交過程中確定該 stage 的 task 個數及類型,并提交具體的 task。無 parent stage 的 stage 提交完后,依賴該 stage 的 stage 才能夠提交。從 stage 和 task 的執行角度來講,一個 stage 的 parent stages 執行完后,該 stage 才能執行。
## 提交 job 的實現細節
下面簡單分析下 job 的生成和提交代碼,提交過程在 Architecture 那一章也會有圖文并茂的分析:
1. rdd.action() 會調用?`DAGScheduler.runJob(rdd, processPartition, resultHandler)`?來生成 job。
2. runJob() 會首先通過`rdd.getPartitions()`來得到 finalRDD 中應該存在的 partition 的個數和類型:Array[Partition]。然后根據 partition 個數 new 出來將來要持有 result 的數組?`Array[Result](partitions.size)`。
3. 最后調用 DAGScheduler 的`runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)`來提交 job。cleanedFunc 是 processParittion 經過閉包清理后的結果,這樣可以被序列化后傳遞給不同節點的 task。
4. DAGScheduler 的 runJob 繼續調用`submitJob(rdd, func, partitions, allowLocal, resultHandler)`來提交 job。
5. submitJob() 首先得到一個 jobId,然后再次包裝 func,向 DAGSchedulerEventProcessActor 發送 JobSubmitted 信息,該 actor 收到信息后進一步調用`dagScheduler.handleJobSubmitted()`來處理提交的 job。之所以這么麻煩,是為了符合事件驅動模型。
6. handleJobSubmmitted() 首先調用 finalStage = newStage() 來劃分 stage,然后submitStage(finalStage)。由于 finalStage 可能有 parent stages,實際先提交 parent stages,等到他們執行完,finalStage 需要再次提交執行。再次提交由 handleJobSubmmitted() 最后的 submitWaitingStages() 負責。
分析一下 newStage() 如何劃分 stage:
1. 該方法在 new Stage() 的時候會調用 finalRDD 的 getParentStages()。
2. getParentStages() 從 finalRDD 出發,反向 visit 邏輯執行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開 stage,并遞歸到 ShuffleDepedency 依賴的 stage。
3. 一個 ShuffleMapStage(不是最后形成 result 的 stage)形成后,會將該 stage 最后一個 RDD 注冊到`MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)`,這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數據的位置。
分析一下 submitStage(stage) 如何提交 stage 和 task:
1. 先確定該 stage 的 missingParentStages,使用`getMissingParentStages(stage)`。如果 parentStages 都可能已經執行過了,那么就為空了。
2. 如果 missingParentStages 不為空,那么先遞歸提交 missing 的 parent stages,并將自己加入到 waitingStages 里面,等到 parent stages 執行結束后,會觸發提交 waitingStages 里面的 stage。
3. 如果 missingParentStages 為空,說明該 stage 可以立即執行,那么就調用`submitMissingTasks(stage, jobId)`來生成和提交具體的 task。如果 stage 是 ShuffleMapStage,那么 new 出來與該 stage 最后一個 RDD 的 partition 數相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出來與 stage 最后一個 RDD 的 partition 個數相同的 ResultTasks。一個 stage 里面的 task 組成一個 TaskSet,最后調用`taskScheduler.submitTasks(taskSet)`來提交一整個 taskSet。
4. 這個 taskScheduler 類型是 TaskSchedulerImpl,在 submitTasks() 里面,每一個 taskSet 被包裝成 manager: TaskSetMananger,然后交給`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 調度器。submitTasks() 最后一步是通知`backend.reviveOffers()`去執行 task,backend 的類型是 SchedulerBackend。如果在集群上運行,那么這個 backend 類型是 SparkDeploySchedulerBackend。
5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子類,`backend.reviveOffers()`其實是向 DriverActor 發送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的時候,會啟動 DriverActor。DriverActor 收到 ReviveOffers 消息后,會調用`launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))`?來 launch tasks。scheduler 就是 TaskSchedulerImpl。`scheduler.resourceOffers()`從 FIFO 或者 Fair 調度器那里獲得排序后的 TaskSetManager,并經過`TaskSchedulerImpl.resourceOffer()`,考慮 locality 等因素來確定 task 的全部信息 TaskDescription。調度細節這里暫不討論。
6. DriverActor 中的 launchTasks() 將每個 task 序列化,如果序列化大小不超過 Akka 的 akkaFrameSize,那么直接將 task 送到 executor 那里執行`executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))`。
## Discussion
至此,我們討論了:
* driver 程序如何觸發 job 的生成
* 如何從邏輯執行圖得到物理執行圖
* pipeline 思想與實現
* 生成與提交 job 的實際代碼
還有很多地方沒有深入討論,如:
* 連接 stage 的 shuffle 過程
* task 運行過程及運行位置
下一章重點討論 shuffle 過程。
從邏輯執行圖的建立,到將其轉換成物理執行圖的過程很經典,過程中的 dependency 劃分,pipeline,stage 分割,task 生成 都是有條不紊,有理有據的。
## ComplexJob 的源代碼
~~~
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}
~~~