### 3.4 Spark SQL 應用
#### 3.4.1 創建 DataFrame/DataSet
**方式一:讀取本地文件**
**① 在本地創建一個文件,有 id、name、age 三列,用空格分隔,然后上傳到 hdfs 上。**
```
vim /root/person.txt
```
內容如下:
```
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
```
**② 打開 spark-shell**
```
spark/bin/spark-shell
##創建 RDD
val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
```
**③ 定義 case class(相當于表的 schema)**
```
case class Person(id:Int, name:String, age:Int)
```
**④ 將 RDD 和 case class 關聯**
```
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
```
**⑤ 將 RDD 轉換成 DataFrame**
```
val personDF = personRDD.toDF //DataFrame
```
**⑥ 查看數據和 schema**
```
personDF.show
```
**⑦ 注冊表**
```
personDF.createOrReplaceTempView("t_person")
```
**⑧ 執行 SQL**
```
spark.sql("select id,name from t_person where id > 3").show
```
**⑨ 也可以通過 SparkSession 構建 DataFrame**
```
val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接讀取的文本文件沒有完整schema信息
dataFrame.printSchema
```
**方式二:讀取 json 文件**
```
val jsonDF= spark.read.json("file:///resources/people.json")
```
接下來就可以使用 `DataFrame` 的函數操作
```
jsonDF.show
```
注意:直接讀取 `json` 文件有`schema` 信息,因為`json`文件本身含有`Schema`信息,`SparkSQL` 可以自動解析。
**方式三:讀取 parquet 文件**
```
val parquetDF=spark.read.parquet("file:///resources/users.parquet")
```
接下來就可以使用 `DataFrame` 的函數操作
```
parquetDF.show
```
注意:直接讀取 `parquet` 文件有 `schema` 信息,因為 `parquet` 文件中保存了列的信息。
#### 3.4.2 兩種查詢風格:DSL 和 SQL
DSL 風格示例:
```
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
```
SQL 風格示例:
```
spark.sql("select * from t_person").show
```
**總結**:
- `DataFrame` 和 `DataSet` 都可以通過`RDD`來進行創建;
- 也可以通過讀取普通文本創建–注意: 直接讀取沒有完整的約束,需要通過 `RDD`+`Schema`;
- 通過 `josn/parquet` 會有完整的約束;
- 不管是 `DataFrame` 還是 `DataSet` 都可以注冊成表,之后就可以使用 `SQL` 進行查詢了! 也可以使用 `DSL`!
#### 3.4.3 Spark SQL 面向多種數據源
讀取 json 文件:
```
spark.read.json("D:\\data\\output\\json").show()
```
讀取 csv 文件:
```
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
```
讀取 parquet 文件:
```
spark.read.parquet("D:\\data\\output\\parquet").show()
```
讀取 mysql 表:
```
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
```
寫入 json 文件:
```
personDF.write.json("D:\\data\\output\\json")
```
寫入 csv 文件:
```
personDF.write.csv("D:\\data\\output\\csv")
```
寫入 parquet 文件:
```
personDF.write.parquet("D:\\data\\output\\parquet")
```
寫入 mysql 表:
```
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
```
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化