# RDDs
Spark支持兩種方法將存在的RDDs轉換為SchemaRDDs(DataFrame/DataSet)。
* 第一種方法使用反射來推斷包含特定對象類型的RDD的模式(schema)。在你寫spark程序的同時,當你已經知道了模式,這種基于反射的
方法可以使代碼更簡潔并且程序工作得更好。
* 創建SchemaRDDs的第二種方法是通過一個編程接口來實現,這個接口允許你構造一個模式,然后在存在的RDDs上使用它。雖然這種方法更冗長,但是它允許你在運行期之前不知道列以及列的類型的情況下構造SchemaRDDs。
## 一、利用反射推斷模式
Spark SQL的Scala接口支持將包含樣本類的RDDs自動轉換為SchemaRDD。這個樣本類定義了表的模式。
給樣本類的參數名字通過反射來讀取,然后作為列的名字。樣本類可以嵌套或者包含復雜的類型如序列或者數組。這個RDD可以隱式轉化為一個SchemaRDD,然后注冊為一個表。表可以在后續的
sql語句中使用。
```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```
## 二、編程指定模式
當樣本類不能提前確定(例如,記錄的結構是經過編碼的字符串,或者一個文本集合將會被解析,不同的字段投影給不同的用戶),一個SchemaRDD可以通過三步來創建。
- 從原來的RDD創建一個行的RDD
- 創建由一個`StructType`表示的模式與第一步創建的RDD的行結構相匹配
- 在行RDD上通過`applySchema`方法應用模式
```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)
```
```
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 1.定義每個列的列類型
val fields = Array(StructField("deptno", LongType, nullable = true),
StructField("dname", StringType, nullable = true),
StructField("loc", StringType, nullable = true))
// 2.創建 schema
val schema = StructType(fields)
// 3.創建 RDD
val deptRDD = spark.sparkContext.textFile("/usr/file/dept.txt")
val rowRDD = deptRDD.map(_.split("\t")).map(line => Row(line(0).toLong, line(1), line(2)))
// 4.將 RDD 轉換為 dataFrame
val deptDF = spark.createDataFrame(rowRDD, schema)
deptDF.show()
```
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化