<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # Transformation 和 Action 常用算子 [詳細查詢](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md) [一、Transformation](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#一Transformation) ????????[1.1 map](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#11-map) ????????[1.2 filter](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#12-filter) ????????[1.3 flatMap](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#13-flatMap) ????????[1.4 mapPartitions](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#14-mapPartitions) ????????[1.5 mapPartitionsWithIndex](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#15-mapPartitionsWithIndex) ????????[1.6 sample](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#16-sample) ????????[1.7 union](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#17-union) ????????[1.8 intersection](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#18-intersection) ????????[1.9 distinct](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#19-distinct) ????????[1.10 groupByKey](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#110-groupByKey) ????????[1.11 reduceByKey](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#111-reduceByKey) ????????[1.12 sortBy & sortByKey](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#112-sortBy--sortByKey) ????????[1.13 join](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#113-join) ????????[1.14 cogroup](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#114-cogroup) ????????[1.15 cartesian](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#115-cartesian) ????????[1.16 aggregateByKey](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#116-aggregateByKey) [二、Action](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#二Action) ????????[2.1 reduce](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#21-reduce) ????????[2.2 takeOrdered](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#22-takeOrdered) ????????[2.3 countByKey](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#23-countByKey) ????????[2.4 saveAsTextFile](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Transformation%E5%92%8CAction%E7%AE%97%E5%AD%90.md#24-saveAsTextFile) ## 一、Transformation spark 常用的 Transformation 算子如下表: | Transformation 算子 | Meaning(含義) | | --- | --- | | **map**(*func*) | 對原 RDD 中每個元素運用 *func* 函數,并生成新的 RDD | | **filter**(*func*) | 對原 RDD 中每個元素使用*func* 函數進行過濾,并生成新的 RDD | | **flatMap**(*func*) | 與 map 類似,但是每一個輸入的 item 被映射成 0 個或多個輸出的 items( *func* 返回類型需要為 Seq )。 | | **mapPartitions**(*func*) | 與 map 類似,但函數單獨在 RDD 的每個分區上運行, *func*函數的類型為 Iterator => Iterator ,其中 T 是 RDD 的類型,即 RDD\[T\] | | **mapPartitionsWithIndex**(*func*) | 與 mapPartitions 類似,但 *func* 類型為 (Int, Iterator) => Iterator ,其中第一個參數為分區索引 | | **sample**(*withReplacement*, *fraction*, *seed*) | 數據采樣,有三個可選參數:設置是否放回(withReplacement)、采樣的百分比(*fraction*)、隨機數生成器的種子(seed); | | **union**(*otherDataset*) | 合并兩個 RDD | | **intersection**(*otherDataset*) | 求兩個 RDD 的交集 | | **distinct**(\[*numTasks*\])) | 去重 | | **groupByKey**(\[*numTasks*\]) | 按照 key 值進行分區,即在一個 (K, V) 對的 dataset 上調用時,返回一個 (K, Iterable) | | **reduceByKey**(*func*, \[*numTasks*\]) | 按照 key 值進行分組,并對分組后的數據執行歸約操作。 | | **aggregateByKey**(*zeroValue*,*numPartitions*)(*seqOp*, *combOp*, \[*numTasks*\]) | 當調用(K,V)對的數據集時,返回(K,U)對的數據集,其中使用給定的組合函數和 zeroValue 聚合每個鍵的值。與 groupByKey 類似,reduce 任務的數量可通過第二個參數進行配置。 | | **sortByKey**(\[*ascending*\], \[*numTasks*\]) | 按照 key 進行排序,其中的 key 需要實現 Ordered 特質,即可比較 | | **join**(*otherDataset*, \[*numTasks*\]) | 在一個 (K, V) 和 (K, W) 類型的 dataset 上調用時,返回一個 (K, (V, W)) pairs 的 dataset,等價于內連接操作。如果想要執行外連接,可以使用 `leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。 | | **cogroup**(*otherDataset*, \[*numTasks*\]) | 在一個 (K, V) 對的 dataset 上調用時,返回一個 (K, (Iterable, Iterable)) tuples 的 dataset。 | | **cartesian**(*otherDataset*) | 在一個 T 和 U 類型的 dataset 上調用時,返回一個 (T, U) 類型的 dataset(即笛卡爾積)。 | | **coalesce**(*numPartitions*) | 將 RDD 中的分區數減少為 numPartitions。 | | **repartition**(*numPartitions*) | 隨機重新調整 RDD 中的數據以創建更多或更少的分區,并在它們之間進行平衡。 | | **repartitionAndSortWithinPartitions**(*partitioner*) | 根據給定的 partitioner(分區器)對 RDD 進行重新分區,并對分區中的數據按照 key 值進行排序。這比調用 `repartition` 然后再 sorting(排序)效率更高,因為它可以將排序過程推送到 shuffle 操作所在的機器。 | 下面分別給出這些算子的基本使用示例: ### 1.1 map ~~~scala val list = List(1,2,3) sc.parallelize(list).map(_ * 10).foreach(println) // 輸出結果: 10 20 30 (這里為了節省篇幅去掉了換行,后文亦同) ~~~ ### 1.2 filter ~~~scala val list = List(3, 6, 9, 10, 12, 21) sc.parallelize(list).filter(_ >= 10).foreach(println) // 輸出: 10 12 21 ~~~ ### 1.3 flatMap `flatMap(func)` 與 `map` 類似,但每一個輸入的 item 會被映射成 0 個或多個輸出的 items( *func* 返回類型需要為 `Seq`)。 ~~~scala val list = List(List(1, 2), List(3), List(), List(4, 5)) sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println) // 輸出結果 : 10 20 30 40 50 ~~~ flatMap 這個算子在日志分析中使用概率非常高,這里進行一下演示:拆分輸入的每行數據為單個單詞,并賦值為 1,代表出現一次,之后按照單詞分組并統計其出現總次數,代碼如下: ~~~scala val lines = List("spark flume spark", "hadoop flume hive") sc.parallelize(lines).flatMap(line => line.split(" ")). map(word=>(word,1)).reduceByKey(_+_).foreach(println) // 輸出: (spark,2) (hive,1) (hadoop,1) (flume,2) ~~~ ### 1.4 mapPartitions 與 map 類似,但函數單獨在 RDD 的每個分區上運行, *func*函數的類型為 `Iterator<T> => Iterator<U>` (其中 T 是 RDD 的類型),即輸入和輸出都必須是可迭代類型。 ~~~scala val list = List(1, 2, 3, 4, 5, 6) sc.parallelize(list, 3).mapPartitions(iterator => { val buffer = new ListBuffer[Int] while (iterator.hasNext) { buffer.append(iterator.next() * 100) } buffer.toIterator }).foreach(println) //輸出結果 100 200 300 400 500 600 ~~~ ### 1.5 mapPartitionsWithIndex 與 mapPartitions 類似,但 *func* 類型為 `(Int, Iterator<T>) => Iterator<U>` ,其中第一個參數為分區索引。 ~~~scala val list = List(1, 2, 3, 4, 5, 6) sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => { val buffer = new ListBuffer[String] while (iterator.hasNext) { buffer.append(index + "分區:" + iterator.next() * 100) } buffer.toIterator }).foreach(println) //輸出 0 分區:100 0 分區:200 1 分區:300 1 分區:400 2 分區:500 2 分區:600 ~~~ ### 1.6 sample 數據采樣。有三個可選參數:設置是否放回 (withReplacement)、采樣的百分比 (fraction)、隨機數生成器的種子 (seed) : ~~~scala val list = List(1, 2, 3, 4, 5, 6) sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println) ~~~ ### 1.7 union 合并兩個 RDD: ~~~scala val list1 = List(1, 2, 3) val list2 = List(4, 5, 6) sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println) // 輸出: 1 2 3 4 5 6 ~~~ ### 1.8 intersection 求兩個 RDD 的交集: ~~~scala val list1 = List(1, 2, 3, 4, 5) val list2 = List(4, 5, 6) sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println) // 輸出: 4 5 ~~~ ### 1.9 distinct 去重: ~~~scala val list = List(1, 2, 2, 4, 4) sc.parallelize(list).distinct().foreach(println) // 輸出: 4 1 2 ~~~ ### 1.10 groupByKey 按照鍵進行分組: ~~~scala val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2)) sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println) //輸出: (spark,List(3, 5)) (hadoop,List(2, 2)) (storm,List(6)) ~~~ ### 1.11 reduceByKey 按照鍵進行歸約操作: ~~~scala val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2)) sc.parallelize(list).reduceByKey(_ + _).foreach(println) //輸出 (spark,8) (hadoop,4) (storm,6) ~~~ ### 1.12 sortBy & sortByKey 按照鍵進行排序: ~~~scala val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm")) sc.parallelize(list01).sortByKey(ascending = false).foreach(println) // 輸出 (120,storm) (90,spark) (100,hadoop) ~~~ 按照指定元素進行排序: ~~~scala val list02 = List(("hadoop",100), ("spark",90), ("storm",120)) sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println) // 輸出 (storm,120) (hadoop,100) (spark,90) ~~~ ### 1.13 join 在一個 (K, V) 和 (K, W) 類型的 Dataset 上調用時,返回一個 (K, (V, W)) 的 Dataset,等價于內連接操作。如果想要執行外連接,可以使用 `leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 等算子。 ~~~scala val list01 = List((1, "student01"), (2, "student02"), (3, "student03")) val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03")) sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println) // 輸出 (1,(student01,teacher01)) (3,(student03,teacher03)) (2,(student02,teacher02)) ~~~ ### 1.14 cogroup 在一個 (K, V) 對的 Dataset 上調用時,返回多個類型為 (K, (Iterable, Iterable)) 的元組所組成的 Dataset。 ~~~scala val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e")) val list02 = List((1, "A"), (2, "B"), (3, "E")) val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE")) sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println) // 輸出: 同一個 RDD 中的元素先按照 key 進行分組,然后再對不同 RDD 中的元素按照 key 進行分組 (1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab]))) (3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE))) (2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB]))) ~~~ ### 1.15 cartesian 計算笛卡爾積: ~~~scala val list1 = List("A", "B", "C") val list2 = List(1, 2, 3) sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println) //輸出笛卡爾積 (A,1) (A,2) (A,3) (B,1) (B,2) (B,3) (C,1) (C,2) (C,3) ~~~ ### 1.16 aggregateByKey 當調用(K,V)對的數據集時,返回(K,U)對的數據集,其中使用給定的組合函數和 zeroValue 聚合每個鍵的值。與 `groupByKey` 類似,reduce 任務的數量可通過第二個參數 `numPartitions` 進行配置。示例如下: ~~~scala // 為了清晰,以下所有參數均使用具名傳參 val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8)) sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)( seqOp = math.max(_, _), combOp = _ + _ ).collect.foreach(println) //輸出結果: (hadoop,3) (storm,8) (spark,7) ~~~ 這里使用了 `numSlices = 2` 指定 aggregateByKey 父操作 parallelize 的分區數量為 2,其執行流程如下: [![](https://github.com/heibaiying/BigData-Notes/raw/master/pictures/spark-aggregateByKey.png)](https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-aggregateByKey.png) 基于同樣的執行流程,如果 `numSlices = 1`,則意味著只有輸入一個分區,則其最后一步 combOp 相當于是無效的,執行結果為: ~~~ini (hadoop,3) (storm,8) (spark,4) ~~~ 同樣的,如果每個單詞對一個分區,即 `numSlices = 6`,此時相當于求和操作,執行結果為: ~~~ini (hadoop,5) (storm,14) (spark,7) ~~~ `aggregateByKey(zeroValue = 0,numPartitions = 3)` 的第二個參數 `numPartitions` 決定的是輸出 RDD 的分區數量,想要驗證這個問題,可以對上面代碼進行改寫,使用 `getNumPartitions` 方法獲取分區數量: ~~~scala sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)( seqOp = math.max(_, _), combOp = _ + _ ).getNumPartitions ~~~ [![](https://github.com/heibaiying/BigData-Notes/raw/master/pictures/spark-getpartnum.png)](https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-getpartnum.png) ## 二、Action Spark 常用的 Action 算子如下: | Action(動作) | Meaning(含義) | | --- | --- | | **reduce**(*func*) | 使用函數*func*執行歸約操作 | | **collect**() | 以一個 array 數組的形式返回 dataset 的所有元素,適用于小結果集。 | | **count**() | 返回 dataset 中元素的個數。 | | **first**() | 返回 dataset 中的第一個元素,等價于 take(1)。 | | **take**(*n*) | 將數據集中的前 *n* 個元素作為一個 array 數組返回。 | | **takeSample**(*withReplacement*, *num*, \[*seed*\]) | 對一個 dataset 進行隨機抽樣 | | **takeOrdered**(*n*, *\[ordering\]*) | 按自然順序(natural order)或自定義比較器(custom comparator)排序后返回前 *n* 個元素。只適用于小結果集,因為所有數據都會被加載到驅動程序的內存中進行排序。 | | **saveAsTextFile**(*path*) | 將 dataset 中的元素以文本文件的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。Spark 將對每個元素調用 toString 方法,將元素轉換為文本文件中的一行記錄。 | | **saveAsSequenceFile**(*path*) | 將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。該操作要求 RDD 中的元素需要實現 Hadoop 的 Writable 接口。對于 Scala 語言而言,它可以將 Spark 中的基本數據類型自動隱式轉換為對應 Writable 類型。(目前僅支持 Java and Scala) | | **saveAsObjectFile**(*path*) | 使用 Java 序列化后存儲,可以使用 `SparkContext.objectFile()` 進行加載。(目前僅支持 Java and Scala) | | **countByKey**() | 計算每個鍵出現的次數。 | | **foreach**(*func*) | 遍歷 RDD 中每個元素,并對其執行*fun*函數 | ### 2.1 reduce 使用函數*func*執行歸約操作: ~~~scala val list = List(1, 2, 3, 4, 5) sc.parallelize(list).reduce((x, y) => x + y) sc.parallelize(list).reduce(_ + _) // 輸出 15 ~~~ ### 2.2 takeOrdered 按自然順序(natural order)或自定義比較器(custom comparator)排序后返回前 *n* 個元素。需要注意的是 `takeOrdered` 使用隱式參數進行隱式轉換,以下為其源碼。所以在使用自定義排序時,需要繼承 `Ordering[T]` 實現自定義比較器,然后將其作為隱式參數引入。 ~~~scala def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { ......... } ~~~ 自定義規則排序: ~~~scala // 繼承 Ordering[T],實現自定義比較器,按照 value 值的長度進行排序 class CustomOrdering extends Ordering[(Int, String)] { override def compare(x: (Int, String), y: (Int, String)): Int = if (x._2.length > y._2.length) 1 else -1 } val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive")) // 引入隱式默認值 implicit val implicitOrdering = new CustomOrdering sc.parallelize(list).takeOrdered(5) // 輸出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban) ~~~ ### 2.3 countByKey 計算每個鍵出現的次數: ~~~scala val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1)) sc.parallelize(list).countByKey() // 輸出: Map(hadoop -> 2, storm -> 2, azkaban -> 1) ~~~ ### 2.4 saveAsTextFile 將 dataset 中的元素以文本文件的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中。Spark 將對每個元素調用 toString 方法,將元素轉換為文本文件中的一行記錄。 ~~~scala val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1)) sc.parallelize(list).saveAsTextFile("/usr/file/temp") ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看