<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                > 原文出處:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ > 作者:王 龍, 軟件開發工程師, IBM # 使用 Scala 語言開發 Spark 應用程序 本文旨在通過具有實際意義的案例向讀者介紹如何使用 Scala 語言開發 Spark 應用程序并在 Spark 集群上運行。本文涉及的所有源數據都將從 HDFS(Hadoop Distributed File System)讀取,部分案例的輸出結果也會寫入到 HDFS, 所以通過閱讀本文,讀者也會學習到 Spark 和 HDFS 交互的一些知識。 [TOC=2,3] ## 引言 在當前這個信息時代里,大數據所蘊含的價值已經被絕大多數的企業所認知。在 IT 的世界里,往往都是需求驅動技術的發展和革新。Hadoop 在這個大背景下應運而生,它給我們提供了一個存儲和處理大數據的良好的解決方案,短短的幾年時間里,它已無處不在,事實上它已經成了大數據技術的代名詞。然而在人們越來越多的使用 Hadoop 提供的 MapReduce 框架處理大數據的時候,卻發現它存在許多天生的缺陷, 如效率低,編程模型不夠靈活,只適合做離線計算等。Spark 的出現無疑讓諸多大數據計算的從業者和愛好者眼前一亮,它基于內存,并且提供了更加豐富的算子使得我們可以更高效和靈活的處理大數據。本文將從實例出發,向讀者介紹如何使用 Scala 語言 (Spark 框架的開發語言) 開發 Spark 應用程序并且將其運行在 Spark 集群環境里。本文假設讀者已經對 Spark 基本原理和編程模型有了基本的了解,并且已經掌握了 Scala 語言開發的基礎知識,那么通過閱讀本文,相信您一定會對 Spark 應用程序的開發有更深入的認識。接下來,就讓我們開始 Spark 應用程序的開發之旅吧。 ## 關于 Spark Spark 由加州大學伯克利分校 AMP 實驗室 (Algorithms, Machines, and People Lab) 開發,可用來構建大型的、低延遲的大數據處理的應用程序。并且提供了用于機器學習 (MLlib), 流計算(Streaming), 圖計算 (GraphX) 等子模塊,最新的 1.4.0 版本更是提供了與 R 語言的集成,這使得 Spark 幾乎成為了多領域通吃的全能技術。Spark 對數據的存儲,轉換,以及計算都是基于一個叫 RDD(Resilient Distributed Dataset) 分布式內存的抽象,應用程序對需要計算的數據的操作都是通過對 RDD 的一系列轉化 (Transformation) 和動作 (Action) 算子完成的,其中轉化算子可以把一個 RDD 轉成另一個 RDD,如 filter 算子可以通過添加過濾條件生成一個只包含符合條件的數據的新的 RDD。動作算子負責完成最終的計算,如 count 算子可以計算出整個 RDD 表示的數據集中元素的個數。關于 Spark 所支持的算子 以及使用方法請參考?[Spark 官方網站](http://spark.apache.org/docs/latest/programming-guide.html)。本文所使用的 Spark 的發行版是 1.3.1,讀者可根據需要下載相應的版本。 ## 關于 Scala Scala 語言是一門類 Java 的多范式語言,其設計初衷就是為了繼承函數式編程的面向對象編程的各種特性,正如?[Scala 語言官網](http://www.scala-lang.org/)?描述的那樣:Object-Oriented Meets Functional, 就是給出了一個關于 Scala 語言特性的最簡單明了的概括。 Spark 框架使用 Scala 語言開發,那么使用 Scala 語言開發 Spark 應用程序就變成一件很自然的事情,雖然 Spark 提供了面向 Python,Java 等語言的編程接口,但是從各個方面來看使用 Scala 編程都是最簡單最容易理解的,特別是當程序出現異常或者是需要通過學習源碼來定位問題時,您會發現學習 Scala 語言來編寫 Spark 應用程序是多么有意義的事情。關于 Scala 語言,如果您還沒有基礎,請參考 * [Scala 語言官網](http://www.scala-lang.org/) * [Scala 中文網](http://scalachina.com/node/61) * Twitter 提供的?[Scala 課堂](http://twitter.github.io/scala_school/zh_cn/index.html) * [面向 Java 開發人員的 Scala 指南系列](http://www.ibm.com/developerworks/cn/java/j-scala/) 由于 Spark 1.3.1 版本使用的是 Scala 2.10.x 版本,所以本文將使用 Scala 2.10.5 版本。 ## 搭建開發環境 1. 安裝 Scala IDE 搭建 Scala 語言開發環境很容易,[Scala IDE 官網](http://scala-ide.org/download/current.html)?下載合適的版本并解壓就可以完成安裝,本文使用的版本是 4.1.0。 2. 安裝 Scala 語言包 如果下載的 Scala IDE 自帶的 Scala 語言包與 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那么就需要下載和本文所使用的 Spark 所匹配的版本,以確保實現的 Scala 程序不會因為版本問題而運行失敗。 請下載并安裝?[Scala 2.10.5 版本](http://www.scala-lang.org/download/2.10.5.html) 3. 安裝 JDK 如果您的機器上沒有安裝 JDK,請下載并安裝 1.6 版本以上的 JDK。 4. 創建并配置 Spark 工程 打開 Scala IDE,創建一個名稱為 spark-exercise 的 Scala 工程。 圖 1\. 創建 scala 工程 ![圖 1\. 創建 scala 工程](https://box.kancloud.cn/2015-09-23_56023bc18b8c1.jpg) 在工程目錄下創建一個 lib 文件夾,并且把您的 Spark 安裝包下的 spark-assembly jar 包拷貝到 lib 目錄下。 圖 2\. Spark 開發 jar 包 ![圖 2\. Spark 開發 jar 包](https://box.kancloud.cn/2015-09-23_56023bc2ca48f.jpg) 并且添加該 jar 包到工程的 classpath 并配置工程使用剛剛安裝的 Scala 2.10.5 版本.,工程目錄結構如下。 圖 3\. 添加 jar 包到 classpath ![圖 3\. 添加 jar 包到 classpath](https://box.kancloud.cn/2015-09-23_56023bc4587c0.jpg) ## 運行環境介紹 為了避免讀者對本文案例運行環境產生困惑,本節會對本文用到的集群環境的基本情況做個簡單介紹。 * 本文所有實例數據存儲的環境是一個 8 個機器的 Hadoop 集群,文件系統總容量是 1.12T,NameNode 叫 hadoop036166, 服務端口是 9000。讀者可以不關心具體的節點分布,因為這個不會影響到您閱讀后面的文章。 * 本文運行實例程序使用的 Spark 集群是一個包含四個節點的 Standalone 模式的集群, 其中包含一個 Master 節點 (監聽端口 7077) 和三個 Worker 節點,具體分布如下: | Server Name | Role | | --- | --- | | hadoop036166 | Master | | hadoop036187 | Worker | | hadoop036188 | Worker | | hadoop036227 | Worker | * Spark 提供一個 Web UI 去查看集群信息并且監控執行結果,默認地址是:http://:8080 ,對于該實例提交后我們也可以到 web 頁面上去查看執行結果,當然也可以通過查看日志去找到執行結果。 圖 4\. Spark 的 web console ![圖 4\. Spark 的 web console](https://box.kancloud.cn/2015-09-23_56023bc55d64d.jpg) ## 案例分析與編程實現 ### 案例一 a. 案例描述 提起 Word Count(詞頻數統計),相信大家都不陌生,就是統計一個或者多個文件中單詞出現的次數。本文將此作為一個入門級案例,由淺入深的開啟使用 Scala 編寫 Spark 大數據處理程序的大門。 b.案例分析 對于詞頻數統計,用 Spark 提供的算子來實現,我們首先需要將文本文件中的每一行轉化成一個個的單詞, 其次是對每一個出現的單詞進行記一次數,最后就是把所有相同單詞的計數相加得到最終的結果。 對于第一步我們自然的想到使用 flatMap 算子把一行文本 split 成多個單詞,然后對于第二步我們需要使用 map 算子把單個的單詞轉化成一個有計數的 Key-Value 對,即 word -> (word,1). 對于最后一步統計相同單詞的出現次數,我們需要使用 reduceByKey 算子把相同單詞的計數相加得到最終結果。 c. 編程實現 清單 1.SparkWordCount 類源碼 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object SparkWordCount { def FILE_NAME:String = "word_count_results_"; def main(args:Array[String]) { if (args.length < 1) { println("Usage:SparkWordCount FileName"); System.exit(1); } val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program"); val sc = new SparkContext(conf); val textFile = sc.textFile(args(0)); val wordCounts = textFile.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey((a, b) => a + b) //print the results,for debug use. //println("Word Count program running results:"); //wordCounts.collect().foreach(e => { //val (k,v) = e //println(k+"="+v) //}); wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis()); println("Word Count program running results are successfully saved."); } } ~~~ d. 提交到集群執行 本實例中, 我們將統計 HDFS 文件系統中/user/fams 目錄下所有 txt 文件中詞頻數。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,這個 jar 包執行時會被上傳到目標服務器的/home/fams 目錄下。運行此實例的具體命令如下: 清單 2.SparkWordCount 類執行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.SparkWordCount \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/*.txt ~~~ e. 監控執行狀態 該實例把最終的結果存儲在了 HDFS 上,那么如果程序運行正常我們可以在 HDFS 上找到生成的文件信息 圖 5\. 案例一輸出結果 ![圖 5\. 案例一輸出結果](https://box.kancloud.cn/2015-09-23_56023bc6e5d2b.jpg) 打開 Spark 集群的 Web UI, 可以看到剛才提交的 job 的執行結果。 圖 6\. 案例一完成狀態 ![圖 6\. 案例一完成狀態](https://box.kancloud.cn/2015-09-23_56023bc8f083a.jpg) 如果程序還沒運行完成,那么我們可以在 Running Applications 列表里找到它。 ### 案例二 a. 案例描述 該案例中,我們將假設我們需要統計一個 1000 萬人口的所有人的平均年齡,當然如果您想測試 Spark 對于大數據的處理能力,您可以把人口數放的更大,比如 1 億人口,當然這個取決于測試所用集群的存儲容量。假設這些年齡信息都存儲在一個文件里,并且該文件的格式如下,第一列是 ID,第二列是年齡。 圖 7\. 案例二測試數據格式預覽 ![圖 7\. 案例二測試數據格式預覽](https://box.kancloud.cn/2015-09-23_56023bca97617.jpg) 現在我們需要用 Scala 寫一個生成 1000 萬人口年齡數據的文件,源程序如下: 清單 3\. 年齡信息文件生成類源碼 ~~~ import java.io.FileWriter import java.io.File import scala.util.Random object SampleDataFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false) val rand = new Random() for ( i <- 1 to 10000000) { writer.write( i + " " + rand.nextInt(100)) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() } } ~~~ b. 案例分析 要計算平均年齡,那么首先需要對源文件對應的 RDD 進行處理,也就是將它轉化成一個只包含年齡信息的 RDD,其次是計算元素個數即為總人數,然后是把所有年齡數加起來,最后平均年齡=總年齡/人數。 對于第一步我們需要使用 map 算子把源文件對應的 RDD 映射成一個新的只包含年齡數據的 RDD,很顯然需要對在 map 算子的傳入函數中使用 split 方法,得到數組后只取第二個元素即為年齡信息;第二步計算數據元素總數需要對于第一步映射的結果 RDD 使用 count 算子;第三步則是使用 reduce 算子對只包含年齡信息的 RDD 的所有元素用加法求和;最后使用除法計算平均年齡即可。 由于本例輸出結果很簡單,所以只打印在控制臺即可。 c. 編程實現 清單 4.AvgAgeCalculator 類源碼 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object AvgAgeCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:AvgAgeCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val count = dataFile.count() val ageData = dataFile.map(line => line.split(" ")(1)) val totalAge = ageData.map(age => Integer.parseInt( String.valueOf(age))).collect().reduce((a,b) => a+b) println("Total Age:" + totalAge + ";Number of People:" + count ) val avgAge : Double = totalAge.toDouble / count.toDouble println("Average Age is " + avgAge) } } ~~~ d. 提交到集群執行 要執行本實例的程序,需要將剛剛生成的年齡信息文件上傳到 HDFS 上,假設您剛才已經在目標機器上執行生成年齡信息文件的 Scala 類,并且文件被生成到了/home/fams 目錄下。 那么您需要運行一下 HDFS 命令把文件拷貝到 HDFS 的/user/fams 目錄。 清單 5\. 年齡信息文件拷貝到 HDFS 目錄的命令 ~~~ hdfs dfs –copyFromLocal /home/fams /user/fams ~~~ 清單 6.AvgAgeCalculator 類的執行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.AvgAgeCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt ~~~ e. 監控執行狀態 在控制臺您可以看到如下所示信息: 圖 8\. 案例二輸出結果 ![圖 8\. 案例二輸出結果](https://box.kancloud.cn/2015-09-23_56023bcc52f63.jpg) 我們也可以到 Spark Web Console 去查看 Job 的執行狀態 圖 9\. 案例二完成狀態 ![圖 9\. 案例二完成狀態](https://box.kancloud.cn/2015-09-23_56023bcd7ccb5.jpg) ### 案例三 a. 案例描述 本案例假設我們需要對某個省的人口 (1 億) 性別還有身高進行統計,需要計算出男女人數,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分別是 ID,性別,身高 (cm)。 圖 10\. 案例三測試數據格式預覽 ![圖 10\. 案例三測試數據格式預覽](https://box.kancloud.cn/2015-09-23_56023bd4317be.jpg) 我們將用以下 Scala 程序生成這個文件,源碼如下: 清單 7\. 人口信息生成類源碼 ~~~ import java.io.FileWriter import java.io.File import scala.util.Random object PeopleInfoFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false) val rand = new Random() for ( i <- 1 to 100000000) { var height = rand.nextInt(220) if (height < 50) { height = height + 50 } var gender = getRandomGender if (height < 100 && gender == "M") height = height + 100 if (height < 100 && gender == "F") height = height + 50 writer.write( i + " " + getRandomGender + " " + height) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() println("People Information File generated successfully.") } def getRandomGender() :String = { val rand = new Random() val randNum = rand.nextInt(2) + 1 if (randNum % 2 == 0) { "M" } else { "F" } } } ~~~ b. 案例分析 對于這個案例,我們要分別統計男女的信息,那么很自然的想到首先需要對于男女信息從源文件的對應的 RDD 中進行分離,這樣會產生兩個新的 RDD,分別包含男女信息;其次是分別對男女信息對應的 RDD 的數據進行進一步映射,使其只包含身高數據,這樣我們又得到兩個 RDD,分別對應男性身高和女性身高;最后需要對這兩個 RDD 進行排序,進而得到最高和最低的男性或女性身高。 對于第一步,也就是分離男女信息,我們需要使用 filter 算子,過濾條件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我們需要使用 map 算子把男女各自的身高數據從 RDD 中分離出來;第三步我們需要使用 sortBy 算子對男女身高數據進行排序。 c. 編程實現 在實現上,有一個需要注意的點是在 RDD 轉化的過程中需要把身高數據轉換成整數,否則 sortBy 算子會把它視為字符串,那么排序結果就會受到影響,例如 身高數據如果是:123,110,84,72,100,那么升序排序結果將會是 100,110,123,72,84,顯然這是不對的。 清單 8.PeopleInfoCalculator 類源碼 ~~~ object PeopleInfoCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:PeopleInfoCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val maleData = dataFile.filter(line => line.contains("M")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) val femaleData = dataFile.filter(line => line.contains("F")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) //for debug use //maleData.collect().foreach { x => println(x)} //femaleData.collect().foreach { x => println(x)} val maleHeightData = maleData.map(line => line.split(" ")(1).toInt) val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt) //for debug use //maleHeightData.collect().foreach { x => println(x)} //femaleHeightData.collect().foreach { x => println(x)} val lowestMale = maleHeightData.sortBy(x => x,true).first() val lowestFemale = femaleHeightData.sortBy(x => x,true).first() //for debug use //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)} //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)} val highestMale = maleHeightData.sortBy(x => x, false).first() val highestFemale = femaleHeightData.sortBy(x => x, false).first() println("Number of Male Peole:" + maleData.count()) println("Number of Female Peole:" + femaleData.count()) println("Lowest Male:" + lowestMale) println("Lowest Female:" + lowestFemale) println("Highest Male:" + highestMale) println("Highest Female:" + highestFemale) } } ~~~ d. 提交到集群執行 在提交該程序到集群執行之前,我們需要將剛才生成的人口信息數據文件上傳到 HDFS 集群,具體命令可以參照上文。 清單 9.PeopleInfoCalculator 類的執行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 3g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt ~~~ e. 監控執行狀態 對于該實例,如程序中打印的一樣,會在控制臺顯示如下信息: 圖 11\. 案例三輸出結果 ![圖 11\. 案例三輸出結果](https://box.kancloud.cn/2015-09-23_56023bd575c2a.jpg) 在 Spark Web Console 里可以看到具體的執行狀態信息 圖 12\. 案例三完成狀態 ![圖 12\. 案例三完成狀態](https://box.kancloud.cn/2015-09-23_56023bd6b42ca.jpg) ### 案例四 a. 案例描述 該案例中我們假設某搜索引擎公司要統計過去一年搜索頻率最高的 K 個科技關鍵詞或詞組,為了簡化問題,我們假設關鍵詞組已經被整理到一個或者多個文本文件中,并且文檔具有以下格式。 圖 13\. 案例四測試數據格式預覽 ![圖 13\. 案例四測試數據格式預覽](https://box.kancloud.cn/2015-09-23_56023bd92197a.jpg) 我們可以看到一個關鍵詞或者詞組可能出現多次,并且大小寫格式可能不一致。 b. 案例分析 要解決這個問題,首先我們需要對每個關鍵詞出現的次數進行計算,在這個過程中需要識別不同大小寫的相同單詞或者詞組,如”Spark”和“spark” 需要被認定為一個單詞。對于出現次數統計的過程和 word count 案例類似;其次我們需要對關鍵詞或者詞組按照出現的次數進行降序排序,在排序前需要把 RDD 數據元素從 (k,v) 轉化成 (v,k);最后取排在最前面的 K 個單詞或者詞組。 對于第一步,我們需要使用 map 算子對源數據對應的 RDD 數據進行全小寫轉化并且給詞組記一次數,然后調用 reduceByKey 算子計算相同詞組的出現次數;第二步我們需要對第一步產生的 RDD 的數據元素用 sortByKey 算子進行降序排序;第三步再對排好序的 RDD 數據使用 take 算子獲取前 K 個數據元素。 c. 編程實現 清單 10.TopKSearchKeyWords 類源碼 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object TopKSearchKeyWords { def main(args:Array[String]){ if (args.length < 2) { println("Usage:TopKSearchKeyWords KeyWordsFile K"); System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words") val sc = new SparkContext(conf) val srcData = sc.textFile(args(0)) val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b) //for debug use //countedData.foreach(x => println(x)) val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false) val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) } topKData.foreach(println) } } ~~~ d. 提交到集群執行 清單 11.TopKSearchKeyWords 類的執行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt ~~~ e. 監控執行狀態 如果程序成功執行,我們將在控制臺看到以下信息。當然讀者也可以仿照案例二和案例三那樣,自己嘗試使用 Scala 寫一段小程序生成此案例需要的源數據文件,可以根據您的 HDFS 集群的容量,生成盡可能大的文件,用來測試本案例提供的程序。 圖 14\. 案例四輸出結果 ![圖 14\. 案例四輸出結果](https://box.kancloud.cn/2015-09-23_56023bda5ae90.jpg) 圖 15\. 案例四完成狀態 ![圖 15\. 案例四完成狀態](https://box.kancloud.cn/2015-09-23_56023bdc5650b.jpg) ## Spark job 的執行流程簡介 我們可以發現,Spark 應用程序在提交執行后,控制臺會打印很多日志信息,這些信息看起來是雜亂無章的,但是卻在一定程度上體現了一個被提交的 Spark job 在集群中是如何被調度執行的,那么在這一節,將會向大家介紹一個典型的 Spark job 是如何被調度執行的。 我們先來了解以下幾個概念: **DAG**: 即 Directed Acyclic Graph,有向無環圖,這是一個圖論中的概念。如果一個[有向圖](http://baike.baidu.com/view/807915.htm)無法從某個頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖。 **Job:**我們知道,Spark 的計算操作是 lazy 執行的,只有當碰到一個動作 (Action) 算子時才會觸發真正的計算。一個 Job 就是由動作算子而產生包含一個或多個 Stage 的計算作業。 **Stage**:Job 被確定后,Spark 的調度器 (DAGScheduler) 會根據該計算作業的計算步驟把作業劃分成一個或者多個 Stage。Stage 又分為 ShuffleMapStage 和 ResultStage,前者以 shuffle 為輸出邊界,后者會直接輸出結果,其邊界可以是獲取外部數據,也可以是以一個 ShuffleMapStage 的輸出為邊界。每一個 Stage 將包含一個 TaskSet。 **TaskSet:?**代表一組相關聯的沒有 shuffle 依賴關系的任務組成任務集。一組任務會被一起提交到更加底層的 TaskScheduler。 **Task**:代表單個數據分區上的最小處理單元。分為 ShuffleMapTask 和 ResultTask。ShuffleMapTask 執行任務并把任務的輸出劃分到 (基于 task 的對應的數據分區) 多個 bucket(ArrayBuffer) 中,ResultTask 執行任務并把任務的輸出發送給驅動程序。 Spark 的作業任務調度是復雜的,需要結合源碼來進行較為詳盡的分析,但是這已經超過本文的范圍,所以這一節我們只是對大致的流程進行分析。 Spark 應用程序被提交后,當某個動作算子觸發了計算操作時,SparkContext 會向 DAGScheduler 提交一個作業,接著 DAGScheduler 會根據 RDD 生成的依賴關系劃分 Stage,并決定各個 Stage 之間的依賴關系,Stage 之間的依賴關系就形成了 DAG。Stage 的劃分是以 ShuffleDependency 為依據的,也就是說當某個 RDD 的運算需要將數據進行 Shuffle 時,這個包含了 Shuffle 依賴關系的 RDD 將被用來作為輸入信息,進而構建一個新的 Stage。我們可以看到用這樣的方式劃分 Stage,能夠保證有依賴關系的數據可以以正確的順序執行。根據每個 Stage 所依賴的 RDD 數據的 partition 的分布,會產生出與 partition 數量相等的 Task,這些 Task 根據 partition 的位置進行分布。其次對于 finalStage 或是 mapStage 會產生不同的 Task,最后所有的 Task 會封裝到 TaskSet 內提交到 TaskScheduler 去執行。有興趣的讀者可以通過閱讀 DAGScheduler 和 TaskScheduler 的源碼獲取更詳細的執行流程。 ## 結束語 通過本文,相信讀者對如何使用 Scala 編寫 Spark 應用程序處理大數據已經有了較為深入的了解。當然在處理實際問題時,情況可能比本文舉得例子復雜很多,但是解決問題的基本思想是一致的。在碰到實際問題的時候,首先要對源數據結構格式等進行分析,然后確定如何去使用 Spark 提供的算子對數據進行轉化,最終根據實際需求選擇合適的算子操作數據并計算結果。本文并未介紹其它 Spark 模塊的知識,顯然這不是一篇文章所能完成的,希望以后會有機會總結更多的 Spark 應用程序開發以及性能調優方面的知識,寫成文章與更多的 Spark 技術愛好者分享,一起進步。由于時間倉促并且本人知識水平有限,文章難免有未考慮周全的地方甚至是錯誤,希望各位朋友不吝賜教。有任何問題,都可以在文末留下您的評論,我會及時回復。 ## 參考資料 * 參考[Spark 官網的編程指導](http://spark.apache.org/docs/1.3.1/programming-guide.html),查看 Spark 各種轉換和動作算子的描述。 * 查看[Scala 官網](http://www.scala-lang.org/),了解更多關于 Scala 語言的內容。 * 查看文章“[Spark 之任務調度](http://www.aboutyun.com/thread-8548-1-1.html)”,確認并加深對于 Spark 任務調度原理的理解。 * [developerWorks 開源技術主題](http://www.ibm.com/developerworks/cn/opensource/):查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術并將其用于 IBM 產品。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看