<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # 概覽 拿到系統后,部署系統是第一件事,那么系統部署成功以后,**各個節點都啟動了哪些服務?** ## 部署圖 ![deploy](https://box.kancloud.cn/2015-07-22_55af27d281727.png) 從部署圖中可以看到 * 整個集群分為 Master 節點和 Worker 節點,相當于 Hadoop 的 Master 和 Slave 節點。 * Master 節點上常駐 Master 守護進程,負責管理全部的 Worker 節點。 * Worker 節點上常駐 Worker 守護進程,負責與 Master 節點通信并管理 executors。 * Driver 官方解釋是 “The process running the main() function of the application and creating the SparkContext”。Application 就是用戶自己寫的 Spark 程序(driver program),比如 WordCount.scala。如果 driver program 在 Master 上運行,比如在 Master 上運行 ~~~ ./bin/run-example SparkPi 10 ~~~ 那么 SparkPi 就是 Master 上的 Driver。如果是 YARN 集群,那么 Driver 可能被調度到 Worker 節點上運行(比如上圖中的 Worker Node 2)。另外,如果直接在自己的 PC 上運行 driver program,比如在 Eclipse 中運行 driver program,使用 ~~~ val sc = new SparkContext("spark://master:7077", "AppName") ~~~ 去連接 master 的話,driver 就在自己的 PC 上,但是不推薦這樣的方式,因為 PC 和 Workers 可能不在一個局域網,driver 和 executor 之間的通信會很慢。 * 每個 Worker 上存在一個或者多個 ExecutorBackend 進程。每個進程包含一個 Executor 對象,該對象持有一個線程池,每個線程可以執行一個 task。 * 每個 application 包含一個 driver 和多個 executors,每個 executor 里面運行的 tasks 都屬于同一個 application。 * 在 Standalone 版本中,ExecutorBackend 被實例化成 CoarseGrainedExecutorBackend 進程。 > 在我部署的集群中每個 Worker 只運行了一個 CoarseGrainedExecutorBackend 進程,沒有發現如何配置多個 CoarseGrainedExecutorBackend 進程。(應該是運行多個 applications 的時候會產生多個進程,這個我還沒有實驗,) > > 想了解 Worker 和 Executor 的關系詳情,可以參閱?[@OopsOutOfMemory](http://weibo.com/oopsoom)?同學寫的?[Spark Executor Driver資源調度小結](http://blog.csdn.net/oopsoom/article/details/38763985)?。 * Worker 通過持有 ExecutorRunner 對象來控制 CoarseGrainedExecutorBackend 的啟停。 了解了部署圖之后,我們先給出一個 job 的例子,然后概覽一下 job 如何生成與運行。 # Job 例子 我們使用 Spark 自帶的 examples 包中的 GroupByTest,假設在 Master 節點運行,命令是 ~~~ /* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */ bin/run-example GroupByTest 100 10000 1000 36 ~~~ GroupByTest 具體代碼如下 ~~~ package org.apache.spark.examples import java.util.Random import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /** * Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */ object GroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("GroupBy Test") var numMappers = 100 var numKVPairs = 10000 var valSize = 1000 var numReducers = 36 val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random var arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { val byteArr = new Array[Byte](valSize) ranGen.nextBytes(byteArr) arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 }.cache // Enforce that everything has been calculated and in cache pairs1.count println(pairs1.groupByKey(numReducers).count) sc.stop() } } ~~~ 閱讀代碼后,用戶頭腦中 job 的執行流程是這樣的:?![deploy](https://box.kancloud.cn/2015-07-22_55af28144ad79.png) 具體流程很簡單,這里來估算下 data size 和執行結果: 1. 初始化 SparkConf()。 2. 初始化 numMappers=100, numKVPairs=10,000, valSize=1000, numReducers= 36。 3. 初始化 SparkContext。這一步很重要,是要確立 driver 的地位,里面包含創建 driver 所需的各種 actors 和 objects。 4. 每個 mapper 生成一個?`arr1: Array[(Int, Byte[])]`,length 為 numKVPairs。每一個 Byte[] 的 length 為 valSize,Int 為隨機生成的整數。`Size(arr1) = numKVPairs * (4 + valSize) = 10MB`,所以`Size(pairs1) = numMappers * Size(arr1) =1000MB`。這里的數值計算結果都是_約等于_。 5. 每個 mapper 將產生的 arr1 數組 cache 到內存。 6. 然后執行一個 action 操作 count(),來統計所有 mapper 中 arr1 中的元素個數,執行結果是?`numMappers * numKVPairs = 1,000,000`。這一步主要是為了將每個 mapper 產生的 arr1 數組 cache 到內存。 7. 在已經被 cache 的 paris1 上執行 groupByKey 操作,groupByKey 產生的 reducer (也就是 partition) 個數為 numReducers。理論上,如果 hash(Key) 比較平均的話,每個 reducer 收到的?record 個數為`numMappers * numKVPairs / numReducer = 27,777`,大小為?`Size(pairs1) / numReducer = 27MB`。 8. reducer 將收到的?`<Int, Byte[]>`?records 中擁有相同 Int 的 records 聚在一起,得到?`<Int, list(Byte[], Byte[], ..., Byte[])>`。 9. 最后 count 將所有 reducer 中 records 個數進行加和,最后結果實際就是 pairs1 中不同的 Int 總個數。 ## Job 邏輯執行圖 Job 的實際執行流程比用戶頭腦中的要復雜,需要先建立邏輯執行圖(或者叫數據依賴圖),然后劃分邏輯執行圖生成 DAG 型的物理執行圖,然后生成具體 task 執行。分析一下這個 job 的邏輯執行圖: 使用?`RDD.toDebugString`?可以看到整個 logical plan (RDD 的數據依賴關系)如下 ~~~ MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions) ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions) FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions) ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions) ~~~ 用圖表示就是:?![deploy](https://box.kancloud.cn/2015-07-22_55af287cc4393.png) > 需要注意的是 data in the partition 展示的是每個 partition 應該得到的計算結果,并不意味著這些結果都同時存在于內存中。 根據上面的分析可知: * 用戶首先 init 了一個0-99 的數組:?`0 until numMappers` * parallelize() 產生最初的 ParrallelCollectionRDD,每個 partition 包含一個整數 i。 * 執行 RDD 上的 transformation 操作(這里是 flatMap)以后,生成 FlatMappedRDD,其中每個 partition 包含一個 Array[(Int, Array[Byte])]。 * 第一個 count() 執行時,先在每個 partition 上執行 count,然后執行結果被發送到 driver,最后在 driver 端進行 sum。 * 由于 FlatMappedRDD 被 cache 到內存,因此這里將里面的 partition 都換了一種顏色表示。 * groupByKey 產生了后面兩個 RDD,為什么產生這兩個在后面章節討論。 * 如果 job 需要 shuffle,一般會產生 ShuffledRDD。該 RDD 與前面的 RDD 的關系類似于 Hadoop 中 mapper 輸出數據與 reducer 輸入數據之間的關系。 * MapPartitionsRDD 里包含 groupByKey() 的結果。 * 最后將 MapPartitionsRDD 中的 每個value(也就是Array[Byte])都轉換成 Iterable 類型。 * 最后的 count 與上一個 count 的執行方式類似。 **可以看到邏輯執行圖描述的是 job 的數據流:job 會經過哪些 transformation(),中間生成哪些 RDD 及 RDD 之間的依賴關系。** ## Job 物理執行圖 邏輯執行圖表示的是數據上的依賴關系,不是 task 的執行圖。在 Hadoop 中,用戶直接面對 task,mapper 和 reducer 的職責分明:一個進行分塊處理,一個進行 aggregate。Hadoop 中 整個數據流是固定的,只需要填充 map() 和 reduce() 函數即可。Spark 面對的是更復雜的數據處理流程,數據依賴更加靈活,很難將數據流和物理 task 簡單地統一在一起。因此 Spark 將數據流和具體 task 的執行流程分開,并設計算法將邏輯執行圖轉換成 task 物理執行圖,轉換算法后面的章節討論。 針對這個 job,我們先畫出它的物理執行 DAG 圖如下:?![deploy](https://box.kancloud.cn/2015-07-22_55af28ba18e46.png) 可以看到 GroupByTest 這個 application 產生了兩個 job,第一個 job 由第一個 action(也就是`pairs1.count`)觸發產生,分析一下第一個 job: * 整個 job 只包含 1 個 stage(不明白什么是stage沒關系,后面章節會解釋,這里只需知道有這樣一個概念)。 * Stage 0 包含 100 個 ResultTask。 * 每個 task 先計算 flatMap,產生 FlatMappedRDD,然后執行 action() 也就是 count(),統計每個 partition 里 records 的個數,比如 partition 99 里面只含有 9 個 records。 * 由于 pairs1 被聲明要進行 cache,因此在 task 計算得到 FlatMappedRDD 后會將其包含的 partitions 都 cache 到 executor 的內存。 * task 執行完后,driver 收集每個 task 的執行結果,然后進行 sum()。 * job 0 結束。 第二個 job 由?`pairs1.groupByKey(numReducers).count`?觸發產生。分析一下該 job: * 整個 job 包含 2 個 stage。 * Stage 1 包含 100 個 ShuffleMapTask,每個 task 負責從 cache 中讀取 pairs1 的一部分數據并將其進行類似 Hadoop 中 mapper 所做的 partition,最后將 partition 結果寫入本地磁盤。 * Stage 0 包含 36 個 ResultTask,每個 task 首先 shuffle 自己要處理的數據,邊 fetch 數據邊進行 aggregate 以及后續的 mapPartitions() 操作,最后進行 count() 計算得到 result。 * task 執行完后,driver 收集每個 task 的執行結果,然后進行 sum()。 * job 1 結束。 可以看到物理執行圖并不簡單。與 MapReduce 不同的是,Spark 中一個 application 可能包含多個 job,每個 job 包含多個 stage,每個 stage 包含多個 task。**怎么劃分 job,怎么劃分 stage,怎么劃分 task 等等問題會在后面的章節介紹。** ## Discussion 到這里,我們對整個系統和 job 的生成與執行有了概念,而且還探討了 cache 等特性。 接下來的章節會討論 job 生成與執行涉及到的系統核心功能,包括: 1. 如何生成邏輯執行圖 2. 如何生成物理執行圖 3. 如何提交與調度 Job 4. Task 如何生成、執行與結果處理 5. 如何進行 shuffle 6. cache機制 7. broadcast 機制
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看