<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                前三章從 job 的角度介紹了用戶寫的 program 如何一步步地被分解和執行。這一章主要從架構的角度來討論 master,worker,driver 和 executor 之間怎么協調來完成整個 job 的運行。 > 實在不想在文檔中貼過多的代碼,這章貼這么多,只是為了方面自己回頭 debug 的時候可以迅速定位,不想看代碼的話,直接看圖和描述即可。 ## 部署圖 重新貼一下 Overview 中給出的部署圖: ![deploy](http://spark-internals.books.yourtion.com/markdown/PNGfigures/deploy.png) 接下來分階段討論并細化這個圖。 ## Job 提交 下圖展示了driver program(假設在 master node 上運行)如何生成 job,并提交到 worker node 上執行。 ![JobSubmission](http://spark-internals.books.yourtion.com/markdown/PNGfigures/JobSubmission.png) Driver 端的邏輯如果用代碼表示: ~~~ finalRDD.action() => sc.runJob() // generate job, stages and tasks => dagScheduler.runJob() => dagScheduler.submitJob() => dagSchedulerEventProcessActor ! JobSubmitted => dagSchedulerEventProcessActor.JobSubmitted() => dagScheduler.handleJobSubmitted() => finalStage = newStage() => mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size) => dagScheduler.submitStage() => missingStages = dagScheduler.getMissingParentStages() => dagScheduler.subMissingTasks(readyStage) // add tasks to the taskScheduler => taskScheduler.submitTasks(new TaskSet(tasks)) => fifoSchedulableBuilder.addTaskSetManager(taskSet) // send tasks => sparkDeploySchedulerBackend.reviveOffers() => driverActor ! ReviveOffers => sparkDeploySchedulerBackend.makeOffers() => sparkDeploySchedulerBackend.launchTasks() => foreach task CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask) ~~~ 代碼的文字描述: 當用戶的 program 調用?`val sc = new SparkContext(sparkConf)`?時,這個語句會幫助 program 啟動諸多有關 driver 通信、job 執行的對象、線程、actor等,**該語句確立了 program 的 driver 地位。** ### 生成 Job 邏輯執行圖 Driver program 中的 transformation() 建立 computing chain(一系列的 RDD),每個 RDD 的 compute() 定義數據來了怎么計算得到該 RDD 中 partition 的結果,getDependencies() 定義 RDD 之間 partition 的數據依賴。 ### 生成 Job 物理執行圖 每個 action() 觸發生成一個 job,在 dagScheduler.runJob() 的時候進行 stage 劃分,在 submitStage() 的時候生成該 stage 包含的具體的 ShuffleMapTasks 或者 ResultTasks,然后將 tasks 打包成 TaskSet 交給 taskScheduler,如果 taskSet 可以運行就將 tasks 交給 sparkDeploySchedulerBackend 去分配執行。 ### 分配 Task sparkDeploySchedulerBackend 接收到 taskSet 后,會通過自帶的 DriverActor 將 serialized tasks 發送到調度器指定的 worker node 上的 CoarseGrainedExecutorBackend Actor上。 ## Job 接收 Worker 端接收到 tasks 后,執行如下操作 ~~~ coarseGrainedExecutorBackend ! LaunchTask(serializedTask) => executor.launchTask() => executor.threadPool.execute(new TaskRunner(taskId, serializedTask)) ~~~ **executor 將 task 包裝成 taskRunner,并從線程池中抽取出一個空閑線程運行 task。一個 CoarseGrainedExecutorBackend 進程有且僅有一個 executor 對象。** ## Task 運行 下圖展示了 task 被分配到 worker node 上后的執行流程及 driver 如何處理 task 的 result。 ![TaskExecution](http://spark-internals.books.yourtion.com/markdown/PNGfigures/taskexecution.png) Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后運行 task 得到其執行結果 directResult,這個結果要送回到 driver 那里。但是通過 Actor 發送的數據包不易過大,**如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內存+磁盤”上,由 blockManager 來管理,只把存儲位置信息(indirectResult)發送給 driver**,driver 需要實際的 result 的時候,會通過 HTTP 去 fetch。如果 result 不大(小于`spark.akka.frameSize = 10MB`),那么直接發送給 driver。 上面的描述還有一些細節:如果 task 運行結束生成的 directResult > akka.frameSize,directResult 會被存放到由 blockManager 管理的本地“內存+磁盤”上。**BlockManager 中的 memoryStore 開辟了一個 LinkedHashMap 來存儲要存放到本地內存的數據。**LinkedHashMap 存儲的數據總大小不超過`Runtime.getRuntime.maxMemory * spark.storage.memoryFraction(default 0.6)`?。如果 LinkedHashMap 剩余空間不足以存放新來的數據,就將數據交給 diskStore 存放到磁盤上,但前提是該數據的 storageLevel 中包含“磁盤”。 ~~~ In TaskRunner.run() // deserialize task, run it and then send the result to => coarseGrainedExecutorBackend.statusUpdate() => task = ser.deserialize(serializedTask) => value = task.run(taskId) => directResult = new DirectTaskResult(ser.serialize(value)) => if( directResult.size() > akkaFrameSize() ) indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER) else return directResult => coarseGrainedExecutorBackend.statusUpdate(result) => driver ! StatusUpdate(executorId, taskId, result) ~~~ ShuffleMapTask 和 ResultTask 生成的 result 不一樣。**ShuffleMapTask 生成的是 MapStatus**,MapStatus 包含兩項內容:一是該 task 所在的 BlockManager 的 BlockManagerId(實際是 executorId + host, port, nettyPort),二是 task 輸出的每個 FileSegment 大小。**ResultTask 生成的 result 的是 func 在 partition 上的執行結果。**比如 count() 的 func 就是統計 partition 中 records 的個數。由于 ShuffleMapTask 需要將 FileSegment 寫入磁盤,因此需要輸出流 writers,這些 writers 是由 blockManger 里面的 shuffleBlockManager 產生和控制的。 ~~~ In task.run(taskId) // if the task is ShuffleMapTask => shuffleMapTask.runTask(context) => shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits) => shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context)) => return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)]) //If the task is ResultTask => return func(context, rdd.iterator(split, context)) ~~~ Driver 收到 task 的執行結果 result 后會進行一系列的操作:首先告訴 taskScheduler 這個 task 已經執行完,然后去分析 result。由于 result 可能是 indirectResult,需要先調用 blockManager.getRemoteBytes() 去 fech 實際的 result,這個過程下節會詳解。得到實際的 result 后,需要分情況分析,**如果是 ResultTask 的 result,那么可以使用 ResultHandler 對 result 進行 driver 端的計算(比如 count() 會對所有 ResultTask 的 result 作 sum)**,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)**存放到 mapOutputTrackerMaster 中的 mapStatuses 數據結構中以便以后 reducer shuffle 的時候查詢**。如果 driver 收到的 task 是該 stage 中的最后一個 task,那么可以 submit 下一個 stage,如果該 stage 已經是最后一個 stage,那么告訴 dagScheduler job 已經完成。 ~~~ After driver receives StatusUpdate(result) => taskScheduler.statusUpdate(taskId, state, result.value) => taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result) => if result is IndirectResult serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId) => scheduler.handleSuccessfulTask(taskSetManager, tid, result) => taskSetManager.handleSuccessfulTask(tid, taskResult) => dagScheduler.taskEnded(result.value, result.accumUpdates) => dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates) => dagScheduler.handleTaskCompletion(completion) => Accumulators.add(event.accumUpdates) // If the finished task is ResultTask => if (job.numFinished == job.numPartitions) listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) => job.listener.taskSucceeded(outputId, result) => jobWaiter.taskSucceeded(index, result) => resultHandler(index, result) // if the finished task is ShuffleMapTask => stage.addOutputLoc(smt.partitionId, status) => if (all tasks in current stage have finished) mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus]) mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses) => submitStage(stage) ~~~ ## Shuffle read 上一節描述了 task 運行過程及 result 的處理過程,這一節描述 reducer(需要 shuffle 的 task )是如何獲取到輸入數據的。關于 reducer 如何處理輸入數據已經在上一章的 shuffle read 中解釋了。 **問題:reducer 怎么知道要去哪里 fetch 數據?** ![readMapStatus](http://spark-internals.books.yourtion.com/markdown/PNGfigures/readMapStatus.png)reducer 首先要知道 parent stage 中 ShuffleMapTask 輸出的 FileSegments 在哪個節點。**這個信息在 ShuffleMapTask 完成時已經送到了 driver 的 mapOutputTrackerMaster,并存放到了 mapStatuses: HashMap?里面**,給定 stageId,可以獲取該 stage 中 ShuffleMapTasks 生成的 FileSegments 信息 Array[MapStatus],通過 Array(taskId) 就可以得到某個 task 輸出的 FileSegments 位置(blockManagerId)及每個 FileSegment 大小。 當 reducer 需要 fetch 輸入數據的時候,會首先調用 blockStoreShuffleFetcher 去獲取輸入數據(FileSegments)的位置。blockStoreShuffleFetcher 通過調用本地的 MapOutputTrackerWorker 去完成這個任務,MapOutputTrackerWorker 使用 mapOutputTrackerMasterActorRef 來與 mapOutputTrackerMasterActor 通信獲取 MapStatus 信息。blockStoreShuffleFetcher 對獲取到的 MapStatus 信息進行加工,提取出該 reducer 應該去哪些節點上獲取哪些 FileSegment 的信息,這個信息存放在 blocksByAddress 里面。之后,blockStoreShuffleFetcher 將獲取 FileSegment 數據的任務交給 basicBlockFetcherIterator。 ~~~ rdd.iterator() => rdd(e.g., ShuffledRDD/CoGroupedRDD).compute() => SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser) => blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer) => statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId) => blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses) => basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer) => itr = basicBlockFetcherIterator.flatMap(unpackBlock) ~~~ ![blocksByAddress](http://spark-internals.books.yourtion.com/markdown/PNGfigures/blocksByAddress.png) basicBlockFetcherIterator 收到獲取數據的任務后,會生成一個個 fetchRequest,**每個 fetchRequest 包含去某個節點獲取若干個 FileSegments 的任務。**圖中展示了 reducer-2 需要從三個 worker node 上獲取所需的白色 FileSegment (FS)。總的數據獲取任務由 blocksByAddress 表示,要從第一個 node 獲取 4 個,從第二個 node 獲取 3 個,從第三個 node 獲取 4 個。 為了加快任務獲取過程,顯然要將總任務劃分為子任務(fetchRequest),然后為每個任務分配一個線程去 fetch。Spark 為每個 reducer 啟動 5 個并行 fetch 的線程(Hadoop 也是默認啟動 5 個)。由于 fetch 來的數據會先被放到內存作緩沖,因此一次 fetch 的數據不能太多,Spark 設定不能超過`spark.reducer.maxMbInFlight=48MB`。**注意這 48MB 的空間是由這 5 個 fetch 線程共享的**,因此在劃分子任務時,盡量使得 fetchRequest 不超過`48MB / 5 = 9.6MB`。如圖在 node 1 中,Size(FS0-2) + Size(FS1-2) 9.6MB,因此要在 t1-r2 和 t2-r2 處斷開,所以圖中有兩個 fetchRequest 都是要去 node 1 fetch。**那么會不會有 fetchRequest 超過 9.6MB?**當然會有,如果某個 FileSegment 特別大,仍然需要一次性將這個 FileSegment fetch 過來。另外,如果 reducer 需要的某些 FileSegment 就在本節點上,那么直接進行 local read。最后,將 fetch 來的 FileSegment 進行 deserialize,將里面的 records 以 iterator 的形式提供給 rdd.compute(),整個 shuffle read 結束。 ~~~ In basicBlockFetcherIterator: // generate the fetch requests => basicBlockFetcherIterator.initialize() => remoteRequests = splitLocalRemoteBlocks() => fetchRequests ++= Utils.randomize(remoteRequests) // fetch remote blocks => sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight => blockManager.connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) => fetchResults.put(new FetchResult(blockId, sizeMap(blockId))) => dataDeserialize(blockId, blockMessage.getData, serializer) // fetch local blocks => getLocalBlocks() => fetchResults.put(new FetchResult(id, 0, () => iter)) ~~~ 下面再討論一些細節問題: **reducer 如何將 fetchRequest 信息發送到目標節點?目標節點如何處理 fetchRequest 信息,如何讀取 FileSegment 并回送給 reducer?** ![fetchrequest](http://spark-internals.books.yourtion.com/markdown/PNGfigures/fetchrequest.png) rdd.iterator() 碰到 ShuffleDependency 時會調用 BasicBlockFetcherIterator 去獲取 FileSegments。BasicBlockFetcherIterator 使用 blockManager 中的 connectionManager 將 fetchRequest 發送給其他節點的 connectionManager。connectionManager 之間使用 NIO 模式通信。其他節點,比如 worker node 2 上的 connectionManager 收到消息后,會交給 blockManagerWorker 處理,blockManagerWorker 使用 blockManager 中的 diskStore 去本地磁盤上讀取 fetchRequest 要求的 FileSegments,然后仍然通過 connectionManager 將 FileSegments 發送回去。如果使用了 FileConsolidation,diskStore 還需要 shuffleBlockManager 來提供 blockId 所在的具體位置。如果 FileSegment 不超過`spark.storage.memoryMapThreshold=8KB`?,那么 diskStore 在讀取 FileSegment 的時候會直接將 FileSegment 放到內存中,否則,會使用 RandomAccessFile 中 FileChannel 的內存映射方法來讀取 FileSegment(這樣可以將大的 FileSegment 加載到內存)。 當 BasicBlockFetcherIterator 收到其他節點返回的 serialized FileSegments 后會將其放到 fetchResults: Queue 里面,并進行 deserialization,所以?**fetchResults: Queue 就相當于在 Shuffle details 那一章提到的 softBuffer。**如果 BasicBlockFetcherIterator 所需的某些 FileSegments 就在本地,會通過 diskStore 直接從本地文件讀取,并放到 fetchResults 里面。最后 reducer 一邊從 FileSegment 中邊讀取 records 一邊處理。 ~~~ After the blockManager receives the fetch request => connectionManager.receiveMessage(bufferMessage) => handleMessage(connectionManagerId, message, connection) // invoke blockManagerWorker to read the block (FileSegment) => blockManagerWorker.onBlockMessageReceive() => blockManagerWorker.processBlockMessage(blockMessage) => buffer = blockManager.getLocalBytes(blockId) => buffer = diskStore.getBytes(blockId) => fileSegment = diskManager.getBlockLocation(blockId) => shuffleManager.getBlockLocation() => if(fileSegment < minMemoryMapBytes) buffer = ByteBuffer.allocate(fileSegment) else channel.map(MapMode.READ_ONLY, segment.offset, segment.length) ~~~ 每個 reducer 都持有一個 BasicBlockFetcherIterator,一個 BasicBlockFetcherIterator 理論上可以持有 48MB 的 fetchResults。每當 fetchResults 中有一個 FileSegment 被讀取完,就會一下子去 fetch 很多個 FileSegment,直到 48MB 被填滿。 ~~~ BasicBlockFetcherIterator.next() => result = results.task() => while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { sendRequest(fetchRequests.dequeue()) } => result.deserialize() ~~~ ## Discussion 這一章寫了三天,也是我這個月來心情最不好的幾天。Anyway,繼續總結。 架構部分其實沒有什么好說的,就是設計時盡量功能獨立,模塊獨立,松耦合。BlockManager 設計的不錯,就是管的東西太多(數據塊、內存、磁盤、通信)。 這一章主要探討了系統中各個模塊是怎么協同來完成 job 的生成、提交、運行、結果收集、結果計算以及 shuffle 的。貼了很多代碼,也畫了很多圖,雖然細節很多,但遠沒有達到源碼的細致程度。如果有地方不明白的,請根據描述閱讀一下源碼吧。 如果想進一步了解 blockManager,可以參閱 Jerry Shao 寫的?[Spark源碼分析之-Storage模塊](http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/)。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看