<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                對于轉換操作,RDD 的所有轉換都不會直接計算結果。Spark 僅記錄作用于RDD 上的轉換操作邏輯,<mark>當遇到動作算子(Action)時才會進行真正的計算</mark>。RDD全部轉換算子如下表。 | Transformation | 描述 | | --- | --- | | `map(func)` | 通過函數 func 作用于源 RDD 中的每個元素,返回一個新的 RDD| | `filter(func)` | 選擇源 RDD 中的使得函數 func 為 true 的元素,返回一個新的 RDD| | `flatMap(func)` | 與 map 類似,但是每個輸入項可以映射到 0 或多個輸出項(因此 func 應該返回一個 Seq,而不是單個項)。| | `mapValues(func)`| 原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素,僅適用于PairRDD | | `mapPartitions(func)` | 與 map 類似,但是在 RDD 的每個分區上單獨運行,所以 func 在類型為 T 的 RDD 上運行時,必須是類型 Iterator`<T> `=> Iterator`<U>`| | `mapPartitionsWithIndex(func)` | 與 mapPartitions 類似,但為 func 多提供一個分區編號 ,所以 func 類型為:(Int, Iterator`<T>`) => Iterator`<U>`| | `sample(withReplacement, fraction, seed)` |使用給定的隨機數生成器種子對數據的一部分進行采樣。| | `union(otherDataset)` | 返回一個新數據集,該數據集包含源數據集中的元素和參數的并集| | `intersection(otherDataset)` | 返回一個新的 RDD,其中包含源數據集中的元素和參數的交集。| | `distinct([numPartitions]))` | 返回包含源數據集的不同元素的新數據集。| | `groupByKey([numPartitions])` | 當調用一個(K, V)對的數據集時,返回一個(K,Iterable<V>)對的數據集。| | `reduceByKey(func, [numPartitions])` | 對相同的key通過給定的函數進行聚合 | | `aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])`| seqOp 操作會聚合各分區中的元素,然后 combOp操作把所有分區的聚合結果再次聚合,兩個操作的初始值都是 zeroValue. seqOp 的操作是遍歷分區中的所有元素(T),第一個T跟zeroValue做操作,結果再作為與第二個T做操作的zeroValue,直到遍歷完整個分區。combOp 操作是把各分區聚合的結果,再聚合。| | `sortByKey([ascending], [numPartitions])`| 根據 key 進行排序,默認為升序。ascending: Boolean = true| | `join(otherDataset, [numPartitions])`| 當在類型(K, V)和(K, W)的數據集上調用時,返回一個(K, (V,W))對的數據集,其中包含每個鍵的所有對元素。外部連接由 leftOuterJoin、rightOuterJoin和 fullOuterJoin 支持。| | `cogroup(otherDataset, [numPartitions])`| 當調用類型(K, V)和(K, W)的數據集時,返回一個(K,(Iterable,Iterable))元組的數據集。這個操作也稱為groupWith。| | `cartesian(otherDataset)` | 在類型為 T 和 U 的數據集上調用時,返回一個(T, U)對(所有對元素)的數據集。| | `pipe(command, [envVars])` | 通過 shell 命令(例如 Perl 或 bash 腳本)對 RDD 的每個分區進行管道傳輸。將 RDD 元素寫入進程的stdin,并將其輸出到 stdout 的行作為字符串 RDD返回。| | `coalesce(numPartitions)` | 將 RDD 中的分區數量減少到numpartition。| | `repartition(numPartitions)` | 隨機地重新 Shuffle RDD 中的數據,以創建更多或更少的分區,并在它們之間進行平衡。| 下面是一些常用轉換算子的示例: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object TransformationOps { def main(args: Array[String]): Unit = { val conf:SparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName) val sc:SparkContext = SparkContext.getOrCreate(conf) // 1. map算子 // 通過函數 func 作用于源 RDD 中的每個元素,返回一個新的 RDD val rdd1 = sc.parallelize(1 to 9) rdd1.map(_*2).foreach(x => print(s"$x ")) // 14 6 16 8 18 2 10 4 12 println() // map算子將RDD變成PairRDD val rdd2 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle")) val rdd3 = rdd2.map(x => (x, 1)) rdd3.foreach(x => print(s"$x ")) // (tiger,1) (lion,1) (panther,1) (eagle,1) (cat,1) (dog,1) println() // 2. filter算子 // 選擇源 RDD 中的使得函數 func 為 true 的元素,返回一個新的 RDD val rdd4 = sc.parallelize(1 to 10) rdd4.filter(_%2==0).foreach(x => print(s"$x ")) // 8 4 2 6 10 println() // 3. mapValues算子 // 原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素,僅適用于PairRDD val rdd5 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle")) val rdd6 = rdd5.map(x => (x.length, x)) rdd6.foreach(x => print(s"$x ")) // (3,dog) (5,tiger) (4,lion) (3,cat) (7,panther) (5,eagle) println() rdd6.mapValues("x"+_+"x").foreach(x => print(s"$x ")) // (5,xtigerx) (3,xdogx) (3,xcatx) (7,xpantherx) (4,xlionx) (5,xeaglex) println() // 4. distinct算子 // 返回包含源數據集的不同元素的新數據集 val rdd7 = sc.parallelize(List(1, 2, 3, 3, 4, 4, 4, 5)) rdd7.distinct.foreach(x => print(s"$x ")) // 4 3 1 2 5 println() // 5. flatMap算子 // 類似于scala中先進行map操作, 在進行flatten操作 val rdd8 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd8.flatMap(_.split("\\s+")).foreach(x => print(s"$x ")) // hello wrold hadoop spark spark python java hello println() // 6. mapPartitions算子 // 遍歷每一個分區, 對每一個分區進行操作, 返回一個新的RDD // 使用場景:例如創建數據庫連接, mapPartitions是對分區進行操作, 創建連接數量會更少 val rdd9 = sc.parallelize(1 to 10) rdd9.repartition(2).mapPartitions(part => { part.map(_*2) }).foreach(x => print(s"$x ")) // 4 2 8 6 10 12 14 16 20 18 println() // 7. mapPartitionsWithIndex算子 // mapPartitionsWithIndex比mapPartitions多了一個下標索引 val rdd10 = sc.parallelize(1 to 10) rdd10.mapPartitionsWithIndex((index, part) => part.map((index, _))).foreach(x => print(s"$x ")) // (0,1) (0,2) (1,3) (1,4) (1,5) (3,8) (3,9) (3,10) (2,6) (2,7) println() // 8. sample抽樣算子 // 第一個參數是否有放回,第二個參數是抽取的數據量比例, 第三個是隨機種子 val rdd11 = sc.parallelize(1 to 10) rdd11.sample(true, 0.5, 2).foreach(x => print(s"$x ")) // 9 6 println() // 9. union算子 // union 對兩個RDD求并集 val rdd12 = sc.parallelize(1 to 4) rdd12.union(sc.parallelize(3 to 9)).foreach(x => print(s"$x ")) // 2 1 3 4 3 4 5 6 7 8 9 println() // 10. intersection算子 // 對兩個RDD求交集 val rdd13 = sc.parallelize(1 to 5) rdd13.intersection(sc.parallelize(2 to 7)).foreach(x => print(s"$x ")) // 3 2 5 4 println() // 11. groupByKey算子 // 相同的key分到一組 val rdd14 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd14.flatMap(_.split("\\s+")).map((_, 1)).groupByKey().map(x => (x._1, x._2.size)) .foreach(x => print(s"$x ")) // (python,1) (wrold,1) (spark,2) (hadoop,1) (hello,2) (java,1) println() // 12. reduceByKey算子 // 對相同的key通過給定的函數進行聚合 val rdd15 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey((x, y) => x+y).foreach(x => print(s"$x ")) // (hello,2) (java,1) (python,1) (wrold,1) (spark,2) (hadoop,1) // 這種寫法與上面的效果是一樣的 // rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_).foreach(x => print(s"$x ")) println() // 13. aggregateByKey算子 // 柯里化函數, 第一個參數列表傳入0值 // 第二個參數列表需要傳入兩個參數, 第一個參數是本地聚合函數, 第二個參數是全局聚合函數 val rdd16 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) rdd16.flatMap(_.split("\\s+")).map((_, 1)).aggregateByKey(0)((x,y)=>x+y, (x,y)=>x+y).foreach(x => print(s"$x ")) // (spark,2) (hadoop,1) (python,1) (wrold,1) (hello,2) (java,1) // 或者下面這種寫法也是同樣效果 // rdd16.flatMap(_.split("\\s+")).map((_,1)).aggregateByKey(0)(_+_,_+_).foreach(x => print(s"$x ")) println() // 14. sortByKey算子 // 對key進行排序, 第一個參數是"是否是正序排列", 第二個參數是分區 // 注意: 只能是分區內有序, 不能全局有序 val rdd17 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) val sorted: RDD[(String, Int)] = rdd17.repartition(1).flatMap(_.split("\\s+")) .map((_,1)).aggregateByKey(0)(_+_,_+_).sortByKey() sorted.foreach(x => print(s"$x ")) // (hadoop,1) (hello,2) (java,1) (python,1) (spark,2) (wrold,1) println() val pairRdd1: RDD[(String, String)] = sc.parallelize(List(("1001","zhangsan"),("1002","lisi"),("1003","wangwu"))) val pairRdd2: RDD[(String, String)] = sc.parallelize(List(("1002","20"),("1003","15"),("1004","18"))) // 15. join算子 // 內連接 pairRdd1.join(pairRdd2).foreach(print) // (1003,(wangwu,15)) (1002,(lisi,20)) println() // 16. cogroup算子 // 相當于外連接, 和join不同的是會將value值分為一組 pairRdd1.cogroup(pairRdd2).foreach(println) // (1004,(CompactBuffer(),CompactBuffer(18))) // (1001,(CompactBuffer(zhangsan),CompactBuffer())) // (1003,(CompactBuffer(wangwu),CompactBuffer(15))) // (1002,(CompactBuffer(lisi),CompactBuffer(20))) // repartition算子 // 重分區,實際上調用了coalesce, 并且默認走shuffle // 我們自己調用coalesce函數的時候, 如果需要將分區由多變少, 可以傳入false,不走shuffle val rdd18 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) println(rdd18.getNumPartitions) // 默認分區為4 val rdd19 = rdd18.repartition(3) println(rdd19.getNumPartitions) // 3 val rdd20:RDD[String] = rdd18.coalesce(3, false) println(rdd20.getNumPartitions) // 3 sc.stop() // 關閉資源 } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看