對于轉換操作,RDD 的所有轉換都不會直接計算結果。Spark 僅記錄作用于RDD 上的轉換操作邏輯,<mark>當遇到動作算子(Action)時才會進行真正的計算</mark>。RDD全部轉換算子如下表。
| Transformation | 描述 |
| --- | --- |
| `map(func)` | 通過函數 func 作用于源 RDD 中的每個元素,返回一個新的 RDD|
| `filter(func)` | 選擇源 RDD 中的使得函數 func 為 true 的元素,返回一個新的 RDD|
| `flatMap(func)` | 與 map 類似,但是每個輸入項可以映射到 0 或多個輸出項(因此 func 應該返回一個 Seq,而不是單個項)。|
| `mapValues(func)`| 原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素,僅適用于PairRDD |
| `mapPartitions(func)` | 與 map 類似,但是在 RDD 的每個分區上單獨運行,所以 func 在類型為 T 的 RDD 上運行時,必須是類型 Iterator`<T> `=> Iterator`<U>`|
| `mapPartitionsWithIndex(func)` | 與 mapPartitions 類似,但為 func 多提供一個分區編號 ,所以 func 類型為:(Int, Iterator`<T>`) => Iterator`<U>`|
| `sample(withReplacement, fraction, seed)` |使用給定的隨機數生成器種子對數據的一部分進行采樣。|
| `union(otherDataset)` | 返回一個新數據集,該數據集包含源數據集中的元素和參數的并集|
| `intersection(otherDataset)` | 返回一個新的 RDD,其中包含源數據集中的元素和參數的交集。|
| `distinct([numPartitions]))` | 返回包含源數據集的不同元素的新數據集。|
| `groupByKey([numPartitions])` | 當調用一個(K, V)對的數據集時,返回一個(K,Iterable<V>)對的數據集。|
| `reduceByKey(func, [numPartitions])` | 對相同的key通過給定的函數進行聚合 |
| `aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])`| seqOp 操作會聚合各分區中的元素,然后 combOp操作把所有分區的聚合結果再次聚合,兩個操作的初始值都是 zeroValue. seqOp 的操作是遍歷分區中的所有元素(T),第一個T跟zeroValue做操作,結果再作為與第二個T做操作的zeroValue,直到遍歷完整個分區。combOp 操作是把各分區聚合的結果,再聚合。|
| `sortByKey([ascending], [numPartitions])`| 根據 key 進行排序,默認為升序。ascending: Boolean = true|
| `join(otherDataset, [numPartitions])`| 當在類型(K, V)和(K, W)的數據集上調用時,返回一個(K, (V,W))對的數據集,其中包含每個鍵的所有對元素。外部連接由 leftOuterJoin、rightOuterJoin和 fullOuterJoin 支持。|
| `cogroup(otherDataset, [numPartitions])`| 當調用類型(K, V)和(K, W)的數據集時,返回一個(K,(Iterable,Iterable))元組的數據集。這個操作也稱為groupWith。|
| `cartesian(otherDataset)` | 在類型為 T 和 U 的數據集上調用時,返回一個(T, U)對(所有對元素)的數據集。|
| `pipe(command, [envVars])` | 通過 shell 命令(例如 Perl 或 bash 腳本)對 RDD 的每個分區進行管道傳輸。將 RDD 元素寫入進程的stdin,并將其輸出到 stdout 的行作為字符串 RDD返回。|
| `coalesce(numPartitions)` | 將 RDD 中的分區數量減少到numpartition。|
| `repartition(numPartitions)` | 隨機地重新 Shuffle RDD 中的數據,以創建更多或更少的分區,并在它們之間進行平衡。|
下面是一些常用轉換算子的示例:
```scala
package spark.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object TransformationOps {
def main(args: Array[String]): Unit = {
val conf:SparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getName)
val sc:SparkContext = SparkContext.getOrCreate(conf)
// 1. map算子
// 通過函數 func 作用于源 RDD 中的每個元素,返回一個新的 RDD
val rdd1 = sc.parallelize(1 to 9)
rdd1.map(_*2).foreach(x => print(s"$x ")) // 14 6 16 8 18 2 10 4 12
println()
// map算子將RDD變成PairRDD
val rdd2 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
val rdd3 = rdd2.map(x => (x, 1))
rdd3.foreach(x => print(s"$x ")) // (tiger,1) (lion,1) (panther,1) (eagle,1) (cat,1) (dog,1)
println()
// 2. filter算子
// 選擇源 RDD 中的使得函數 func 為 true 的元素,返回一個新的 RDD
val rdd4 = sc.parallelize(1 to 10)
rdd4.filter(_%2==0).foreach(x => print(s"$x ")) // 8 4 2 6 10
println()
// 3. mapValues算子
// 原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素,僅適用于PairRDD
val rdd5 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
val rdd6 = rdd5.map(x => (x.length, x))
rdd6.foreach(x => print(s"$x ")) // (3,dog) (5,tiger) (4,lion) (3,cat) (7,panther) (5,eagle)
println()
rdd6.mapValues("x"+_+"x").foreach(x => print(s"$x "))
// (5,xtigerx) (3,xdogx) (3,xcatx) (7,xpantherx) (4,xlionx) (5,xeaglex)
println()
// 4. distinct算子
// 返回包含源數據集的不同元素的新數據集
val rdd7 = sc.parallelize(List(1, 2, 3, 3, 4, 4, 4, 5))
rdd7.distinct.foreach(x => print(s"$x ")) // 4 3 1 2 5
println()
// 5. flatMap算子
// 類似于scala中先進行map操作, 在進行flatten操作
val rdd8 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
rdd8.flatMap(_.split("\\s+")).foreach(x => print(s"$x "))
// hello wrold hadoop spark spark python java hello
println()
// 6. mapPartitions算子
// 遍歷每一個分區, 對每一個分區進行操作, 返回一個新的RDD
// 使用場景:例如創建數據庫連接, mapPartitions是對分區進行操作, 創建連接數量會更少
val rdd9 = sc.parallelize(1 to 10)
rdd9.repartition(2).mapPartitions(part => {
part.map(_*2)
}).foreach(x => print(s"$x ")) // 4 2 8 6 10 12 14 16 20 18
println()
// 7. mapPartitionsWithIndex算子
// mapPartitionsWithIndex比mapPartitions多了一個下標索引
val rdd10 = sc.parallelize(1 to 10)
rdd10.mapPartitionsWithIndex((index, part) => part.map((index, _))).foreach(x => print(s"$x "))
// (0,1) (0,2) (1,3) (1,4) (1,5) (3,8) (3,9) (3,10) (2,6) (2,7)
println()
// 8. sample抽樣算子
// 第一個參數是否有放回,第二個參數是抽取的數據量比例, 第三個是隨機種子
val rdd11 = sc.parallelize(1 to 10)
rdd11.sample(true, 0.5, 2).foreach(x => print(s"$x "))
// 9 6
println()
// 9. union算子
// union 對兩個RDD求并集
val rdd12 = sc.parallelize(1 to 4)
rdd12.union(sc.parallelize(3 to 9)).foreach(x => print(s"$x "))
// 2 1 3 4 3 4 5 6 7 8 9
println()
// 10. intersection算子
// 對兩個RDD求交集
val rdd13 = sc.parallelize(1 to 5)
rdd13.intersection(sc.parallelize(2 to 7)).foreach(x => print(s"$x "))
// 3 2 5 4
println()
// 11. groupByKey算子
// 相同的key分到一組
val rdd14 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
rdd14.flatMap(_.split("\\s+")).map((_, 1)).groupByKey().map(x => (x._1, x._2.size))
.foreach(x => print(s"$x ")) // (python,1) (wrold,1) (spark,2) (hadoop,1) (hello,2) (java,1)
println()
// 12. reduceByKey算子
// 對相同的key通過給定的函數進行聚合
val rdd15 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey((x, y) => x+y).foreach(x => print(s"$x "))
// (hello,2) (java,1) (python,1) (wrold,1) (spark,2) (hadoop,1)
// 這種寫法與上面的效果是一樣的
// rdd15.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_).foreach(x => print(s"$x "))
println()
// 13. aggregateByKey算子
// 柯里化函數, 第一個參數列表傳入0值
// 第二個參數列表需要傳入兩個參數, 第一個參數是本地聚合函數, 第二個參數是全局聚合函數
val rdd16 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
rdd16.flatMap(_.split("\\s+")).map((_, 1)).aggregateByKey(0)((x,y)=>x+y, (x,y)=>x+y).foreach(x => print(s"$x "))
// (spark,2) (hadoop,1) (python,1) (wrold,1) (hello,2) (java,1)
// 或者下面這種寫法也是同樣效果
// rdd16.flatMap(_.split("\\s+")).map((_,1)).aggregateByKey(0)(_+_,_+_).foreach(x => print(s"$x "))
println()
// 14. sortByKey算子
// 對key進行排序, 第一個參數是"是否是正序排列", 第二個參數是分區
// 注意: 只能是分區內有序, 不能全局有序
val rdd17 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
val sorted: RDD[(String, Int)] = rdd17.repartition(1).flatMap(_.split("\\s+"))
.map((_,1)).aggregateByKey(0)(_+_,_+_).sortByKey()
sorted.foreach(x => print(s"$x ")) // (hadoop,1) (hello,2) (java,1) (python,1) (spark,2) (wrold,1)
println()
val pairRdd1: RDD[(String, String)] = sc.parallelize(List(("1001","zhangsan"),("1002","lisi"),("1003","wangwu")))
val pairRdd2: RDD[(String, String)] = sc.parallelize(List(("1002","20"),("1003","15"),("1004","18")))
// 15. join算子
// 內連接
pairRdd1.join(pairRdd2).foreach(print) // (1003,(wangwu,15)) (1002,(lisi,20))
println()
// 16. cogroup算子
// 相當于外連接, 和join不同的是會將value值分為一組
pairRdd1.cogroup(pairRdd2).foreach(println)
// (1004,(CompactBuffer(),CompactBuffer(18)))
// (1001,(CompactBuffer(zhangsan),CompactBuffer()))
// (1003,(CompactBuffer(wangwu),CompactBuffer(15)))
// (1002,(CompactBuffer(lisi),CompactBuffer(20)))
// repartition算子
// 重分區,實際上調用了coalesce, 并且默認走shuffle
// 我們自己調用coalesce函數的時候, 如果需要將分區由多變少, 可以傳入false,不走shuffle
val rdd18 = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello"))
println(rdd18.getNumPartitions) // 默認分區為4
val rdd19 = rdd18.repartition(3)
println(rdd19.getNumPartitions) // 3
val rdd20:RDD[String] = rdd18.coalesce(3, false)
println(rdd20.getNumPartitions) // 3
sc.stop() // 關閉資源
}
}
```
- Hadoop
- hadoop是什么?
- Hadoop組成
- hadoop官網
- hadoop安裝
- hadoop配置
- 本地運行模式配置
- 偽分布運行模式配置
- 完全分布運行模式配置
- HDFS分布式文件系統
- HDFS架構
- HDFS設計思想
- HDFS組成架構
- HDFS文件塊大小
- HDFS優缺點
- HDFS Shell操作
- HDFS JavaAPI
- 基本使用
- HDFS的I/O 流操作
- 在SpringBoot項目中的API
- HDFS讀寫流程
- HDFS寫流程
- HDFS讀流程
- NN和SNN關系
- NN和SNN工作機制
- Fsimage和 Edits解析
- checkpoint時間設置
- NameNode故障處理
- 集群安全模式
- DataNode工作機制
- 支持的文件格式
- MapReduce分布式計算模型
- MapReduce是什么?
- MapReduce設計思想
- MapReduce優缺點
- MapReduce基本使用
- MapReduce編程規范
- WordCount案例
- MapReduce任務進程
- Hadoop序列化對象
- 為什么要序列化
- 常用數據序列化類型
- 自定義序列化對象
- MapReduce框架原理
- MapReduce工作流程
- MapReduce核心類
- MapTask工作機制
- Shuffle機制
- Partition分區
- Combiner合并
- ReduceTask工作機制
- OutputFormat
- 使用MapReduce實現SQL Join操作
- Reduce join
- Reduce join 代碼實現
- Map join
- Map join 案例實操
- MapReduce 開發總結
- Hadoop 優化
- MapReduce 優化需要考慮的點
- MapReduce 優化方法
- 分布式資源調度框架 Yarn
- Yarn 基本架構
- ResourceManager(RM)
- NodeManager(NM)
- ApplicationMaster
- Container
- 作業提交全過程
- JobHistoryServer 使用
- 資源調度器
- 先進先出調度器(FIFO)
- 容量調度器(Capacity Scheduler)
- 公平調度器(Fair Scheduler)
- Yarn 常用命令
- Zookeeper
- zookeeper是什么?
- zookeeper完全分布式搭建
- Zookeeper特點
- Zookeeper數據結構
- Zookeeper 內部原理
- 選舉機制
- stat 信息中字段解釋
- 選擇機制中的概念
- 選舉消息內容
- 監聽器原理
- Hadoop 高可用集群搭建
- Zookeeper 應用
- Zookeeper Shell操作
- Zookeeper Java應用
- Hive
- Hive是什么?
- Hive的優缺點
- Hive架構
- Hive元數據存儲模式
- 內嵌模式
- 本地模式
- 遠程模式
- Hive環境搭建
- 偽分布式環境搭建
- Hive命令工具
- 命令行模式
- 交互模式
- Hive數據類型
- Hive數據結構
- 參數配置方式
- Hive數據庫
- 數據庫存儲位置
- 數據庫操作
- 表的創建
- 建表基本語法
- 內部表
- 外部表
- 臨時表
- 建表高階語句
- 表的刪除與修改
- 分區表
- 靜態分區
- 動態分區
- 分桶表
- 創建分桶表
- 分桶抽樣
- Hive視圖
- 視圖的創建
- 側視圖Lateral View
- Hive數據導入導出
- 導入數據
- 導出數據
- 查詢表數據量
- Hive事務
- 事務是什么?
- Hive事務的局限性和特點
- Hive事務的開啟和設置
- Hive PLSQL
- Hive高階查詢
- 查詢基本語法
- 基本查詢
- distinct去重
- where語句
- 列正則表達式
- 虛擬列
- CTE查詢
- 嵌套查詢
- join語句
- 內連接
- 左連接
- 右連接
- 全連接
- 多表連接
- 笛卡爾積
- left semi join
- group by分組
- having刷選
- union與union all
- 排序
- order by
- sort by
- distribute by
- cluster by
- 聚合運算
- 基本聚合
- 高級聚合
- 窗口函數
- 序列窗口函數
- 聚合窗口函數
- 分析窗口函數
- 窗口函數練習
- 窗口子句
- Hive函數
- Hive函數分類
- 字符串函數
- 類型轉換函數
- 數學函數
- 日期函數
- 集合函數
- 條件函數
- 聚合函數
- 表生成函數
- 自定義Hive函數
- 自定義函數分類
- 自定義Hive函數流程
- 添加JAR包的方式
- 自定義臨時函數
- 自定義永久函數
- Hive優化
- Hive性能調優工具
- EXPLAIN
- ANALYZE
- Fetch抓取
- 本地模式
- 表的優化
- 小表 join 大表
- 大表 join 大表
- 開啟Map Join
- group by
- count(distinct)
- 笛卡爾積
- 行列過濾
- 動態分區調整
- 分區分桶表
- 數據傾斜
- 數據傾斜原因
- 調整Map數
- 調整Reduce數
- 產生數據傾斜的場景
- 并行執行
- 嚴格模式
- JVM重用
- 推測執行
- 啟用CBO
- 啟動矢量化
- 使用Tez引擎
- 壓縮算法和文件格式
- 文件格式
- 壓縮算法
- Zeppelin
- Zeppelin是什么?
- Zeppelin安裝
- 配置Hive解釋器
- Hbase
- Hbase是什么?
- Hbase環境搭建
- Hbase分布式環境搭建
- Hbase偽分布式環境搭建
- Hbase架構
- Hbase架構組件
- Hbase數據存儲結構
- Hbase原理
- Hbase Shell
- 基本操作
- 表操作
- namespace
- Hbase Java Api
- Phoenix集成Hbase
- Phoenix是什么?
- 安裝Phoenix
- Phoenix數據類型
- Phoenix Shell
- HBase與Hive集成
- HBase與Hive的對比
- HBase與Hive集成使用
- Hbase與Hive集成原理
- HBase優化
- RowKey設計
- 內存優化
- 基礎優化
- Hbase管理
- 權限管理
- Region管理
- Region的自動拆分
- Region的預拆分
- 到底采用哪種拆分策略?
- Region的合并
- HFile的合并
- 為什么要有HFile的合并
- HFile合并方式
- Compaction執行時間
- Compaction相關控制參數
- 演示示例
- Sqoop
- Sqoop是什么?
- Sqoop環境搭建
- RDBMS導入到HDFS
- RDBMS導入到Hive
- RDBMS導入到Hbase
- HDFS導出到RDBMS
- 使用sqoop腳本
- Sqoop常用命令
- Hadoop數據模型
- TextFile
- SequenceFile
- Avro
- Parquet
- RC&ORC
- 文件存儲格式比較
- Spark
- Spark是什么?
- Spark優勢
- Spark與MapReduce比較
- Spark技術棧
- Spark安裝
- Spark Shell
- Spark架構
- Spark編程入口
- 編程入口API
- SparkContext
- SparkSession
- Spark的maven依賴
- Spark RDD編程
- Spark核心數據結構-RDD
- RDD 概念
- RDD 特性
- RDD編程
- RDD編程流程
- pom依賴
- 創建算子
- 轉換算子
- 動作算子
- 持久化算子
- RDD 與閉包
- csv/json數據源
- Spark分布式計算原理
- RDD依賴
- RDD轉換
- RDD依賴
- DAG工作原理
- Spark Shuffle原理
- Shuffle的作用
- ShuffleManager組件
- Shuffle實踐
- RDD持久化
- 緩存機制
- 檢查點
- 檢查點與緩存的區別
- RDD共享變量
- 廣播變量
- 累計器
- RDD分區設計
- 數據傾斜
- 數據傾斜的根本原因
- 定位導致的數據傾斜
- 常見數據傾斜解決方案
- Spark SQL
- SQL on Hadoop
- Spark SQL是什么
- Spark SQL特點
- Spark SQL架構
- Spark SQL運行原理
- Spark SQL編程
- Spark SQL編程入口
- 創建Dataset
- Dataset是什么
- SparkSession創建Dataset
- 樣例類創建Dataset
- 創建DataFrame
- DataFrame是什么
- 結構化數據文件創建DataFrame
- RDD創建DataFrame
- Hive表創建DataFrame
- JDBC創建DataFrame
- SparkSession創建
- RDD、DataFrame、Dataset
- 三者對比
- 三者相互轉換
- RDD轉換為DataFrame
- DataFrame轉換為RDD
- DataFrame API
- DataFrame API分類
- Action 操作
- 基礎 Dataset 函數
- 強類型轉換
- 弱類型轉換
- Spark SQL外部數據源
- Parquet文件
- Hive表
- RDBMS表
- JSON/CSV
- Spark SQL函數
- Spark SQL內置函數
- 自定SparkSQL函數
- Spark SQL CLI
- Spark SQL性能優化
- Spark GraphX圖形數據分析
- 為什么需要圖計算
- 圖的概念
- 圖的術語
- 圖的經典表示法
- Spark Graphix簡介
- Graphx核心抽象
- Graphx Scala API
- 核心組件
- 屬性圖應用示例1
- 屬性圖應用示例2
- 查看圖信息
- 圖的算子
- 連通分量
- PageRank算法
- Pregel分布式計算框架
- Flume日志收集
- Flume是什么?
- Flume官方文檔
- Flume架構
- Flume安裝
- Flume使用過程
- Flume組件
- Flume工作流程
- Flume事務
- Source、Channel、Sink文檔
- Source文檔
- Channel文檔
- Sink文檔
- Flume攔截器
- Flume攔截器概念
- 配置攔截器
- 自定義攔截器
- Flume可靠性保證
- 故障轉移
- 負載均衡
- 多層代理
- 多路復用
- Kafka
- 消息中間件MQ
- Kafka是什么?
- Kafka安裝
- Kafka本地單機部署
- Kafka基本命令使用
- Topic的生產與消費
- 基本命令
- 查看kafka目錄
- Kafka架構
- Kafka Topic
- Kafka Producer
- Kafka Consumer
- Kafka Partition
- Kafka Message
- Kafka Broker
- 存儲策略
- ZooKeeper在Kafka中的作用
- 副本同步
- 容災
- 高吞吐
- Leader均衡機制
- Kafka Scala API
- Producer API
- Consumer API
- Kafka優化
- 消費者參數優化
- 生產者參數優化
- Spark Streaming
- 什么是流?
- 批處理和流處理
- Spark Streaming簡介
- 流數據處理架構
- 內部工作流程
- StreamingContext組件
- SparkStreaming的編程入口
- WordCount案例
- DStream
- DStream是什么?
- Input DStream與Receivers接收器
- DStream API
- 轉換操作
- 輸出操作
- 數據源
- 數據源分類
- Socket數據源
- 統計HDFS文件的詞頻
- 處理狀態數據
- SparkStreaming整合SparkSQL
- SparkStreaming整合Flume
- SparkStreaming整合Kafka
- 自定義數據源
- Spark Streaming優化策略
- 優化運行時間
- 優化內存使用
- 數據倉庫
- 數據倉庫是什么?
- 數據倉庫的意義
- 數據倉庫和數據庫的區別
- OLTP和OLAP的區別
- OLTP的特點
- OLAP的特點
- OLTP與OLAP對比
- 數據倉庫架構
- Inmon架構
- Kimball架構
- 混合型架構
- 數據倉庫的解決方案
- 數據ETL
- 數據倉庫建模流程
- 維度模型
- 星型模式
- 雪花模型
- 星座模型
- 數據ETL處理
- 數倉分層術語
- 數據抽取方式
- CDC抽取方案
- 數據轉換
- 常見的ETL工具