<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 一、常用的轉換算子 需要操作的Transformation算子說明如下: ### 1.1. map map(func)返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps1(sc) sc.stop() } /** * 1、map:將集合中每個元素乘以7 * map(func):返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成 */ def transformationOps1(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.map(num => num * 7) retRDD.foreach(num => println(num)) } } ``` 輸出結果 ``` 42 7 49 14 56 21 63 28 70 35 ``` ### 1.2. filter filter(func)返回一個新的數據集,由經過func函數后返回值為true的原元素組成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps2(sc) sc.stop() } /** * 2、filter:過濾出集合中的奇數 * filter(func): 返回一個新的數據集,由經過func函數后返回值為true的原元素組成 * * 一般在filter操作之后都要做重新分區(因為可能數據量減少了很多) */ def transformationOps2(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.filter(num => num % 2 == 0) retRDD.foreach(println) } } ``` 輸出結果 ``` 6 2 8 4 10 ``` ### 1.3.flatMap flatMap(func)類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps3(sc) sc.stop() } /** * 3、flatMap:將行拆分為單詞 * flatMap(func):類似于map,但是每一個輸入元素, * 會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) */ def transformationOps3(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) wordsRDD.foreach(println) } } ``` 輸出結果 ``` hello hello he you hello mes ``` ### 1.4. sample sample(withReplacement, frac, seed)根據給定的隨機種子seed,隨機抽樣出數量為frac的數據. ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps4(sc) sc.stop() } /** * 4、sample:根據給定的隨機種子seed,隨機抽樣出數量為frac的數據 * sample(withReplacement, frac, seed): 根據給定的隨機種子seed,隨機抽樣出數量為frac的數據 * 抽樣的目的:就是以樣本評估整體 * withReplacement: * true:有放回的抽樣 * false:無放回的抽樣 * frac:就是樣本空間的大小,以百分比小數的形式出現,比如20%,就是0.2 * * 使用sample算子計算出來的結果可能不是很準確,1000個數,20%,樣本數量在200個左右,不一定為200 * * 一般情況下,使用sample算子在做spark優化(數據傾斜)的方面應用最廣泛 */ def transformationOps4(sc:SparkContext): Unit = { val list = 1 to 1000 val listRDD = sc.parallelize(list) val sampleRDD = listRDD.sample(false, 0.2) sampleRDD.foreach(num => print(num + " ")) println println("sampleRDD count: " + sampleRDD.count()) println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count()) } } ``` 輸出結果 ``` sampleRDD count: 219 Another sampleRDD count: 203 ``` ### 1.5.union union(otherDataset)返回一個新的數據集,由原數據集和參數聯合而成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps5(sc) sc.stop() } /** * 5、union:返回一個新的數據集,由原數據集和參數聯合而成 * union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成 * 類似數學中的并集,就是sql中的union操作,將兩個集合的所有元素整合在一塊,包括重復元素 */ def transformationOps5(sc:SparkContext): Unit = { val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val list2 = List(7, 8, 9, 10, 11, 12) val listRDD1 = sc.parallelize(list1) val listRDD2 = sc.parallelize(list2) val unionRDD = listRDD1.union(listRDD2) unionRDD.foreach(println) } } ``` 輸出結果 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps5(sc) sc.stop() } /** * 5、union:返回一個新的數據集,由原數據集和參數聯合而成 * union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成 * 類似數學中的并集,就是sql中的union操作,將兩個集合的所有元素整合在一塊,包括重復元素 */ def transformationOps5(sc:SparkContext): Unit = { val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val list2 = List(7, 8, 9, 10, 11, 12) val listRDD1 = sc.parallelize(list1) val listRDD2 = sc.parallelize(list2) val unionRDD = listRDD1.union(listRDD2) unionRDD.foreach(println) } } ``` 輸出結果 ``` 1 6 2 7 3 8 4 9 5 10 7 8 9 10 11 12 ``` ### 1.6. groupByKey groupByKey([numTasks])在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps6(sc) sc.stop() } /** * 6、groupByKey:對數組進行 group by key操作 * groupByKey([numTasks]): 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。 * 注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task * mr中: * <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3> * groupByKey類似于shuffle操作 * * 和reduceByKey有點類似,但是有區別,reduceByKey有本地的規約,而groupByKey沒有本地規約,所以一般情況下, * 盡量慎用groupByKey,如果一定要用的話,可以自定義一個groupByKey,在自定義的gbk中添加本地預聚合操作 */ def transformationOps6(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) pairsRDD.foreach(println) val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey() println("=============================================") gbkRDD.foreach(t => println(t._1 + "..." + t._2)) } } ``` 輸出結果 ``` (hello,1) (hello,1) (you,1) (he,1) (hello,1) (me,1) ============================================= you...CompactBuffer(1) hello...CompactBuffer(1, 1, 1) he...CompactBuffer(1) me...CompactBuffer(1) ``` ### 1.7. reduceByKey reduceByKey(func, [numTasks])在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * 7、reduceByKey:統計每個班級的人數 * reduceByKey(func, [numTasks]): 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集, * key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。 * * 需要注意的是還有一個reduce的操作,其為action算子,并且其返回的結果只有一個,而不是一個數據集 * 而reduceByKey是一個transformation算子,其返回的結果是一個數據集 */ def transformationOps7(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.foreach(t => println(t._1 + "..." + t._2)) } } ``` 輸出結果如下: ``` you...1 hello...3 he...1 me...1 ``` ### 1.8. join雙流融合 join(otherDataset, [numTasks])在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps8(sc) sc.stop() } /** * 8、join:打印關聯的組合信息 * join(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集 * 學生基礎信息表和學生考試成績表 * stu_info(sid ,name, birthday, class) * stu_score(sid, chinese, english, math) * * * Serialization stack: - object not serializable 這種分布式計算的過程,一個非常重要的點,傳遞的數據必須要序列化 通過代碼測試,該join是等值連接(inner join) A.leftOuterJoin(B) A表所有的數據都包涵,B表中在A表沒有關聯的數據,顯示為null 之后執行一次filter就是join的結果 */ def transformationOps8(sc: SparkContext): Unit = { val infoList = List( "1,鐘 瀟,1988-02-04,bigdata", "2,劉向前,1989-03-24,linux", "3,包維寧,1984-06-16,oracle") val scoreList = List( "1,50,21,61", "2,60,60,61", "3,62,90,81", "4,72,80,81" ) val infoRDD:RDD[String] = sc.parallelize(infoList) val scoreRDD:RDD[String] = sc.parallelize(scoreList) val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => { val fields = line.split(",") val student = new Student(fields(0), fields(1), fields(2), fields(3)) (fields(0), student) }) val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => { val fields = line.split(",") val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat) (fields(0), score) }) val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD) joinedRDD.foreach(t => { val sid = t._1 val student = t._2._1 val score = t._2._2 println(sid + "\t" + student + "\t" + score) }) println("=========================================") val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD) leftOuterRDD.foreach(println) } } ``` 輸出結果如下: ``` 3 3 包維寧 1984-06-16 oracle 3 62.0 90.0 81.0 2 2 劉向前 1989-03-24 linux 2 60.0 60.0 61.0 1 1 鐘 瀟 1988-02-04 bigdata 1 50.0 21.0 61.0 ========================================= (4,(4 72.0 80.0 81.0,None)) (3,(3 62.0 90.0 81.0,Some(3 包維寧 1984-06-16 oracle))) (2,(2 60.0 60.0 61.0,Some(2 劉向前 1989-03-24 linux))) (1,(1 50.0 21.0 61.0,Some(1 鐘 瀟 1988-02-04 bigdata))) ``` ### 1.9.sortByKey 測試代碼如下: ~~~ object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * sortByKey:將學生身高進行(降序)排序 * 身高相等,按照年齡排(升序) */ def transformationOps9(sc: SparkContext): Unit = { val list = List( "1,李 磊,22,175", "2,劉銀鵬,23,175", "3,齊彥鵬,22,180", "4,楊 柳,22,168", "5,敦 鵬,20,175" ) val listRDD:RDD[String] = sc.parallelize(list) /* // 使用sortBy操作完成排序 val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] { override def compare(x: String, y: String): Int = { val xFields = x.split(",") val yFields = y.split(",") val xHgiht = xFields(3).toFloat val yHgiht = yFields(3).toFloat val xAge = xFields(2).toFloat val yAge = yFields(2).toFloat var ret = yHgiht.compareTo(xHgiht) if (ret == 0) { ret = xAge.compareTo(yAge) } ret } } ,ClassTag.Object.asInstanceOf[ClassTag[String]]) */ // 使用sortByKey完成操作,只做身高降序排序 val heightRDD:RDD[(String, String)] = listRDD.map(line => { val fields = line.split(",") (fields(3), line) }) val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要設置1個分區,否則只是各分區內有序 retRDD.foreach(println) // 使用sortByKey如何實現sortBy的二次排序?將上面的信息寫成一個java對象,然后重寫compareTo方法,在做map時,key就為該對象本身,而value可以為null } } ~~~ 輸出結果如下: ~~~ (180,3,齊彥鵬,22,180) (175,1,李 磊,22,175) (175,2,劉銀鵬,23,175) (175,5,敦 鵬,20,175) (168,4,楊 柳,22,168) ~~~ 下面是一個快速入門的 demo: ~~~ scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:21 scala> rdd.sortByKey(true, 1).foreach(println) (1,one) (2,two) (3,three) ~~~ ### 1.10.combineByKey 與 aggregateByKey 下面的代碼分別使用 combineByKey 和 aggregateByKey 來模擬 groupByKey 和 reduceBykey,所以是有 4 個操作,只要把 combineByKey 模擬 groupByKey 的例子掌握了,其它三個相對就容易許多了。 ~~~ /** * spark的transformation操作: * aggregateByKey * combineByKey * * 使用combineByKey和aggregateByKey模擬groupByKey和reduceByKey * * 通過查看源碼,我們發現aggregateByKey底層,還是combineByKey * * 問題:combineByKey和aggregateByKey的區別? * aggregateByKey是柯里化形式的,目前底層源碼還沒時間去分析,所知道的區別是這個 */ object _03SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) // combineByKey2GroupByKey(sc) // combineByKey2ReduceByKey(sc) // aggregateByKey2ReduceByKey(sc) aggregateByKey2GroupByKey(sc) sc.stop() } /** * 使用aggregateByKey模擬groupByKey */ def aggregateByKey2GroupByKey(sc: SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 這里需要指定value的類型為ArrayBuffer[Int]() (part, num) => { part.append(num) part }, (part1, part2) => { part1.++=(part2) part1 } ) retRDD.foreach(println) } /** * 使用aggregateByKey模擬reduceByKey * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] (zeroValue: U)就對應的是combineByKey中的第一個函數的返回值 seqOp 就對應的是combineByKey中的第二個函數,也就是mergeValue combOp 就對應的是combineByKey中的第三個函數,也就是mergeCombiners */ def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) ( (partNum, num) => partNum + num, // 也就是mergeValue (partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners ) retRDD.foreach(println) } /** * 使用reduceByKey模擬groupByKey */ def combineByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) /** * 對于createCombiner1 mergeValue1 mergeCombiners1 * 代碼的參數已經體現得很清楚了,其實只要理解了combineByKey模擬groupByKey的例子,這個就非常容易了 */ var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1) retRDD.foreach(println) } /** * reduceByKey操作,value就是該數值本身,則上面的數據會產生: * (hello, 1) (bo, 1) (bo, 1) * (zhou, 1) (xin, 1) (xin, 1) * (hello, 1) (song, 1) (bo, 1) * 注意有別于groupByKey的操作,它是創建一個容器 */ def createCombiner1(num:Int):Int = { num } /** * 同一partition內,對于有相同key的,這里的mergeValue直接將其value相加 * 注意有別于groupByKey的操作,它是添加到value到一個容器中 */ def mergeValue1(localNum1:Int, localNum2:Int): Int = { localNum1 + localNum2 } /** * 將兩個不同partition中的key相同的value值相加起來 * 注意有別于groupByKey的操作,它是合并兩個容器 */ def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = { thisPartitionNum1 + anotherPartitionNum2 } /** * 使用combineByKey模擬groupByKey */ def combineByKey2GroupByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) // 輸出每個partition中的map對 pairsRDD.foreachPartition( partition => { println("<=========partition-start=========>") partition.foreach(println) println("<=========partition-end=========>") }) val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) gbkRDD.foreach(println) // 如果要測試最后groupByKey的結果是在幾個分區,可以使用下面的代碼進行測試 /*gbkRDD.foreachPartition(partition => { println("~~~~~~~~~~~~~~~~~~~~~~~~~~~") partition.foreach(println) })*/ } /** * 初始化,將value轉變成為標準的格式數據 * 是在每個分區中進行的操作,去重后的key有幾個,就調用次, * 因為對于每個key,其容器創建一次就ok了,之后有key相同的,只需要執行mergeValue到已經創建的容器中即可 */ def createCombiner(num:Int):ArrayBuffer[Int] = { println("----------createCombiner----------") ArrayBuffer[Int](num) } /** * 將key相同的value,添加到createCombiner函數創建的ArrayBuffer容器中 * 一個分區內的聚合操作,將一個分區內key相同的數據,合并 */ def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = { println("----------mergeValue----------") ab.append(num) ab } /** * 將key相同的多個value數組,進行整合 * 分區間的合并操作 */ def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = { println("----------mergeCombiners----------") ab1 ++= ab2 ab1 } } ~~~ 輸出結果如下: ~~~ /* combineByKey模擬groupByKey的一個輸出效果,可以很好地說明createCombiner、mergeValue和mergeCombiners各個階段的執行時機: <=========partition-start=========> <=========partition-start=========> (hello,1) (zhou,1) (bo,1) (xin,1) (bo,1) (xin,1) <=========partition-end=========> (hello,1) (song,1) (bo,1) <=========partition-end=========> ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------mergeValue---------- ----------mergeValue---------- ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------mergeCombiners---------- ----------mergeCombiners---------- (song,ArrayBuffer(1)) (hello,ArrayBuffer(1, 1)) (bo,ArrayBuffer(1, 1, 1)) (zhou,ArrayBuffer(1)) (xin,ArrayBuffer(1, 1)) */ ~~~ ## 二、Transformations 下面的表格列了 Spark 支持的一些常用 transformations。詳細內容請參閱 RDD API 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 **Transformation 轉換算子**: | 轉換算子 | 含義 | | ---------------------------------------------------- | ------------------------------------------------------------ | | **map**(func) | 返回一個新的 RDD,該 RDD 由每一個輸入元素經過 func 函數轉換后組成 | | **filter**(func) | 返回一個新的 RDD,該 RDD 由經過 func 函數計算后返回值為 true 的輸入元素組成 | | **flatMap**(func) | 類似于 map,但是每一個輸入元素可以被映射為 0 或多個輸出元素 (所以 func 應該返回一個序列,而不是單一元素) | | **mapPartitions**(func) | 類似于 map,但獨立地在 RDD 的每一個分片上運行,因此在類型為 T 的 RDD 上運行時,func 的函數類型必須是 Iterator[T] => Iterator[U] | | **mapPartitionsWithIndex**(func) | 類似于 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,因此在類型為 T 的 RDD 上運行時,func 的函數類型必須是 (Int, Interator[T]) => Iterator[U] | | sample(withReplacement, fraction, seed) | 根據 fraction 指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed 用于指定隨機數生成器種子 | | **union**(otherDataset) | 對源 RDD 和參數 RDD 求并集后返回一個新的 RDD | | intersection(otherDataset) | 對源 RDD 和參數 RDD 求交集后返回一個新的 RDD | | **distinct**([numTasks])) | 對源 RDD 進行去重后返回一個新的 RDD | | **groupByKey**([numTasks]) | 在一個 (K,V) 的 RDD 上調用,返回一個 (K, Iterator[V]) 的 RDD | | **reduceByKey**(func, [numTasks]) | 在一個 (K,V) 的 RDD 上調用,返回一個 (K,V) 的 RDD,使用指定的 reduce 函數,將相同 key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數可以通過第二個可選的參數來設置 | | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 對 PairRDD 中相同的 Key 值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和 aggregate 函數類似,aggregateByKey 返回值的類型不需要和 RDD 中 value 的類型一致 | | **sortByKey**([ascending], [numTasks]) | 在一個 (K,V) 的 RDD 上調用,K 必須實現 Ordered 接口,返回一個按照 key 進行排序的 (K,V) 的 RDD | | sortBy(func,[ascending], [numTasks]) | 與 sortByKey 類似,但是更靈活 | | **join**(otherDataset, [numTasks]) | 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素對在一起的 (K,(V,W)) 的 RDD | | cogroup(otherDataset, [numTasks]) | 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個 (K,(Iterable,Iterable)) 類型的 RDD | | cartesian(otherDataset) | 笛卡爾積 | | pipe(command, [envVars]) | 對 rdd 進行管道操作 | | **coalesce**(numPartitions) | 減少 RDD 的分區數到指定值。在過濾大量數據之后,可以執行此操作 | | **repartition**(numPartitions) | 重新給 RDD 分區 |
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看