********前面 Transformationt 算子的測試都是在本地開發環境中直接跑代碼,這里 Actions 算子的測試主要在 spark-shell 中進行操作。需要說明的 Actions 算子如下:
#### 下面來具體說明:
**(1)reduce**
通過函數 func 聚集數據集中的所有元素。Func 函數接受 2 個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的并發執行。
關于 reduce 的執行過程,可以對比 scala 中類似的 reduce 函數。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29
scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)
...
ret: Int = 21
~~~
注意:需要注意的是,不同于 Transformation 算子,其結果仍然是 RDD,但是`執行Actions算子之后,其結果不再是RDD,而是一個標量。`
**(2)collect**
在 Driver 的程序中,以數組的形式,返回數據集的所有元素。**這通常會在使用 filter 或者其它操作后**,返回一個足夠小的數據子集再使用,直接將整個 RDD 集 Collect 返回,很可能會讓 Driver 程序 OOM,這點尤其需要注意。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
scala> val ret = listRDD.collect()
...
ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)
~~~
**(3)count**
返回數據集的元素個數
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29
scala> val ret = listRDD.count()
...
ret: Long = 6
~~~
**(4)take**
返回一個數組,由數據集的前 n 個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是 Driver 程序所在機器,單機計算所有的元素 (Gateway 的內存壓力會增大,需要謹慎使用)。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29
scala> listRDD.take(3)
...
res7: Array[Int] = Array(1, 2, 3)
~~~
**(5)first**
返回數據集的第一個元素(類似于 take(1))
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29
scala> listRDD.first()
...
res8: Int = 1
~~~
**(6)saveAsTextFile**
將數據集的元素,以 textfile 的形式,保存到本地文件系統,hdfs 或者任何其它 hadoop 支持的文件系統。Spark 將會調用每個元素的 toString 方法,并將它轉換為文件中的`一行文本`。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29
scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")
...
~~~
可以在文件系統中查看到保存的文件:
~~~
[root@WGH action]$ pwd
/home/uplooking/data/spark/action
[root@WGH action]$ ls
part-00000 part-00001 part-00002 part-00003 _SUCCESS
~~~
其實可以看到,保存的跟 Hadoop 的格式是一樣的。
當然因為我的 spark 集群中已經做了跟 hadoop 相關的配置,所以也可以把文件保存到 hdfs 中:
~~~
scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")
...
~~~
然后就可以在 hdfs 中查看到保存的文件:
~~~
[root@WGH action]$ hdfs dfs -ls /output/spark/action
18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 3 root supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS
-rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000
-rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001
-rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002
-rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003
~~~
可以看到,保存的格式跟保存到本地文件系統是一樣的。
**(7)foreach**
在數據集的每一個元素上,運行函數 func。這通常用于更新一個累加器變量,或者和外部存儲系統做交互。
~~~
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29
scala> listRDD.foreach(println)
...
~~~
**(8)saveAsNewAPIHadoopFile**
也就是將數據保存到 Hadoop HDFS 中,但是需要注意的是,前面使用 saveAsTextFile 也可以進行相關操作,其使用的就是 saveAsNewAPIHadoopFile 或者 saveAsHadoopFile 這兩個 API,而其兩者的區別是:
saveAsHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapred 中的早期的類
saveAsNewAPIHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapreduce 中的新的類。但不管使用哪一個,都是可以完成工作的。
測試代碼如下:
~~~
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark算子操作之Action
* saveAsNewAPIHAdoopFile
* * saveAsHadoopFile
* 和saveAsNewAPIHadoopFile的唯一區別就在于OutputFormat的不同
* saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類
* saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類
* 使用哪一個都可以完成工作
*
* 前面在使用saveAsTextFile時也可以保存到hadoop文件系統中,注意其源代碼也是使用上面的操作的
*
* Caused by: java.net.UnknownHostException: ns1
... 35 more
找不到ns1,因為我們在本地沒有配置,無法正常解析,就需要將hadoop的配置文件信息給我們加載進來
hdfs-site.xml.heihei,core-site.xml.heihei
*/
object _05SparkActionOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val pairsRDD = listRDD.map(word => (word, 1))
val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.saveAsNewAPIHadoopFile(
"hdfs://ns1/spark/action", // 保存的路徑
classOf[Text], // 相當于mr中的k3
classOf[IntWritable], // 相當于mr中的v3
classOf[TextOutputFormat[Text, IntWritable]] // 設置(k3, v3)的outputFormatClass
)
}
}
~~~
之后我們可以在 hdfs 中查看到相應的文件輸出:
~~~
[root@WGH ~]$ hdfs dfs -ls /spark/action
18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS
-rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000
-rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001
[root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00000
18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello 3
me 1
[root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00001
18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
you 1
he 1
~~~
## Actions
下面的表格列了 Spark 支持的一些常用 actions。詳細內容請參閱 RDD API 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。
**Action 動作算子**:
| 動作算子 | 含義 |
| --------------------------------------- | ------------------------------------------------------------ |
| reduce(func) | 通過 func 函數聚集 RDD 中的所有元素,這個功能必須是可交換且可并聯的 |
| collect() | 在驅動程序中,以數組的形式返回數據集的所有元素 |
| count() | 返回 RDD 的元素個數 |
| first() | 返回 RDD 的第一個元素 (類似于 take(1)) |
| take(n) | 返回一個由數據集的前 n 個元素組成的數組 |
| takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機采樣的 num 個元素組成,可以選擇是否用隨機數替換不足的部分,seed 用于指定隨機數生成器種子 |
| takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個元素 |
| **saveAsTextFile**(path) | 將數據集的元素以 textfile 的形式保存到 HDFS 文件系統或者其他支持的文件系統,對于每個元素,Spark 將會調用 toString 方法,將它裝換為文件中的文本 |
| **saveAsSequenceFile**(path) | 將數據集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,可以使 HDFS 或者其他 Hadoop 支持的文件系統 |
| saveAsObjectFile(path) | 將數據集的元素,以 Java 序列化的方式保存到指定的目錄下 |
| **countByKey**() | 針對 (K,V) 類型的 RDD,返回一個 (K,Int) 的 map,表示每一個 key 對應的元素個數 |
| foreach(func) | 在數據集的每一個元素上,運行函數 func 進行更新 |
| **foreachPartition**(func) | 在數據集的每一個分區上,運行函數 func |
**統計操作**:
| 算子 | 含義 |
| -------------- | -------------------------- |
| count | 個數 |
| mean | 均值 |
| sum | 求和 |
| max | 最大值 |
| min | 最小值 |
| variance | 方差 |
| sampleVariance | 從采樣中計算方差 |
| stdev | 標準差: 衡量數據的離散程度 |
| sampleStdev | 采樣的標準差 |
| stats | 查看統計結果 |
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化