## 一、常用的轉換算子
需要操作的Transformation算子說明如下:
### 1.1. map
map(func)返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps1(sc)
sc.stop()
}
/**
* 1、map:將集合中每個元素乘以7
* map(func):返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成
*/
def transformationOps1(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.map(num => num * 7)
retRDD.foreach(num => println(num))
}
}
```
輸出結果
```
42
7
49
14
56
21
63
28
70
35
```
### 1.2. filter
filter(func)返回一個新的數據集,由經過func函數后返回值為true的原元素組成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps2(sc)
sc.stop()
}
/**
* 2、filter:過濾出集合中的奇數
* filter(func): 返回一個新的數據集,由經過func函數后返回值為true的原元素組成
*
* 一般在filter操作之后都要做重新分區(因為可能數據量減少了很多)
*/
def transformationOps2(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.filter(num => num % 2 == 0)
retRDD.foreach(println)
}
}
```
輸出結果
```
6
2
8
4
10
```
### 1.3.flatMap
flatMap(func)類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps3(sc)
sc.stop()
}
/**
* 3、flatMap:將行拆分為單詞
* flatMap(func):類似于map,但是每一個輸入元素,
* 會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)
*/
def transformationOps3(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
wordsRDD.foreach(println)
}
}
```
輸出結果
```
hello
hello
he
you
hello
mes
```
### 1.4. sample
sample(withReplacement, frac, seed)根據給定的隨機種子seed,隨機抽樣出數量為frac的數據.
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps4(sc)
sc.stop()
}
/**
* 4、sample:根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
* sample(withReplacement, frac, seed): 根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
* 抽樣的目的:就是以樣本評估整體
* withReplacement:
* true:有放回的抽樣
* false:無放回的抽樣
* frac:就是樣本空間的大小,以百分比小數的形式出現,比如20%,就是0.2
*
* 使用sample算子計算出來的結果可能不是很準確,1000個數,20%,樣本數量在200個左右,不一定為200
*
* 一般情況下,使用sample算子在做spark優化(數據傾斜)的方面應用最廣泛
*/
def transformationOps4(sc:SparkContext): Unit = {
val list = 1 to 1000
val listRDD = sc.parallelize(list)
val sampleRDD = listRDD.sample(false, 0.2)
sampleRDD.foreach(num => print(num + " "))
println
println("sampleRDD count: " + sampleRDD.count())
println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
}
}
```
輸出結果
```
sampleRDD count: 219
Another sampleRDD count: 203
```
### 1.5.union
union(otherDataset)返回一個新的數據集,由原數據集和參數聯合而成
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps5(sc)
sc.stop()
}
/**
* 5、union:返回一個新的數據集,由原數據集和參數聯合而成
* union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成
* 類似數學中的并集,就是sql中的union操作,將兩個集合的所有元素整合在一塊,包括重復元素
*/
def transformationOps5(sc:SparkContext): Unit = {
val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val list2 = List(7, 8, 9, 10, 11, 12)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
}
}
```
輸出結果
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps5(sc)
sc.stop()
}
/**
* 5、union:返回一個新的數據集,由原數據集和參數聯合而成
* union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成
* 類似數學中的并集,就是sql中的union操作,將兩個集合的所有元素整合在一塊,包括重復元素
*/
def transformationOps5(sc:SparkContext): Unit = {
val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val list2 = List(7, 8, 9, 10, 11, 12)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
}
}
```
輸出結果
```
1
6
2
7
3
8
4
9
5
10
7
8
9
10
11
12
```
### 1.6. groupByKey
groupByKey([numTasks])在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps6(sc)
sc.stop()
}
/**
* 6、groupByKey:對數組進行 group by key操作
* groupByKey([numTasks]): 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。
* 注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task
* mr中:
* <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3>
* groupByKey類似于shuffle操作
*
* 和reduceByKey有點類似,但是有區別,reduceByKey有本地的規約,而groupByKey沒有本地規約,所以一般情況下,
* 盡量慎用groupByKey,如果一定要用的話,可以自定義一個groupByKey,在自定義的gbk中添加本地預聚合操作
*/
def transformationOps6(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
pairsRDD.foreach(println)
val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey()
println("=============================================")
gbkRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
```
輸出結果
```
(hello,1)
(hello,1)
(you,1)
(he,1)
(hello,1)
(me,1)
=============================================
you...CompactBuffer(1)
hello...CompactBuffer(1, 1, 1)
he...CompactBuffer(1)
me...CompactBuffer(1)
```
### 1.7. reduceByKey
reduceByKey(func, [numTasks])在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* 7、reduceByKey:統計每個班級的人數
* reduceByKey(func, [numTasks]): 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,
* key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
*
* 需要注意的是還有一個reduce的操作,其為action算子,并且其返回的結果只有一個,而不是一個數據集
* 而reduceByKey是一個transformation算子,其返回的結果是一個數據集
*/
def transformationOps7(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
```
輸出結果如下:
```
you...1
hello...3
he...1
me...1
```
### 1.8. join雙流融合
join(otherDataset, [numTasks])在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集
```
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps8(sc)
sc.stop()
}
/**
* 8、join:打印關聯的組合信息
* join(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集
* 學生基礎信息表和學生考試成績表
* stu_info(sid ,name, birthday, class)
* stu_score(sid, chinese, english, math)
*
* * Serialization stack:
- object not serializable
這種分布式計算的過程,一個非常重要的點,傳遞的數據必須要序列化
通過代碼測試,該join是等值連接(inner join)
A.leftOuterJoin(B)
A表所有的數據都包涵,B表中在A表沒有關聯的數據,顯示為null
之后執行一次filter就是join的結果
*/
def transformationOps8(sc: SparkContext): Unit = {
val infoList = List(
"1,鐘 瀟,1988-02-04,bigdata",
"2,劉向前,1989-03-24,linux",
"3,包維寧,1984-06-16,oracle")
val scoreList = List(
"1,50,21,61",
"2,60,60,61",
"3,62,90,81",
"4,72,80,81"
)
val infoRDD:RDD[String] = sc.parallelize(infoList)
val scoreRDD:RDD[String] = sc.parallelize(scoreList)
val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => {
val fields = line.split(",")
val student = new Student(fields(0), fields(1), fields(2), fields(3))
(fields(0), student)
})
val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => {
val fields = line.split(",")
val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat)
(fields(0), score)
})
val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD)
joinedRDD.foreach(t => {
val sid = t._1
val student = t._2._1
val score = t._2._2
println(sid + "\t" + student + "\t" + score)
})
println("=========================================")
val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD)
leftOuterRDD.foreach(println)
}
}
```
輸出結果如下:
```
3 3 包維寧 1984-06-16 oracle 3 62.0 90.0 81.0
2 2 劉向前 1989-03-24 linux 2 60.0 60.0 61.0
1 1 鐘 瀟 1988-02-04 bigdata 1 50.0 21.0 61.0
=========================================
(4,(4 72.0 80.0 81.0,None))
(3,(3 62.0 90.0 81.0,Some(3 包維寧 1984-06-16 oracle)))
(2,(2 60.0 60.0 61.0,Some(2 劉向前 1989-03-24 linux)))
(1,(1 50.0 21.0 61.0,Some(1 鐘 瀟 1988-02-04 bigdata)))
```
### 1.9.sortByKey
測試代碼如下:
~~~
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* sortByKey:將學生身高進行(降序)排序
* 身高相等,按照年齡排(升序)
*/
def transformationOps9(sc: SparkContext): Unit = {
val list = List(
"1,李 磊,22,175",
"2,劉銀鵬,23,175",
"3,齊彥鵬,22,180",
"4,楊 柳,22,168",
"5,敦 鵬,20,175"
)
val listRDD:RDD[String] = sc.parallelize(list)
/* // 使用sortBy操作完成排序
val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
override def compare(x: String, y: String): Int = {
val xFields = x.split(",")
val yFields = y.split(",")
val xHgiht = xFields(3).toFloat
val yHgiht = yFields(3).toFloat
val xAge = xFields(2).toFloat
val yAge = yFields(2).toFloat
var ret = yHgiht.compareTo(xHgiht)
if (ret == 0) {
ret = xAge.compareTo(yAge)
}
ret
}
} ,ClassTag.Object.asInstanceOf[ClassTag[String]])
*/
// 使用sortByKey完成操作,只做身高降序排序
val heightRDD:RDD[(String, String)] = listRDD.map(line => {
val fields = line.split(",")
(fields(3), line)
})
val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要設置1個分區,否則只是各分區內有序
retRDD.foreach(println)
// 使用sortByKey如何實現sortBy的二次排序?將上面的信息寫成一個java對象,然后重寫compareTo方法,在做map時,key就為該對象本身,而value可以為null
}
}
~~~
輸出結果如下:
~~~
(180,3,齊彥鵬,22,180)
(175,1,李 磊,22,175)
(175,2,劉銀鵬,23,175)
(175,5,敦 鵬,20,175)
(168,4,楊 柳,22,168)
~~~
下面是一個快速入門的 demo:
~~~
scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:21
scala> rdd.sortByKey(true, 1).foreach(println)
(1,one)
(2,two)
(3,three)
~~~
### 1.10.combineByKey 與 aggregateByKey
下面的代碼分別使用 combineByKey 和 aggregateByKey 來模擬 groupByKey 和 reduceBykey,所以是有 4 個操作,只要把 combineByKey 模擬 groupByKey 的例子掌握了,其它三個相對就容易許多了。
~~~
/**
* spark的transformation操作:
* aggregateByKey
* combineByKey
*
* 使用combineByKey和aggregateByKey模擬groupByKey和reduceByKey
*
* 通過查看源碼,我們發現aggregateByKey底層,還是combineByKey
*
* 問題:combineByKey和aggregateByKey的區別?
* aggregateByKey是柯里化形式的,目前底層源碼還沒時間去分析,所知道的區別是這個
*/
object _03SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
// combineByKey2GroupByKey(sc)
// combineByKey2ReduceByKey(sc)
// aggregateByKey2ReduceByKey(sc)
aggregateByKey2GroupByKey(sc)
sc.stop()
}
/**
* 使用aggregateByKey模擬groupByKey
*/
def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 這里需要指定value的類型為ArrayBuffer[Int]()
(part, num) => {
part.append(num)
part
},
(part1, part2) => {
part1.++=(part2)
part1
}
)
retRDD.foreach(println)
}
/**
* 使用aggregateByKey模擬reduceByKey
* def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
(zeroValue: U)就對應的是combineByKey中的第一個函數的返回值
seqOp 就對應的是combineByKey中的第二個函數,也就是mergeValue
combOp 就對應的是combineByKey中的第三個函數,也就是mergeCombiners
*/
def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) (
(partNum, num) => partNum + num, // 也就是mergeValue
(partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners
)
retRDD.foreach(println)
}
/**
* 使用reduceByKey模擬groupByKey
*/
def combineByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
/**
* 對于createCombiner1 mergeValue1 mergeCombiners1
* 代碼的參數已經體現得很清楚了,其實只要理解了combineByKey模擬groupByKey的例子,這個就非常容易了
*/
var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)
retRDD.foreach(println)
}
/**
* reduceByKey操作,value就是該數值本身,則上面的數據會產生:
* (hello, 1) (bo, 1) (bo, 1)
* (zhou, 1) (xin, 1) (xin, 1)
* (hello, 1) (song, 1) (bo, 1)
* 注意有別于groupByKey的操作,它是創建一個容器
*/
def createCombiner1(num:Int):Int = {
num
}
/**
* 同一partition內,對于有相同key的,這里的mergeValue直接將其value相加
* 注意有別于groupByKey的操作,它是添加到value到一個容器中
*/
def mergeValue1(localNum1:Int, localNum2:Int): Int = {
localNum1 + localNum2
}
/**
* 將兩個不同partition中的key相同的value值相加起來
* 注意有別于groupByKey的操作,它是合并兩個容器
*/
def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = {
thisPartitionNum1 + anotherPartitionNum2
}
/**
* 使用combineByKey模擬groupByKey
*/
def combineByKey2GroupByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
// 輸出每個partition中的map對
pairsRDD.foreachPartition( partition => {
println("<=========partition-start=========>")
partition.foreach(println)
println("<=========partition-end=========>")
})
val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
gbkRDD.foreach(println)
// 如果要測試最后groupByKey的結果是在幾個分區,可以使用下面的代碼進行測試
/*gbkRDD.foreachPartition(partition => {
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
partition.foreach(println)
})*/
}
/**
* 初始化,將value轉變成為標準的格式數據
* 是在每個分區中進行的操作,去重后的key有幾個,就調用次,
* 因為對于每個key,其容器創建一次就ok了,之后有key相同的,只需要執行mergeValue到已經創建的容器中即可
*/
def createCombiner(num:Int):ArrayBuffer[Int] = {
println("----------createCombiner----------")
ArrayBuffer[Int](num)
}
/**
* 將key相同的value,添加到createCombiner函數創建的ArrayBuffer容器中
* 一個分區內的聚合操作,將一個分區內key相同的數據,合并
*/
def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = {
println("----------mergeValue----------")
ab.append(num)
ab
}
/**
* 將key相同的多個value數組,進行整合
* 分區間的合并操作
*/
def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = {
println("----------mergeCombiners----------")
ab1 ++= ab2
ab1
}
}
~~~
輸出結果如下:
~~~
/*
combineByKey模擬groupByKey的一個輸出效果,可以很好地說明createCombiner、mergeValue和mergeCombiners各個階段的執行時機:
<=========partition-start=========>
<=========partition-start=========>
(hello,1)
(zhou,1)
(bo,1)
(xin,1)
(bo,1)
(xin,1)
<=========partition-end=========>
(hello,1)
(song,1)
(bo,1)
<=========partition-end=========>
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeValue----------
----------mergeValue----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeCombiners----------
----------mergeCombiners----------
(song,ArrayBuffer(1))
(hello,ArrayBuffer(1, 1))
(bo,ArrayBuffer(1, 1, 1))
(zhou,ArrayBuffer(1))
(xin,ArrayBuffer(1, 1))
*/
~~~
## 二、Transformations
下面的表格列了 Spark 支持的一些常用 transformations。詳細內容請參閱 RDD API 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。
**Transformation 轉換算子**:
| 轉換算子 | 含義 |
| ---------------------------------------------------- | ------------------------------------------------------------ |
| **map**(func) | 返回一個新的 RDD,該 RDD 由每一個輸入元素經過 func 函數轉換后組成 |
| **filter**(func) | 返回一個新的 RDD,該 RDD 由經過 func 函數計算后返回值為 true 的輸入元素組成 |
| **flatMap**(func) | 類似于 map,但是每一個輸入元素可以被映射為 0 或多個輸出元素 (所以 func 應該返回一個序列,而不是單一元素) |
| **mapPartitions**(func) | 類似于 map,但獨立地在 RDD 的每一個分片上運行,因此在類型為 T 的 RDD 上運行時,func 的函數類型必須是 Iterator[T] => Iterator[U] |
| **mapPartitionsWithIndex**(func) | 類似于 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,因此在類型為 T 的 RDD 上運行時,func 的函數類型必須是 (Int, Interator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) | 根據 fraction 指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed 用于指定隨機數生成器種子 |
| **union**(otherDataset) | 對源 RDD 和參數 RDD 求并集后返回一個新的 RDD |
| intersection(otherDataset) | 對源 RDD 和參數 RDD 求交集后返回一個新的 RDD |
| **distinct**([numTasks])) | 對源 RDD 進行去重后返回一個新的 RDD |
| **groupByKey**([numTasks]) | 在一個 (K,V) 的 RDD 上調用,返回一個 (K, Iterator[V]) 的 RDD |
| **reduceByKey**(func, [numTasks]) | 在一個 (K,V) 的 RDD 上調用,返回一個 (K,V) 的 RDD,使用指定的 reduce 函數,將相同 key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數可以通過第二個可選的參數來設置 |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 對 PairRDD 中相同的 Key 值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和 aggregate 函數類似,aggregateByKey 返回值的類型不需要和 RDD 中 value 的類型一致 |
| **sortByKey**([ascending], [numTasks]) | 在一個 (K,V) 的 RDD 上調用,K 必須實現 Ordered 接口,返回一個按照 key 進行排序的 (K,V) 的 RDD |
| sortBy(func,[ascending], [numTasks]) | 與 sortByKey 類似,但是更靈活 |
| **join**(otherDataset, [numTasks]) | 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素對在一起的 (K,(V,W)) 的 RDD |
| cogroup(otherDataset, [numTasks]) | 在類型為 (K,V) 和(K,W)的 RDD 上調用,返回一個 (K,(Iterable,Iterable)) 類型的 RDD |
| cartesian(otherDataset) | 笛卡爾積 |
| pipe(command, [envVars]) | 對 rdd 進行管道操作 |
| **coalesce**(numPartitions) | 減少 RDD 的分區數到指定值。在過濾大量數據之后,可以執行此操作 |
| **repartition**(numPartitions) | 重新給 RDD 分區 |
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化