## 外部數據集
Spark 可以從任何一個 Hadoop 支持的存儲源創建分布式數據集,包括
* 你的本地文件系統,HDFS,Cassandra,HBase,[Amazon S3](http://wiki.apache.org/hadoop/AmazonS3)等。
* Spark 支持文本文件(text files),[SequenceFiles](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html) 和其他 Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html)。
> 文本文件 RDDs 可以使用 SparkContext 的 `textFile` 方法創建。 在這個方法里傳入文件的 URI (機器上的本地路徑或 `hdfs://`,`s3n://` 等),然后它會將文件讀取成一個行集合。這里是一個調用例子:
```scala
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
```
一旦創建完成,`distFiile` 就能做數據集操作。例如,我們可以用下面的方式使用 `map` 和 `reduce` 操作將所有行的長度相加:`distFile.map(s => s.length).reduce((a, b) => a + b)`。
注意,Spark 讀文件時:
- 如果使用本地文件系統路徑,文件必須能在 work 節點上用相同的路徑訪問到。要么復制文件到所有的 workers,要么使用網絡的方式共享文件系統。
- 所有 Spark 的基于文件的方法,包括 `textFile`,能很好地支持文件目錄,壓縮過的文件和通配符。例如,你可以使用 `textFile("/my/文件目錄")`,`textFile("/my/文件目錄/*.txt")` 和 `textFile("/my/文件目錄/*.gz")`。
- `textFile` 方法也可以選擇第二個可選參數來控制切片(_slices_)的數目。默認情況下,Spark 為每一個文件塊(HDFS 默認文件塊大小是 64M)創建一個切片(_slice_)。但是你也可以通過一個更大的值來設置一個更高的切片數目。注意,你不能設置一個小于文件塊數目的切片值。
除了文本文件,Spark 的 Scala API 支持其他幾種數據格式:
- `SparkContext.wholeTextFiles` 讓你讀取一個包含多個小文本文件的文件目錄并且返回每一個(filename, content)對。與 `textFile` 的差異是:它記錄的是每個文件中的每一行。
- 對于 [SequenceFiles](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),可以使用 SparkContext 的 `sequenceFile[K, V]` 方法創建,K 和 V 分別對應的是 key 和 values 的類型。像 [IntWritable](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/IntWritable.html) 與 [Text](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Text.html) 一樣,它們必須是 Hadoop 的 [Writable](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Writable.html) 接口的子類。另外,對于幾種通用的 Writables,Spark 允許你指定原生類型來替代。例如: `sequenceFile[Int, String]` 將會自動讀取 IntWritables 和 Text。
- 對于其他的 Hadoop InputFormats,你可以使用 `SparkContext.hadoopRDD` 方法,它可以指定任意的 `JobConf`,輸入格式(InputFormat),key 類型,values 類型。你可以跟設置 Hadoop job 一樣的方法設置輸入源。你還可以在新的 MapReduce 接口(org.apache.hadoop.mapreduce)基礎上使用 `SparkContext.newAPIHadoopRDD`(譯者注:老的接口是 `SparkContext.newHadoopRDD`)。
- `RDD.saveAsObjectFile` 和 `SparkContext.objectFile` 支持保存一個RDD,保存格式是一個簡單的 Java 對象序列化格式。這是一種效率不高的專有格式,如 Avro,它提供了簡單的方法來保存任何一個 RDD。
## 創建方式舉例
**1. 由外部存儲系統的數據集創建,包括本地的文件系統,還有所有 `Hadoop` 支持的數據集,比如 `HDFS、Cassandra、HBase` 等:**
```
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
```
**2. 通過已有的 RDD 經過算子轉換生成新的 RDD:**
```
val rdd2=rdd1.flatMap(_.split(" "))
```
**3. 由一個已經存在的 Scala 集合創建:**
```
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
```
`makeRDD` 方法底層調用了 `parallelize` 方法:

**4. 還可以對接其他的中間件,比如:kafka**
參考: [大數據系列之Spark Streaming接入Kafka數據_pyspark.streaming ssc.sockettextstream 接受kafka-CSDN博客](https://blog.csdn.net/solihawk/article/details/116479840)
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化