<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                # Spark SQL, DataFrames and Datasets Guide * [Overview](#overview) * [SQL](#sql) * [Datasets and DataFrames](#datasets-and-dataframes) * [開始入門](#開始入門) * [起始點:SparkSession](#起始點-sparksession) * [創建 DataFrames](#創建-dataframes) * [無類型的 Dataset 操作(aka DataFrame 操作)](#無類型的dataset操作-aka-dataframe-操作) * [Running SQL Queries Programmatically](#running-sql-queries-programmatically) * [全局臨時視圖](#全局臨時視圖) * [創建 Datasets](#創建datasets) * [RDD 的互操作性](#rdd的互操作性) * [使用反射推斷 Schema](#使用反射推斷schema) * [以編程的方式指定 Schema](#以編程的方式指定schema) * [Aggregations](#aggregations) * [Untyped User-Defined Aggregate Functions](#untyped-user-defined-aggregate-functions) * [Type-Safe User-Defined Aggregate Functions](#type-safe-user-defined-aggregate-functions) * [Data Sources(數據源)](#data-sources-數據源) * [Generic Load/Save Functions(通用 加載/保存 功能)](#generic-loadsave-functions-通用-加載保存-功能) * [Manually Specifying Options(手動指定選項)](#manually-specifying-options-手動指定選項) * [Run SQL on files directly(直接在文件上運行 SQL)](#run-sql-on-files-directly-直接在文件上運行-sql) * [Save Modes(保存模式)](#save-modes-保存模式) * [Saving to Persistent Tables(保存到持久表)](#saving-to-persistent-tables-保存到持久表) * [Bucketing, Sorting and Partitioning(分桶,排序和分區)](#bucketing-sorting-and-partitioning-分桶-排序和分區) * [Parquet Files](#parquet-files) * [Loading Data Programmatically(以編程的方式加載數據)](#loading-data-programmatically-以編程的方式加載數據) * [Partition Discovery(分區發現)](#partition-discovery-分區發現) * [Schema Merging(模式合并)](#schema-merging-模式合并) * [Hive metastore Parquet table conversion(Hive metastore Parquet table 轉換)](#hive-metastore-parquet-table-conversion-hive-metastore-parquet-table-轉換) * [Hive/Parquet Schema Reconciliation](#hiveparquet-schema-reconciliation) * [Metadata Refreshing(元數據刷新)](#metadata-refreshing-元數據刷新) * [Configuration(配置)](#configuration-配置) * [JSON Datasets(JSON 數據集)](#json-datasets-json-數據集) * [Hive 表](#hive-表) * [指定 Hive 表的存儲格式](#指定-hive-表的存儲格式) * [與不同版本的 Hive Metastore 進行交互](#與不同版本的-hive-metastore-進行交互) * [JDBC 連接其它數據庫](#jdbc-連接其它數據庫) * [故障排除](#故障排除) * [性能調優](#性能調優) * [在內存中緩存數據](#在內存中緩存數據) * [其他配置選項](#其他配置選項) * [分布式 SQL 引擎](#分布式-sql-引擎) * [運行 Thrift JDBC/ODBC 服務器](#運行-thrift-jdbcodbc-服務器) * [運行 Spark SQL CLI](#運行-spark-sql-cli) * [遷移指南](#遷移指南) * [從 Spark SQL 2.1 升級到 2.2](#從-spark-sql-21-升級到-22) * [從 Spark SQL 2.0 升級到 2.1](#從-spark-sql-20-升級到-21) * [從 Spark SQL 1.6 升級到 2.0](#從-spark-sql-16-升級到-20) * [從 Spark SQL 1.5 升級到 1.6](#從-spark-sql-15-升級到-16) * [從 Spark SQL 1.4 升級到 1.5](#從-spark-sql-14-升級到-15) * [從 Spark SQL 1.3 升級到 1.4](#從-spark-sql-13-升級到-14) * [DataFrame data reader/writer interface](#dataframe-data-readerwriter-interface) * [DataFrame.groupBy 保留 grouping columns(分組的列)](#dataframegroupby-保留-grouping-columns分組的列) * [DataFrame.withColumn 上的行為更改](#dataframewithcolumn-上的行為更改) * [從 Spark SQL 1.0-1.2 升級到 1.3](#從-spark-sql-10-12-升級到-13) * [重命名 DataFrame 的 SchemaRDD](#重命名-dataframe-的-schemardd) * [Java 和 Scala APIs 的統一](#java-和-scala-apis-的統一) * [隔離隱式轉換和刪除 dsl 包(僅Scala)](#隔離隱式轉換和刪除-dsl-包僅scala) * [針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala)](#針對-datatype-刪除在-orgapachesparksql-包中的一些類型別名僅限于-scala) * [UDF 注冊遷移到 `sqlContext.udf` 中(Java & Scala)](#udf-注冊遷移到-sqlcontextudf-中-java--scala) * [Python DataTypes 不再是 Singletons(單例的)](#python-datatypes-不再是-singletons單例的) * [與 Apache Hive 的兼容](#與-apache-hive-的兼容) * [在現有的 Hive Warehouses 中部署](#在現有的-hive-warehouses-中部署) * [所支持的 Hive 特性](#所支持的-hive-特性) * [未支持的 Hive 函數](#未支持的-hive-函數) * [參考](#參考) * [數據類型](#數據類型) * [NaN Semantics](#nan-semantics) # Overview Spark SQL 是 Spark 處理結構化數據的一個模塊。與基礎的 Spark RDD API 不同,Spark SQL 提供了查詢結構化數據及計算結果等信息的接口。在內部,Spark SQL 使用這個額外的信息去執行額外的優化。有幾種方式可以跟 Spark SQL 進行交互,包括 SQL 和 Dataset API。當使用相同執行引擎進行計算時,無論使用哪種 API / 語言都可以快速的計算。這種統一意味著開發人員能夠在基于提供最自然的方式來表達一個給定的 transformation API 之間實現輕松的來回切換不同的。 該頁面所有例子使用的示例數據都包含在 Spark 的發布中,并且可以使用 `spark-shell`,`pyspark` shell,或者?`sparkR` shell來運行。 ## SQL Spark SQL 的功能之一是執行 SQL 查詢。Spark SQL 也能夠被用于從已存在的 Hive 環境中讀取數據。更多關于如何配置這個特性的信息,請參考 [Hive 表](#hive-tables) 這部分。當以另外的編程語言運行SQL 時,查詢結果將以 [Dataset/DataFrame](#datasets-and-dataframes)的形式返回。您也可以使用 [命令行](#running-the-spark-sql-cli)或者通過 [JDBC/ODBC](#running-the-thrift-jdbcodbc-server)與 SQL 接口交互。 ## Datasets and DataFrames 一個 Dataset 是一個分布式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的優點(強類型化,能夠使用強大的 lambda 函數)與Spark SQL執行引擎的優點。一個 Dataset 可以從 JVM 對象來 [構造](#creating-datasets) 并且使用轉換功能(map,flatMap,filter,等等)。Dataset API 在[Scala](api/scala/index.html#org.apache.spark.sql.Dataset) 和 [Java](api/java/index.html?org/apache/spark/sql/Dataset.html)是可用的。Python 不支持 Dataset API。但是由于 Python 的動態特性,許多 Dataset API 的優點已經可用了(也就是說,你可能通過 name 天生的`row.columnName`屬性訪問一行中的字段)。這種情況和 R 相似。 一個 DataFrame 是一個 _Dataset_ 組成的指定列。它的概念與一個在關系型數據庫或者在 R/Python 中的表是相等的,但是有很多優化。DataFrames 可以從大量的 [sources](#data-sources) 中構造出來,比如:結構化的文本文件,Hive中的表,外部數據庫,或者已經存在的 RDDs。DataFrame API 可以在 Scala,Java,[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame),和 [R](api/R/index.html)中實現。在 Scala 和 Java中,DataFrame 由 DataSet 中的 `RowS`(多個 Row)來表示。在 [the Scala API](api/scala/index.html#org.apache.spark.sql.Dataset)中,`DataFrame` 僅僅是一個 `Dataset[Row]`類型的別名。然而,在?[Java API](api/java/index.html?org/apache/spark/sql/Dataset.html)中,用戶需要去使用 `Dataset&lt;Row&gt;` 去代表一個 `DataFrame`。 在此文檔中,我們將常常會引用 Scala/Java Datasets 的 `Row`s 作為 DataFrames。 # 開始入門 ## 起始點:SparkSession Spark SQL中所有功能的入口點是 [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) 類。要創建一個 `SparkSession`,僅使用 `SparkSession.builder()`就可以了: ``` import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> Spark SQL中所有功能的入口點是 [`SparkSession`](api/java/index.html#org.apache.spark.sql.SparkSession) 類。要創建一個 `SparkSession`,僅使用 `SparkSession.builder()`就可以了: ``` import org.apache.spark.sql.SparkSession; SparkSession spark = SparkSession .builder() .appName("Java Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate(); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> Spark SQL中所有功能的入口點是 [`SparkSession`](api/python/pyspark.sql.html#pyspark.sql.SparkSession) 類。要穿件一個 `SparkSession`,僅使用 `SparkSession.builder`就可以了: ``` from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> Spark SQL中所有功能的入口點是 [`SparkSession`](api/R/sparkR.session.html) 類。要初始化一個基本的 `SparkSession`,僅調用 `sparkR.session()`即可: ``` sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value")) ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> 注意第一次調用時,`sparkR.session()` 初始化一個全局的 `SparkSession` 單實例,并且總是返回一個引用此實例,可以連續的調用。通過這種方式,用戶僅需要創建一次 `SparkSession`,然后像 `read.df` SparkR函數就能夠立即獲取全局的實例,用戶不需要再 `SparkSession` 之間進行實例的傳遞。 Spark 2.0 中的`SparkSession` 為 Hive 特性提供了內嵌的支持,包括使用 HiveQL 編寫查詢的能力,訪問 Hive UDF,以及從 Hive 表中讀取數據的能力。為了使用這些特性,你不需要去有一個已存在的 Hive 設置。 ## 創建 DataFrames 在一個 `SparkSession`中,應用程序可以從一個 [已經存在的 `RDD`](#interoperating-with-rdds),從hive表,或者從 [Spark數據源](#data-sources)中創建一個DataFrames。 舉個例子,下面就是基于一個JSON文件創建一個DataFrame: ``` val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> 在一個 `SparkSession`中,應用程序可以從一個 [已經存在的 `RDD`](#interoperating-with-rdds),從hive表,或者從 [Spark數據源](#data-sources)中創建一個DataFrames。 舉個例子,下面就是基于一個JSON文件創建一個DataFrame: ``` import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> 在一個 `SparkSession`中,應用程序可以從一個 [已經存在的 `RDD`](#interoperating-with-rdds),從hive表,或者從 [Spark數據源](#data-sources)中創建一個DataFrames。 舉個例子,下面就是基于一個JSON文件創建一個DataFrame: ``` # spark is an existing SparkSession df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> 在一個 `SparkSession`中,應用程序可以從一個本地的R frame 數據,從hive表,或者從[Spark數據源](#data-sources)。 舉個例子,下面就是基于一個JSON文件創建一個DataFrame: ``` df <- read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin # Another method to print the first few rows and optionally truncate the printing of long values showDF(df) ## +----+-------+ ## | age| name| ## +----+-------+ ## |null|Michael| ## | 30| Andy| ## | 19| Justin| ## +----+-------+ ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ## 無類型的 Dataset 操作(aka DataFrame 操作) DataFrames 提供了一個特定的語法用在 [Scala](api/scala/index.html#org.apache.spark.sql.Dataset),[Java](api/java/index.html?org/apache/spark/sql/Dataset.html),[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame) and [R](api/R/SparkDataFrame.html)中機構化數據的操作。 正如上面提到的一樣,Spark 2.0 中,DataFrames 在 Scala 和 Java API 中,僅僅是多個 `Row`s 的 Dataset。這些操作也參考了與強類型的 Scala/Java Datasets 中的 “類型轉換” 對應的 “無類型轉換”。 這里包括一些使用 Dataset 進行結構化數據處理的示例 : ``` // This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> 能夠在 DataFrame 上被執行的操作類型的完整列表請參考 [API 文檔](api/scala/index.html#org.apache.spark.sql.Dataset)。 除了簡單的列引用和表達式之外,DataFrame 也有豐富的函數庫,包括 string 操作,date 算術,常見的 math 操作以及更多。可用的完整列表請參考 ?[DataFrame 函數指南](api/scala/index.html#org.apache.spark.sql.functions$)。 ``` // col("...") is preferable to df.col("...") import static org.apache.spark.sql.functions.col; // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select(col("name"), col("age").plus(1)).show(); // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter(col("age").gt(21)).show(); // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show(); // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> 為了能夠在 DataFrame 上被執行的操作類型的完整列表請參考 [API 文檔](api/java/org/apache/spark/sql/Dataset.html)。 除了簡單的列引用和表達式之外,DataFrame 也有豐富的函數庫,包括 string 操作,date 算術,常見的 math 操作以及更多。可用的完整列表請參考 ?[DataFrame 函數指南](api/java/org/apache/spark/sql/functions.html)。 在Python中,可以通過(`df.age`) 或者(`df['age']`)來獲取DataFrame的列。雖然前者便于交互式操作,但是還是建議用戶使用后者,這樣不會破壞列名,也能引用DataFrame的類。 ``` # spark, df are from the previous example # Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+ ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> 為了能夠在 DataFrame 上被執行的操作類型的完整列表請參考 [API 文檔](api/python/pyspark.sql.html#pyspark.sql.DataFrame)。 除了簡單的列引用和表達式之外,DataFrame 也有豐富的函數庫,包括 string 操作,date 算術,常見的 math 操作以及更多。可用的完整列表請參考 ?[DataFrame 函數指南](api/python/pyspark.sql.html#module-pyspark.sql.functions)。 ``` # Create the DataFrame df <- read.json("examples/src/main/resources/people.json") # Show the content of the DataFrame head(df) ## age name ## 1 NA Michael ## 2 30 Andy ## 3 19 Justin # Print the schema in a tree format printSchema(df) ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true) # Select only the "name" column head(select(df, "name")) ## name ## 1 Michael ## 2 Andy ## 3 Justin # Select everybody, but increment the age by 1 head(select(df, df$name, df$age + 1)) ## name (age + 1.0) ## 1 Michael NA ## 2 Andy 31 ## 3 Justin 20 # Select people older than 21 head(where(df, df$age > 21)) ## age name ## 1 30 Andy # Count people by age head(count(groupBy(df, "age"))) ## age count ## 1 19 1 ## 2 NA 1 ## 3 30 1 ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> 為了能夠在 DataFrame 上被執行的操作類型的完整列表請參考 [API 文檔](api/R/index.html)。 除了簡單的列引用和表達式之外,DataFrame 也有豐富的函數庫,包括 string 操作,date 算術,常見的 math 操作以及更多。可用的完整列表請參考 ?[DataFrame 函數指南](api/R/SparkDataFrame.html)。 ## Running SQL Queries Programmatically `SparkSession` 的 `sql` 函數可以讓應用程序以編程的方式運行 SQL 查詢,并將結果作為一個 `DataFrame` 返回。 ``` // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> `SparkSession` 的 `sql` 函數可以讓應用程序以編程的方式運行 SQL 查詢,并將結果作為一個 `Dataset&lt;Row&gt;` 返回。 ``` import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); sqlDF.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> `SparkSession` 的 `sql` 函數可以讓應用程序以編程的方式運行 SQL 查詢,并將結果作為一個 `DataFrame` 返回。 ``` # Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> `SparkSession` 的 `sql` 函數可以讓應用程序以編程的方式運行 SQL 查詢,并將結果作為一個 `DataFrame` 返回。 ``` df <- sql("SELECT * FROM table") ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ## 全局臨時視圖 Spark SQL中的臨時視圖是session級別的,也就是會隨著session的消失而消失。如果你想讓一個臨時視圖在所有session中相互傳遞并且可用,直到Spark 應用退出,你可以建立一個全局的臨時視圖。全局的臨時視圖存在于系統數據庫 `global_temp`中,我們必須加上庫名去引用它,比如。 `SELECT * FROM global_temp.view1`。 ``` // Register the DataFrame as a global temporary view df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> ``` // Register the DataFrame as a global temporary view df.createGlobalTempView("people"); // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> ``` # Register the DataFrame as a global temporary view df.createGlobalTempView("people") # Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ # Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> ``` CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl SELECT * FROM global_temp.temp_view ``` ## 創建Datasets Dataset 與 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo [編碼器](api/scala/index.html#org.apache.spark.sql.Encoder) 來序列化用于處理或者通過網絡進行傳輸的對象。雖然編碼器和標準的序列化都負責將一個對象序列化成字節,編碼器是動態生成的代碼,并且使用了一種允許 Spark 去執行許多像 filtering,sorting 以及 hashing 這樣的操作,不需要將字節反序列化成對象的格式。 ``` // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> ``` import java.util.Arrays; import java.util.Collections; import java.io.Serializable; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } // Create an instance of a Bean class Person person = new Person(); person.setName("Andy"); person.setAge(32); // Encoders are created for Java beans Encoder<Person> personEncoder = Encoders.bean(Person.class); Dataset<Person> javaBeanDS = spark.createDataset( Collections.singletonList(person), personEncoder ); javaBeanDS.show(); // +---+----+ // |age|name| // +---+----+ // | 32|Andy| // +---+----+ // Encoders for most common types are provided in class Encoders Encoder<Integer> integerEncoder = Encoders.INT(); Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); Dataset<Integer> transformedDS = primitiveDS.map( (MapFunction<Integer, Integer>) value -> value + 1, integerEncoder); transformedDS.collect(); // Returns [2, 3, 4] // DataFrames can be converted to a Dataset by providing a class. Mapping based on name String path = "examples/src/main/resources/people.json"; Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder); peopleDS.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> ## RDD的互操作性 Spark SQL 支持兩種不同的方法用于轉換已存在的 RDD 成為 Dataset。第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema。在你的 Spark 應用程序中當你已知 Schema 時這個基于方法的反射可以讓你的代碼更簡潔。 第二種用于創建 Dataset 的方法是通過一個允許你構造一個 Schema 然后把它應用到一個已存在的 RDD 的編程接口。然而這種方法更繁瑣,當列和它們的類型知道運行時都是未知時它允許你去構造 Dataset。 ### 使用反射推斷Schema Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 為 DataFrame。Case class 定義了表的 Schema。Case class 的參數名使用反射讀取并且成為了列名。Case class 也可以是嵌套的或者包含像 `Seq` 或者 `Array` 這樣的復雜類型。這個 RDD 能夠被隱式轉換成一個 DataFrame 然后被注冊為一個表。表可以用于后續的 SQL 語句。 ``` // For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19)) ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> Spark SQL 支持一個[JavaBeans]的RDD(http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly)自動轉換為一個DataFrame。`BeanInfo`利用反射定義表的schema。目前Spark SQL不支持含有`Map`的JavaBeans。但是支持嵌套`List`或者 `Array`JavaBeans。你可以通過創建一個有getters和setters的序列化的類來創建一個JavaBean。 ``` import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; // Create an RDD of Person objects from a text file JavaRDD<Person> peopleRDD = spark.read() .textFile("examples/src/main/resources/people.txt") .javaRDD() .map(line -> { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; }); // Apply a schema to an RDD of JavaBeans to get a DataFrame Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class); // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people"); // SQL statements can be run by using the sql methods provided by spark Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); // The columns of a row in the result can be accessed by field index Encoder<String> stringEncoder = Encoders.STRING(); Dataset<String> teenagerNamesByIndexDF = teenagersDF.map( (MapFunction<Row, String>) row -> "Name: " + row.getString(0), stringEncoder); teenagerNamesByIndexDF.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name Dataset<String> teenagerNamesByFieldDF = teenagersDF.map( (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"), stringEncoder); teenagerNamesByFieldDF.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> Spark SQL能夠把RDD 轉換為一個DataFrame,并推斷其類型。這些行由一系列key/value鍵值對組成。key值代表了表的列名,類型按抽樣推斷整個數據集,同樣的也適用于JSON文件。 ``` from pyspark.sql import Row sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are Dataframe objects. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames: print(name) # Name: Justin ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> ### 以編程的方式指定Schema 當 case class 不能夠在執行之前被定義(例如,records 記錄的結構在一個 string 字符串中被編碼了,或者一個 text 文本 dataset 將被解析并且不同的用戶投影的字段是不一樣的)。一個 `DataFrame` 可以使用下面的三步以編程的方式來創建。 1. 從原始的 RDD 創建 RDD 的 `Row`(行)。 2. Step 1 被創建后,創建 Schema 表示一個 `StructType` 匹配 RDD 中的 `Row`(行)的結構。 3. 通過 `SparkSession` 提供的 `createDataFrame` 方法應用 Schema 到 RDD 的 RowS(行)。 例如: ``` import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a `Dataset&lt;Row&gt;` can be created programmatically with three steps. 1. Create an RDD of `Row`s from the original RDD; 2. Create the schema represented by a `StructType` matching the structure of `Row`s in the RDD created in Step 1. 3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided by `SparkSession`. For example: ``` import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; // Create an RDD JavaRDD<String> peopleRDD = spark.sparkContext() .textFile("examples/src/main/resources/people.txt", 1) .toJavaRDD(); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) { StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> { String[] attributes = record.split(","); return RowFactory.create(attributes[0], attributes[1].trim()); }); // Apply the schema to the RDD Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema); // Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView("people"); // SQL can be run over a temporary view created using DataFrames Dataset<Row> results = spark.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name Dataset<String> namesDS = results.map( (MapFunction<Row, String>) row -> "Name: " + row.getString(0), Encoders.STRING()); namesDS.show(); // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small> 當一個字典不能被提前定義(例如,記錄的結構是在一個字符串中,抑或一個文本中解析,被不同的用戶所屬),一個 `DataFrame` 可以通過以下 3 步來創建。 1. RDD 從原始的 RDD 穿件一個 RDD 的 toples 或者一個列表; 2. Step 1 被創建后,創建 Schema 表示一個 `StructType` 匹配 RDD 中的結構。 3. 通過 `SparkSession` 提供的 `createDataFrame` 方法應用 Schema 到 RDD。 For example: ``` # Import data types from pyspark.sql.types import * sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ ``` <small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small> ## Aggregations The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) provide common aggregations such as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in [Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and [Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own. ### Untyped User-Defined Aggregate Functions Users have to extend the [UserDefinedAggregateFunction](api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction) abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like: ``` import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.</small> ``` import java.util.ArrayList; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public static class MyAverage extends UserDefinedAggregateFunction { private StructType inputSchema; private StructType bufferSchema; public MyAverage() { List<StructField> inputFields = new ArrayList<>(); inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true)); inputSchema = DataTypes.createStructType(inputFields); List<StructField> bufferFields = new ArrayList<>(); bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true)); bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true)); bufferSchema = DataTypes.createStructType(bufferFields); } // Data types of input arguments of this aggregate function public StructType inputSchema() { return inputSchema; } // Data types of values in the aggregation buffer public StructType bufferSchema() { return bufferSchema; } // The data type of the returned value public DataType dataType() { return DataTypes.DoubleType; } // Whether this function always returns the same output on the identical input public boolean deterministic() { return true; } // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0L); buffer.update(1, 0L); } // Updates the given aggregation buffer `buffer` with new input data from `input` public void update(MutableAggregationBuffer buffer, Row input) { if (!input.isNullAt(0)) { long updatedSum = buffer.getLong(0) + input.getLong(0); long updatedCount = buffer.getLong(1) + 1; buffer.update(0, updatedSum); buffer.update(1, updatedCount); } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` public void merge(MutableAggregationBuffer buffer1, Row buffer2) { long mergedSum = buffer1.getLong(0) + buffer2.getLong(0); long mergedCount = buffer1.getLong(1) + buffer2.getLong(1); buffer1.update(0, mergedSum); buffer1.update(1, mergedCount); } // Calculates the final result public Double evaluate(Row buffer) { return ((double) buffer.getLong(0)) / buffer.getLong(1); } } // Register the function to access it spark.udf().register("myAverage", new MyAverage()); Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json"); df.createOrReplaceTempView("employees"); df.show(); // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees"); result.show(); // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java" in the Spark repo.</small> ### Type-Safe User-Defined Aggregate Functions User-defined aggregations for strongly typed Datasets revolve around the [Aggregator](api/scala/index.html#org.apache.spark.sql.expressions.Aggregator) abstract class. For example, a type-safe user-defined average can look like: ``` import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.</small> ``` import java.io.Serializable; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.expressions.Aggregator; public static class Employee implements Serializable { private String name; private long salary; // Constructors, getters, setters... } public static class Average implements Serializable { private long sum; private long count; // Constructors, getters, setters... } public static class MyAverage extends Aggregator<Employee, Average, Double> { // A zero value for this aggregation. Should satisfy the property that any b + zero = b public Average zero() { return new Average(0L, 0L); } // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object public Average reduce(Average buffer, Employee employee) { long newSum = buffer.getSum() + employee.getSalary(); long newCount = buffer.getCount() + 1; buffer.setSum(newSum); buffer.setCount(newCount); return buffer; } // Merge two intermediate values public Average merge(Average b1, Average b2) { long mergedSum = b1.getSum() + b2.getSum(); long mergedCount = b1.getCount() + b2.getCount(); b1.setSum(mergedSum); b1.setCount(mergedCount); return b1; } // Transform the output of the reduction public Double finish(Average reduction) { return ((double) reduction.getSum()) / reduction.getCount(); } // Specifies the Encoder for the intermediate value type public Encoder<Average> bufferEncoder() { return Encoders.bean(Average.class); } // Specifies the Encoder for the final output value type public Encoder<Double> outputEncoder() { return Encoders.DOUBLE(); } } Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class); String path = "examples/src/main/resources/employees.json"; Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder); ds.show(); // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ MyAverage myAverage = new MyAverage(); // Convert the function to a `TypedColumn` and give it a name TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary"); Dataset<Double> result = ds.select(averageSalary); result.show(); // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java" in the Spark repo.</small> # Data Sources(數據源) Spark SQL 支持通過 DataFrame 接口對各種 data sources(數據源)進行操作。DataFrame 可以使用 relational transformations(關系轉換)操作,也可用于創建 temporary view(臨時視圖)。將 DataFrame 注冊為 temporary view(臨時視圖)允許您對其數據運行 SQL 查詢。本節 描述了使用 Spark Data Sources 加載和保存數據的一般方法,然后涉及可用于 built-in data sources(內置數據源)的 specific options(特定選項)。 ## Generic Load/Save Functions(通用 加載/保存 功能) 在最簡單的形式中,默認數據源(`parquet`,除非另有配置 `spark.sql.sources.default`)將用于所有操作。 ``` val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet"); usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` df <- read.df("examples/src/main/resources/users.parquet") write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet") ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ### Manually Specifying Options(手動指定選項) 您還可以 manually specify(手動指定)將與任何你想傳遞給 data source 的其他選項一起使用的 data source。Data sources 由其 fully qualified name(完全限定名稱)(即 `org.apache.spark.sql.parquet`),但是對于 built-in sources(內置的源),你也可以使用它們的 shortnames(短名稱)(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)。從任何 data source type(數據源類型)加載 DataFrames 可以使用此 syntax(語法)轉換為其他類型。 ``` val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json"); peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` df <- read.df("examples/src/main/resources/people.json", "json") namesAndAges <- select(df, "name", "age") write.df(namesAndAges, "namesAndAges.parquet", "parquet") ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ### Run SQL on files directly(直接在文件上運行 SQL) 不使用讀取 API 將文件加載到 DataFrame 并進行查詢,也可以直接用 SQL 查詢該文件. ``` val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ### Save Modes(保存模式) Save operations(保存操作)可以選擇使用 `SaveMode`,它指定如何處理現有數據如果存在的話。重要的是要意識到,這些 save modes(保存模式)不使用任何 locking(鎖定)并且不是 atomic(原子)。另外,當執行 `Overwrite` 時,數據將在新數據寫出之前被刪除。 | Scala/Java | Any Language | Meaning | | --- | --- | --- | | `SaveMode.ErrorIfExists` (default) | `"error"` (default) | 將 DataFrame 保存到 data source(數據源)時,如果數據已經存在,則會拋出異常。 | `SaveMode.Append` | `"append"` | 將 DataFrame 保存到 data source(數據源)時,如果 data/table 已存在,則 DataFrame 的內容將被 append(附加)到現有數據中。 | `SaveMode.Overwrite` | `"overwrite"` | Overwrite mode(覆蓋模式)意味著將 DataFrame 保存到 data source(數據源)時,如果 data/table 已經存在,則預期 DataFrame 的內容將 overwritten(覆蓋)現有數據。 | `SaveMode.Ignore` | `"ignore"` | Ignore mode(忽略模式)意味著當將 DataFrame 保存到 data source(數據源)時,如果數據已經存在,則保存操作預期不會保存 DataFrame 的內容,并且不更改現有數據。這與 SQL 中的 `CREATE TABLE IF NOT EXISTS` 類似。 ### Saving to Persistent Tables(保存到持久表) `DataFrames` 也可以使用 `saveAsTable` 命令作為 persistent tables(持久表)保存到 Hive metastore 中。請注意,existing Hive deployment(現有的 Hive 部署)不需要使用此功能。Spark 將為您創建默認的 local Hive metastore(本地 Hive metastore)(使用 Derby)。與 `createOrReplaceTempView` 命令不同,`saveAsTable` 將 materialize(實現)DataFrame 的內容,并創建一個指向 Hive metastore 中數據的指針。即使您的 Spark 程序重新啟動,Persistent tables(持久性表)仍然存在,因為您保持與同一個 metastore 的連接。可以通過使用表的名稱在 `SparkSession` 上調用 `table` 方法來創建 persistent tabl(持久表)的 DataFrame。 對于 file-based(基于文件)的 data source(數據源),例如 text,parquet,json等,您可以通過 `path` 選項指定 custom table path(自定義表路徑),例如 `df.write.option("path", "/some/path").saveAsTable("t")`。當表被 dropped(刪除)時,custom table path(自定義表路徑)將不會被刪除,并且表數據仍然存在。如果未指定自定義表路徑,Spark 將把數據寫入 warehouse directory(倉庫目錄)下的默認表路徑。當表被刪除時,默認的表路徑也將被刪除。 從 Spark 2.1 開始,persistent datasource tables(持久性數據源表)將 per-partition metadata(每個分區元數據)存儲在 Hive metastore 中。這帶來了幾個好處: * 由于 metastore 只能返回查詢的必要 partitions(分區),因此不再需要將第一個查詢上的所有 partitions discovering 到表中。 * Hive DDLs 如 `ALTER TABLE PARTITION ... SET LOCATION` 現在可用于使用 Datasource API 創建的表。 請注意,創建 external datasource tables(外部數據源表)(帶有 `path` 選項)的表時,默認情況下不會收集 partition information(分區信息)。要 sync(同步)metastore 中的分區信息,可以調用 `MSCK REPAIR TABLE`。 ### Bucketing, Sorting and Partitioning(分桶,排序和分區) 對于 file-based data source(基于文件的數據源),也可以對 output(輸出)進行 bucket 和 sort 或者 partition。Bucketing 和 sorting 僅適用于 persistent tables : ``` peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` CREATE TABLE users_bucketed_by_name( name STRING, favorite_color STRING, favorite_numbers array<integer> ) USING parquet CLUSTERED BY(name) INTO 42 BUCKETS; ``` 在使用 Dataset API 時,partitioning 可以同時與 `save` 和 `saveAsTable` 一起使用. ``` usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` usersDF .write() .partitionBy("favorite_color") .format("parquet") .save("namesPartByColor.parquet"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` CREATE TABLE users_by_favorite_color( name STRING, favorite_color STRING, favorite_numbers array<integer> ) USING csv PARTITIONED BY(favorite_color); ``` 可以為 single table(單個表)使用 partitioning 和 bucketing: ``` peopleDF .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed") ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` peopleDF .write() .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed"); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` df = spark.read.parquet("examples/src/main/resources/users.parquet") (df .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed")) ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` CREATE TABLE users_bucketed_and_partitioned( name STRING, favorite_color STRING, favorite_numbers array<integer> ) USING parquet PARTITIONED BY (favorite_color) CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS; ``` `partitionBy` 創建一個 directory structure(目錄結構),如 [Partition Discovery](#partition-discovery) 部分所述。因此,對 cardinality(基數)較高的 columns 的適用性有限。相反,`bucketBy` 可以在固定數量的 buckets 中分配數據,并且可以在 a number of unique values is unbounded(多個唯一值無界時)使用數據。 ## Parquet Files [Parquet](http://parquet.io) 是許多其他數據處理系統支持的 columnar format(柱狀格式)。Spark SQL 支持讀寫 Parquet 文件,可自動保留 schema of the original data(原始數據的模式)。當編寫 Parquet 文件時,出于兼容性原因,所有 columns 都將自動轉換為可空。 ### Loading Data Programmatically(以編程的方式加載數據) 使用上面例子中的數據: ``` // Encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json"); // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write().parquet("people.parquet"); // Read in the Parquet file created above. // Parquet files are self-describing so the schema is preserved // The result of loading a parquet file is also a DataFrame Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet"); // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); Dataset<String> namesDS = namesDF.map( (MapFunction<Row, String>) row -> "Name: " + row.getString(0), Encoders.STRING()); namesDS.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` peopleDF = spark.read.json("examples/src/main/resources/people.json") # DataFrames can be saved as Parquet files, maintaining the schema information. peopleDF.write.parquet("people.parquet") # Read in the Parquet file created above. # Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile = spark.read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.show() # +------+ # | name| # +------+ # |Justin| # +------+ ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` df <- read.df("examples/src/main/resources/people.json", "json") # SparkDataFrame can be saved as Parquet files, maintaining the schema information. write.parquet(df, "people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile <- read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. createOrReplaceTempView(parquetFile, "parquetFile") teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") head(teenagers) ## name ## 1 Justin # We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:" schema <- structType(structField("name", "string")) teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema) for (teenName in collect(teenNames)$name) { cat(teenName, "\n") } ## Name: Michael ## Name: Andy ## Name: Justin ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ``` CREATE TEMPORARY VIEW parquetTable USING org.apache.spark.sql.parquet OPTIONS ( path "examples/src/main/resources/people.parquet" ) SELECT * FROM parquetTable ``` ### Partition Discovery(分區發現) Table partitioning(表分區)是在像 Hive 這樣的系統中使用的常見的優化方法。在 partitioned table(分區表)中,數據通常存儲在不同的目錄中,partitioning column values encoded(分區列值編碼)在每個 partition directory(分區目錄)的路徑中。Parquet data source(Parquet 數據源)現在可以自動 discover(發現)和 infer(推斷)分區信息。例如,我們可以使用以下 directory structure(目錄結構)將所有以前使用的 population data(人口數據)存儲到 partitioned table(分區表)中,其中有兩個額外的列 `gender` 和 `country` 作為 partitioning columns(分區列): ``` path └── to └── table ├── gender=male │?? ├── ... │?? │ │?? ├── country=US │?? │?? └── data.parquet │?? ├── country=CN │?? │?? └── data.parquet │?? └── ... └── gender=female ?? ├── ... ?? │ ?? ├── country=US ?? │?? └── data.parquet ?? ├── country=CN ?? │?? └── data.parquet ?? └── ... ``` 通過將 `path/to/table` 傳遞給 `SparkSession.read.parquet` 或 `SparkSession.read.load`,Spark SQL 將自動從路徑中提取 partitioning information(分區信息)。現在返回的 DataFrame 的 schema(模式)變成: ``` root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true) ``` 請注意,會自動 inferred(推斷)partitioning columns(分區列)的 data types(數據類型)。目前,支持 numeric data types(數字數據類型)和 string type(字符串類型)。有些用戶可能不想自動推斷 partitioning columns(分區列)的數據類型。對于這些用例,automatic type inference(自動類型推斷)可以由 `spark.sql.sources.partitionColumnTypeInference.enabled` 配置,默認為 `true`。當禁用 type inference(類型推斷)時,string type(字符串類型)將用于 partitioning columns(分區列)。 從 Spark 1.6.0 開始,默認情況下,partition discovery(分區發現)只能找到給定路徑下的 partitions(分區)。對于上述示例,如果用戶將 `path/to/table/gender=male` 傳遞給 `SparkSession.read.parquet` 或 `SparkSession.read.load`,則 `gender` 將不被視為 partitioning column(分區列)。如果用戶需要指定 partition discovery(分區發現)應該開始的基本路徑,則可以在數據源選項中設置 `basePath`。例如,當 `path/to/table/gender=male` 是數據的路徑并且用戶將 `basePath` 設置為 `path/to/table/`,`gender` 將是一個 partitioning column(分區列)。 ### Schema Merging(模式合并) 像 ProtocolBuffer,Avro 和 Thrift 一樣,Parquet 也支持 schema evolution(模式演進)。用戶可以從一個 simple schema(簡單的架構)開始,并根據需要逐漸向 schema 添加更多的 columns(列)。以這種方式,用戶可能會使用不同但相互兼容的 schemas 的 multiple Parquet files(多個 Parquet 文件)。Parquet data source(Parquet 數據源)現在能夠自動檢測這種情況并 merge(合并)所有這些文件的 schemas。 由于 schema merging(模式合并)是一個 expensive operation(相對昂貴的操作),并且在大多數情況下不是必需的,所以默認情況下從 1.5.0 開始。你可以按照如下的方式啟用它: 1. 讀取 Parquet 文件時,將 data source option(數據源選項)`mergeSchema` 設置為 `true`(如下面的例子所示),或 2. 將 global SQL option(全局 SQL 選項)`spark.sql.parquet.mergeSchema` 設置為 `true`。 ``` // This is used to implicitly convert an RDD to a DataFrame. import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public static class Square implements Serializable { private int value; private int square; // Getters and setters... } public static class Cube implements Serializable { private int value; private int cube; // Getters and setters... } List<Square> squares = new ArrayList<>(); for (int value = 1; value <= 5; value++) { Square square = new Square(); square.setValue(value); square.setSquare(value * value); squares.add(square); } // Create a simple DataFrame, store into a partition directory Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class); squaresDF.write().parquet("data/test_table/key=1"); List<Cube> cubes = new ArrayList<>(); for (int value = 6; value <= 10; value++) { Cube cube = new Cube(); cube.setValue(value); cube.setCube(value * value * value); cubes.add(cube); } // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class); cubesDF.write().parquet("data/test_table/key=2"); // Read the partitioned table Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table"); mergedDF.printSchema(); // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` from pyspark.sql import Row # spark is from the previous example. # Create a simple DataFrame, stored into a partition directory sc = spark.sparkContext squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6)) .map(lambda i: Row(single=i, double=i ** 2))) squaresDF.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i ** 3))) cubesDF.write.parquet("data/test_table/key=2") # Read the partitioned table mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() # The final schema consists of all 3 columns in the Parquet files together # with the partitioning column appeared in the partition directory paths. # root # |-- double: long (nullable = true) # |-- single: long (nullable = true) # |-- triple: long (nullable = true) # |-- key: integer (nullable = true) ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23))) df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18))) # Create a simple DataFrame, stored into a partition directory write.df(df1, "data/test_table/key=1", "parquet", "overwrite") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column write.df(df2, "data/test_table/key=2", "parquet", "overwrite") # Read the partitioned table df3 <- read.df("data/test_table", "parquet", mergeSchema = "true") printSchema(df3) # The final schema consists of all 3 columns in the Parquet files together # with the partitioning column appeared in the partition directory paths ## root ## |-- single: double (nullable = true) ## |-- double: double (nullable = true) ## |-- triple: double (nullable = true) ## |-- key: integer (nullable = true) ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ### Hive metastore Parquet table conversion(Hive metastore Parquet table 轉換) 當讀取和寫入 Hive metastore Parquet 表時,Spark SQL 將嘗試使用自己的 Parquet support(Parquet 支持),而不是 Hive SerDe 來獲得更好的性能。此 behavior(行為)由 `spark.sql.hive.convertMetastoreParquet` 配置控制,默認情況下 turned on(打開)。 #### Hive/Parquet Schema Reconciliation 從 table schema processing(表格模式處理)的角度來說,Hive 和 Parquet 之間有兩個關鍵的區別。 1. Hive 不區分大小寫,而 Parquet 不是 2. Hive 認為所有 columns(列)都可以為空,而 Parquet 中的可空性是 significant(重要)的。 由于這個原因,當將 Hive metastore Parquet 表轉換為 Spark SQL Parquet 表時,我們必須調整 metastore schema 與 Parquet schema。reconciliation 規則是: 1. 在兩個 schema 中具有 same name(相同名稱)的 Fields(字段)必須具有 same data type(相同的數據類型),而不管 nullability(可空性)。reconciled field 應具有 Parquet 的數據類型,以便 nullability(可空性)得到尊重。 2. reconciled schema(調和模式)正好包含 Hive metastore schema 中定義的那些字段。 * 只出現在 Parquet schema 中的任何字段將被 dropped(刪除)在 reconciled schema 中。 * 僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中作為 nullable field(可空字段)添加。 #### Metadata Refreshing(元數據刷新) Spark SQL 緩存 Parquet metadata 以獲得更好的性能。當啟用 Hive metastore Parquet table conversion(轉換)時,這些 converted tables(轉換表)的 metadata(元數據)也被 cached(緩存)。如果這些表由 Hive 或其他外部工具更新,則需要手動刷新以確保 consistent metadata(一致的元數據)。 ``` // spark is an existing SparkSession spark.catalog.refreshTable("my_table") ``` ``` // spark is an existing SparkSession spark.catalog().refreshTable("my_table"); ``` ``` # spark is an existing SparkSession spark.catalog.refreshTable("my_table") ``` ``` REFRESH TABLE my_table; ``` ### Configuration(配置) 可以使用 `SparkSession` 上的 `setConf` 方法或使用 SQL 運行 `SET key = value` 命令來完成 Parquet 的配置. | Property Name(參數名稱)| Default(默認)| Meaning(含義)| | --- | --- | --- | | `spark.sql.parquet.binaryAsString` | false | 一些其他 Parquet-producing systems(Parquet 生產系統),特別是 Impala,Hive 和舊版本的 Spark SQL,在 writing out(寫出)Parquet schema 時,不區分 binary data(二進制數據)和 strings(字符串)。該 flag 告訴 Spark SQL 將 binary data(二進制數據)解釋為 string(字符串)以提供與這些系統的兼容性。 | `spark.sql.parquet.int96AsTimestamp` | true | 一些 Parquet-producing systems,特別是 Impala 和 Hive,將 Timestamp 存入INT96。該 flag 告訴 Spark SQL 將 INT96 數據解析為 timestamp 以提供與這些系統的兼容性。 | `spark.sql.parquet.cacheMetadata` | true | 打開 Parquet schema metadata 的緩存。可以加快查詢靜態數據。 | `spark.sql.parquet.compression.codec` | snappy | 在編寫 Parquet 文件時設置 compression codec(壓縮編解碼器)的使用。可接受的值包括:uncompressed,snappy,gzip,lzo。 | `spark.sql.parquet.filterPushdown` | true | 設置為 true 時啟用 Parquet filter push-down optimization。 | `spark.sql.hive.convertMetastoreParquet` | true | 當設置為 false 時,Spark SQL 將使用 Hive SerDe 作為 parquet tables,而不是內置的支持。 | `spark.sql.parquet.mergeSchema` | false | 當為 true 時,Parquet data source(Parquet 數據源)merges(合并)從所有 data files(數據文件)收集的 schemas,否則如果沒有可用的 summary file,則從 summary file 或 random data file 中挑選 schema。 | `spark.sql.optimizer.metadataOnly` | true | 如果為 true,則啟用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns(分區列)而不是 table scans(表掃描)。當 scanned(掃描)的所有 columns(列)都是 partition columns(分區列)并且 query(查詢)具有滿足 distinct semantics(不同語義)的 aggregate operator(聚合運算符)時,它將適用。 ## JSON Datasets(JSON 數據集) Spark SQL 可以 automatically infer(自動推斷)JSON dataset 的 schema,并將其作為 `Dataset[Row]` 加載。這個 conversion(轉換)可以在 `Dataset[String]` 上使用 `SparkSession.read.json()` 來完成,或 JSON 文件。 請注意,以 _a json file_ 提供的文件不是典型的 JSON 文件。每行必須包含一個 separate(單獨的),self-contained valid(獨立的有效的)JSON 對象。有關更多信息,請參閱 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/)。 對于 regular multi-line JSON file(常規的多行 JSON 文件),將 `multiLine` 選項設置為 `true`。 ``` // Primitive types (Int, String, etc) and Product types (case classes) encoders are // supported by importing this when creating a Dataset. import spark.implicits._ // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> Spark SQL 可以 automatically infer(自動推斷)JSON dataset 的 schema,并將其作為 `Dataset&lt;Row&gt;` 加載。這個 conversion(轉換)可以在 `Dataset&lt;String&gt;` 上使用 `SparkSession.read.json()` 來完成,或 JSON 文件。 請注意,以 _a json file_ 提供的文件不是典型的 JSON 文件。每行必須包含一個 separate(單獨的),self-contained valid(獨立的有效的)JSON 對象。有關更多信息,請參閱 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/) 對于 regular multi-line JSON file(常規的多行 JSON 文件),將 `multiLine` 選項設置為 `true`。 ``` import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method people.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people"); // SQL statements can be run by using the sql methods provided by spark Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); namesDF.show(); // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset<String> storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING()); Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset); anotherPeople.show(); // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> Spark SQL 可以 automatically infer(自動推斷)JSON dataset 的 schema,并將其作為 DataFrame 加載。可以使用 JSON 文件中的 `SparkSession.read.json` 進行此 conversion(轉換)。 請注意,以 _a json file_ 提供的文件不是典型的 JSON 文件。每行必須包含一個 separate(單獨的),self-contained valid(獨立的有效的)JSON 對象。有關更多信息,請參閱 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/) 對于 regular multi-line JSON file(常規的多行 JSON 文件),將 `multiLine` 選項設置為 `true`。 ``` # spark is from the previous example. sc = spark.sparkContext # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files path = "examples/src/main/resources/people.json" peopleDF = spark.read.json(path) # The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") # SQL statements can be run by using the sql methods provided by spark teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() # +------+ # | name| # +------+ # |Justin| # +------+ # Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'] otherPeopleRDD = sc.parallelize(jsonStrings) otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() # +---------------+----+ # | address|name| # +---------------+----+ # |[Columbus,Ohio]| Yin| # +---------------+----+ ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> Spark SQL 可以 automatically infer(自動推斷)JSON dataset 的 schema,并將其作為 DataFrame 加載。使用 `read.json()` 函數,它從 JSON 文件的目錄中加載數據,其中每一行文件都是一個 JSON 對象。 請注意,以 _a json file_ 提供的文件不是典型的 JSON 文件。每行必須包含一個 separate(單獨的),self-contained valid(獨立的有效的)JSON 對象。有關更多信息,請參閱 [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/)。 對于 regular multi-line JSON file(常規的多行 JSON 文件),將 `multiLine` 選項設置為 `true`。 ``` # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files. path <- "examples/src/main/resources/people.json" # Create a DataFrame from the file(s) pointed to by path people <- read.json(path) # The inferred schema can be visualized using the printSchema() method. printSchema(people) ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true) # Register this DataFrame as a table. createOrReplaceTempView(people, "people") # SQL statements can be run by using the sql methods. teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") head(teenagers) ## name ## 1 Justin ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ``` CREATE TEMPORARY VIEW jsonTable USING org.apache.spark.sql.json OPTIONS ( path "examples/src/main/resources/people.json" ) SELECT * FROM jsonTable ``` ## Hive 表 Spark SQL 還支持讀取和寫入存儲在 [Apache Hive](http://hive.apache.org/) 中的數據。但是,由于 Hive 具有大量依賴關系,因此這些依賴關系不包含在默認 Spark 分發中。如果在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。請注意,這些 Hive 依賴關系也必須存在于所有工作節點上,因為它們將需要訪問 Hive 序列化和反序列化庫(SerDes),以訪問存儲在 Hive 中的數據。 通過將 `hive-site.xml`,`core-site.xml`(用于安全配置)和 `hdfs-site.xml`(用于 HDFS 配置)文件放在 `conf/` 中來完成配置。 當使用 Hive 時,必須用 Hive 支持實例化 `SparkSession`,包括連接到持續的 Hive 轉移,支持 Hive serdes 和 Hive 用戶定義的功能。沒有現有 Hive 部署的用戶仍然可以啟用 Hive 支持。當 `hive-site.xml` 未配置時,上下文會自動在當前目錄中創建 `metastore_db`,并創建由 `spark.sql.warehouse.dir` 配置的目錄,該目錄默認為Spark應用程序當前目錄中的 `spark-warehouse` 目錄 開始了 請注意,自從2.0.0以來,`hive-site.xml` 中的 `hive.metastore.warehouse.dir` 屬性已被棄用。而是使用 `spark.sql.warehouse.dir` 來指定倉庫中數據庫的默認位置。您可能需要向啟動 Spark 應用程序的用戶授予寫權限。? ``` import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.</small> ``` import java.io.File; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; } public void setKey(int key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } } // warehouseLocation points to the default location for managed databases and tables String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DataFrames are of type Row, which lets you to access each column by ordinal. Dataset<String> stringsDS = sqlDF.map( (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. List<Record> records = new ArrayList<>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // Queries can then join DataFrames data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ... ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" in the Spark repo.</small> ``` from os.path import expanduser, join, abspath from pyspark.sql import SparkSession from pyspark.sql import Row # warehouse_location points to the default location for managed databases and tables warehouse_location = abspath('spark-warehouse') spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", warehouse_location) \ .enableHiveSupport() \ .getOrCreate() # spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show() # +---+-------+ # |key| value| # +---+-------+ # |238|val_238| # | 86| val_86| # |311|val_311| # ... # Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show() # +--------+ # |count(1)| # +--------+ # | 500 | # +--------+ # The results of SQL queries are themselves DataFrames and support all normal functions. sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") # The items in DataFrames are of type Row, which allows you to access each column by ordinal. stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value)) for record in stringsDS.collect(): print(record) # Key: 0, Value: val_0 # Key: 0, Value: val_0 # Key: 0, Value: val_0 # ... # You can also use DataFrames to create temporary views within a SparkSession. Record = Row("key", "value") recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)]) recordsDF.createOrReplaceTempView("records") # Queries can then join DataFrame data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() # +---+------+---+------+ # |key| value|key| value| # +---+------+---+------+ # | 2| val_2| 2| val_2| # | 4| val_4| 4| val_4| # | 5| val_5| 5| val_5| # ... ``` <small>Find full example code at "examples/src/main/python/sql/hive.py" in the Spark repo.</small> 當使用 Hive 時,必須使用 Hive 支持實例化 `SparkSession`。這個增加了在 MetaStore 中查找表并使用 HiveQL 編寫查詢的支持。 ``` # enableHiveSupport defaults to TRUE sparkR.session(enableHiveSupport = TRUE) sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. results <- collect(sql("FROM src SELECT key, value")) ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ### 指定 Hive 表的存儲格式 創建 Hive 表時,需要定義如何 從/向 文件系統 read/write 數據,即 “輸入格式” 和 “輸出格式”。您還需要定義該表如何將數據反序列化為行,或將行序列化為數據,即 “serde”。以下選項可用于指定存儲格式(“serde”, “input format”, “output format”),例如,`CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`。默認情況下,我們將以純文本形式讀取表格文件。請注意,Hive 存儲處理程序在創建表時不受支持,您可以使用 Hive 端的存儲處理程序創建一個表,并使用 Spark SQL 來讀取它。 | Property Name | Meaning | | --- | --- | | `fileFormat` | fileFormat是一種存儲格式規范的包,包括 "serde","input format" 和 "output format"。目前我們支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。 | | `inputFormat, outputFormat` | 這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定為字符串文字,例如:`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。這兩個選項必須成對出現,如果您已經指定了 "fileFormat" 選項,則無法指定它們。 | | `serde` | 此選項指定 serde 類的名稱。當指定 `fileFormat` 選項時,如果給定的 `fileFormat` 已經包含 serde 的信息,那么不要指定這個選項。目前的 "sequencefile","textfile" 和 "rcfile" 不包含 serde 信息,你可以使用這3個文件格式的這個選項。 | | `fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim` | 這些選項只能與 "textfile" 文件格式一起使用。它們定義如何將分隔的文件讀入行。 | 使用 `OPTIONS` 定義的所有其他屬性將被視為 Hive serde 屬性。 ### 與不同版本的 Hive Metastore 進行交互 Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互,這使得 Spark SQL 能夠訪問 Hive 表的元數據。從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制構建可以使用下面所述的配置來查詢不同版本的 Hive 轉移。請注意,獨立于用于與轉移點通信的 Hive 版本,內部 Spark SQL 將針對 Hive 1.2.1 進行編譯,并使用這些類進行內部執行(serdes,UDF,UDAF等)。 以下選項可用于配置用于檢索元數據的 Hive 版本: | 屬性名稱 | 默認值 | 含義 | | --- | --- | --- | | `spark.sql.hive.metastore.version` | `1.2.1` | Hive metastore 版本。可用選項為 `0.12.0` 至 `1.2.1`。 | | `spark.sql.hive.metastore.jars` | `builtin` | 當啟用 `-Phive` 時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一起。選擇此選項時,spark.sql.hive.metastore.version 必須為 `1.2.1` 或未定義。行家 使用從Maven存儲庫下載的指定版本的Hive jar。通常不建議在生產部署中使用此配置。***** 應用于實例化 HiveMetastoreClient 的 jar 的位置。該屬性可以是三個選項之一: 1. `builtin` `-Phive` `spark.sql.hive.metastore.version` `1.2.1` 5. `maven` 6. JVM 的標準格式的 classpath。該類路徑必須包含所有 Hive 及其依賴項,包括正確版本的 Hadoop。這些罐只需要存在于 driver 程序中,但如果您正在運行在 yarn 集群模式,那么您必須確保它們與應用程序一起打包。 | | `spark.sql.hive.metastore.sharedPrefixes` | `com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc` | 使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。其它需要共享的類,是需要與已經共享的類進行交互的。例如,log4j 使用的自定義 appender。 | | `spark.sql.hive.metastore.barrierPrefixes` | `(empty)` | 一個逗號分隔的類前綴列表,應該明確地為 Spark SQL 正在通信的 Hive 的每個版本重新加載。例如,在通常將被共享的前綴中聲明的 Hive UDF(即: `org.apache.spark.*`)。 | ## JDBC 連接其它數據庫 Spark SQL 還包括可以使用 JDBC 從其他數據庫讀取數據的數據源。此功能應優于使用 [JdbcRDD](api/scala/index.html#org.apache.spark.rdd.JdbcRDD)。這是因為結果作為 DataFrame 返回,并且可以輕松地在 Spark SQL 中處理或與其他數據源連接。JDBC 數據源也更容易從 Java 或 Python 使用,因為它不需要用戶提供 ClassTag。(請注意,這不同于 Spark SQL JDBC 服務器,允許其他應用程序使用 Spark SQL 運行查詢)。 要開始使用,您需要在 Spark 類路徑中包含特定數據庫的 JDBC driver 程序。例如,要從 Spark Shell 連接到 postgres,您將運行以下命令: ``` bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar ``` 可以使用 Data Sources API 將來自遠程數據庫的表作為 DataFrame 或 Spark SQL 臨時視圖進行加載。用戶可以在數據源選項中指定 JDBC 連接屬性。`用戶` 和 `密碼`通常作為登錄數據源的連接屬性提供。除了連接屬性外,Spark 還支持以下不區分大小寫的選項: | 屬性名稱 | 含義 | | --- | --- | | `url` | 要連接的JDBC URL。源特定的連接屬性可以在URL中指定。例如jdbc:`jdbc:postgresql://localhost/test?user=fred&password=secret` | | `dbtable` | 應該讀取的 JDBC 表。請注意,可以使用在SQL查詢的 `FROM` 子句中有效的任何內容。例如,您可以使用括號中的子查詢代替完整表。 | | `driver` | 用于連接到此 URL 的 JDBC driver 程序的類名。 | | `partitionColumn, lowerBound, upperBound` | 如果指定了這些選項,則必須指定這些選項。另外,必須指定 `numPartitions`。他們描述如何從多個 worker 并行讀取數據時將表給分區。`partitionColumn` 必須是有問題的表中的數字列。請注意,`lowerBound` 和 `upperBound` 僅用于決定分區的大小,而不是用于過濾表中的行。因此,表中的所有行將被分區并返回。此選項僅適用于讀操作。 | | `numPartitions` | 在表讀寫中可以用于并行度的最大分區數。這也確定并發JDBC連接的最大數量。如果要寫入的分區數超過此限制,則在寫入之前通過調用 `coalesce(numPartitions)` 將其減少到此限制。 | | `fetchsize` | JDBC 抓取的大小,用于確定每次數據往返傳遞的行數。這有利于提升 JDBC driver 的性能,它們的默認值較小(例如:Oracle 是 10 行)。該選項僅適用于讀取操作。 | | `batchsize` | JDBC 批處理的大小,用于確定每次數據往返傳遞的行數。這有利于提升 JDBC driver 的性能。該選項僅適用于寫操作。默認值為 `1000`。 | `isolationLevel` | 事務隔離級別,適用于當前連接。它可以是 `NONE`,`READ_COMMITTED`,`READ_UNCOMMITTED`,`REPEATABLE_READ`,或 `SERIALIZABLE` 之一,對應于 JDBC 連接對象定義的標準事務隔離級別,默認為 `READ_UNCOMMITTED`。此選項僅適用于寫操作。請參考 `java.sql.Connection` 中的文檔。 | | `truncate` | 這是一個與 JDBC 相關的選項。啟用 `SaveMode.Overwrite` 時,此選項會導致 Spark 截斷現有表,而不是刪除并重新創建。這可以更有效,并且防止表元數據(例如,索引)被移除。但是,在某些情況下,例如當新數據具有不同的模式時,它將無法工作。它默認為 `false`。此選項僅適用于寫操作。 | | `createTableOptions` | 這是一個與JDBC相關的選項。如果指定,此選項允許在創建表時設置特定于數據庫的表和分區選項(例如:`CREATE TABLE t (name string) ENGINE=InnoDB.`)。此選項僅適用于寫操作。 | | `createTableColumnTypes` | 使用數據庫列數據類型而不是默認值,創建表時。數據類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:`"name CHAR(64), comments VARCHAR(1024)"`)。指定的類型應該是有效的 spark sql 數據類型。此選項僅適用于寫操作。 | ``` // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods // Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small> ``` // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods // Loading data from a JDBC source Dataset<Row> jdbcDF = spark.read() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load(); Properties connectionProperties = new Properties(); connectionProperties.put("user", "username"); connectionProperties.put("password", "password"); Dataset<Row> jdbcDF2 = spark.read() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Saving data to a JDBC source jdbcDF.write() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save(); jdbcDF2.write() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Specifying create table column data types on write jdbcDF.write() .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); ``` <small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small> ``` # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods # Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .load() jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save() jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) ``` <small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small> ``` # Loading data from a JDBC source df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password") # Saving data to a JDBC source write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password") ``` <small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small> ``` CREATE TEMPORARY VIEW jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", dbtable "schema.tablename", user 'username', password 'password' ) INSERT INTO TABLE jdbcTable SELECT * FROM resultTable ``` ## 故障排除 * JDBC driver 程序類必須對客戶端會話和所有執行程序上的原始類加載器可見。這是因為 Java 的 DriverManager 類執行安全檢查,導致它忽略原始類加載器不可見的所有 driver 程序,當打開連接時。一個方便的方法是修改所有工作節點上的compute_classpath.sh 以包含您的 driver 程序 JAR。 * 一些數據庫,例如 H2,將所有名稱轉換為大寫。您需要使用大寫字母來引用 Spark SQL 中的這些名稱。 # 性能調優 對于某些工作負載,可以通過緩存內存中的數據或打開一些實驗選項來提高性能。 ## 在內存中緩存數據 Spark SQL 可以通過調用 `spark.catalog.cacheTable("tableName")` 或 `dataFrame.cache()` 來使用內存中的列格式來緩存表。然后,Spark SQL 將只掃描所需的列,并將自動調整壓縮以最小化內存使用量和 GC 壓力。您可以調用 `spark.catalog.uncacheTable("tableName")` 從內存中刪除該表。 內存緩存的配置可以使用 `SparkSession` 上的 `setConf` 方法或使用 SQL 運行 `SET key=value` 命令來完成。 | 屬性名稱 | 默認 | 含義 | | --- | --- | --- | | `spark.sql.inMemoryColumnarStorage.compressed` | true | 當設置為 true 時,Spark SQL 將根據數據的統計信息為每個列自動選擇一個壓縮編解碼器。 | | `spark.sql.inMemoryColumnarStorage.batchSize` | 10000 | 控制批量的柱狀緩存的大小。更大的批量大小可以提高內存利用率和壓縮率,但是在緩存數據時會冒出 OOM 風險。 | ## 其他配置選項 以下選項也可用于調整查詢執行的性能。這些選項可能會在將來的版本中被廢棄,因為更多的優化是自動執行的。 | 屬性名稱 | 默認值 | 含義 | | --- | --- | --- | | `spark.sql.files.maxPartitionBytes` | 134217728 (128 MB) | 在讀取文件時,將單個分區打包的最大字節數。 | | `spark.sql.files.openCostInBytes` | 4194304 (4 MB) | 按照字節數來衡量的打開文件的估計費用可以在同一時間進行掃描。將多個文件放入分區時使用。最好過度估計,那么具有小文件的分區將比具有較大文件的分區(首先計劃的)更快。 | | `spark.sql.broadcastTimeout` | 300 | 廣播連接中的廣播等待時間超時(秒)| | `spark.sql.autoBroadcastJoinThreshold` | 10485760 (10 MB) | 配置執行連接時將廣播給所有工作節點的表的最大大小(以字節為單位)。通過將此值設置為-1可以禁用廣播。請注意,目前的統計信息僅支持 Hive Metastore 表,其中已運行命令 `ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan`。 | | `spark.sql.shuffle.partitions` | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations。 # 分布式 SQL 引擎 Spark SQL 也可以充當使用其 JDBC/ODBC 或命令行界面的分布式查詢引擎。在這種模式下,最終用戶或應用程序可以直接與 Spark SQL 交互運行 SQL 查詢,而不需要編寫任何代碼。 ## 運行 Thrift JDBC/ODBC 服務器 這里實現的 Thrift JDBC/ODBC 服務器對應于 Hive 1.2 中的 [`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2)。您可以使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。 要啟動 JDBC/ODBC 服務器,請在 Spark 目錄中運行以下命令: ``` ./sbin/start-thriftserver.sh ``` 此腳本接受所有 `bin/spark-submit` 命令行選項,以及 `--hiveconf` 選項來指定 Hive 屬性。您可以運行 `./sbin/start-thriftserver.sh --help` 查看所有可用選項的完整列表。默認情況下,服務器監聽 localhost:10000\。您可以通過環境變量覆蓋此行為,即: ``` export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ... ``` or system properties: ``` ./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ... ``` 現在,您可以使用 beeline 來測試 Thrift JDBC/ODBC 服務器: ``` ./bin/beeline ``` 使用 beeline 方式連接到 JDBC/ODBC 服務器: ``` beeline> !connect jdbc:hive2://localhost:10000 ``` Beeline 將要求您輸入用戶名和密碼。在非安全模式下,只需輸入機器上的用戶名和空白密碼即可。對于安全模式,請按照 [beeline 文檔](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients) 中的說明進行操作。 配置Hive是通過將 `hive-site.xml`, `core-site.xml` 和 `hdfs-site.xml` 文件放在 `conf/` 中完成的。 您也可以使用 Hive 附帶的 beeline 腳本。 Thrift JDBC 服務器還支持通過 HTTP 傳輸發送 thrift RPC 消息。使用以下設置啟用 HTTP 模式作為系統屬性或在 `conf/` 中的 `hive-site.xml` 文件中啟用: ``` hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice ``` 要測試,請使用 beeline 以 http 模式連接到 JDBC/ODBC 服務器: ``` beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> ``` ## 運行 Spark SQL CLI Spark SQL CLI 是在本地模式下運行 Hive 轉移服務并執行從命令行輸入的查詢的方便工具。請注意,Spark SQL CLI 不能與 Thrift JDBC 服務器通信。 要啟動 Spark SQL CLI,請在 Spark 目錄中運行以下命令: ``` ./bin/spark-sql ``` 配置 Hive 是通過將 `hive-site.xml`,`core-site.xml` 和 `hdfs-site.xml` 文件放在 `conf/` 中完成的。您可以運行 `./bin/spark-sql --help` 獲取所有可用選項的完整列表。 # 遷移指南 ## 從 Spark SQL 2.1 升級到 2.2 * Spark 2.1.1 介紹了一個新的配置 key:`spark.sql.hive.caseSensitiveInferenceMode`。它的默認設置是 `NEVER_INFER`,其行為與 2.1.0 保持一致。但是,Spark 2.2.0 將此設置的默認值更改為 “INFER_AND_SAVE”,以恢復與底層文件 schema(模式)具有大小寫混合的列名稱的 Hive metastore 表的兼容性。使用 `INFER_AND_SAVE` 配置的 value,在第一次訪問 Spark 將對其尚未保存推測 schema(模式)的任何 Hive metastore 表執行 schema inference(模式推斷)。請注意,對于具有數千個 partitions(分區)的表,模式推斷可能是非常耗時的操作。如果不兼容大小寫混合的列名,您可以安全地將`spark.sql.hive.caseSensitiveInferenceMode` 設置為 `NEVER_INFER`,以避免模式推斷的初始開銷。請注意,使用新的默認`INFER_AND_SAVE` 設置,模式推理的結果被保存為 metastore key 以供將來使用。因此,初始模式推斷僅發生在表的第一次訪問。 ## 從 Spark SQL 2.0 升級到 2.1 * Datasource tables(數據源表)現在存儲了 Hive metastore 中的 partition metadata(分區元數據)。這意味著諸如 `ALTER TABLE PARTITION ... SET LOCATION` 這樣的 Hive DDLs 現在使用 Datasource API 可用于創建 tables(表)。 * 遺留的數據源表可以通過 `MSCK REPAIR TABLE` 命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。 * 要確定表是否已遷移,當在表上發出 `DESCRIBE FORMATTED` 命令時請查找 `PartitionProvider: Catalog` 屬性。 * Datasource tables(數據源表)的 `INSERT OVERWRITE TABLE ... PARTITION ...` 行為的更改。 * 在以前的 Spark 版本中,`INSERT OVERWRITE` 覆蓋了整個 Datasource table,即使給出一個指定的 partition。現在只有匹配規范的 partition 被覆蓋。 * 請注意,這仍然與 Hive 表的行為不同,Hive 表僅覆蓋與新插入數據重疊的分區。 ## 從 Spark SQL 1.6 升級到 2.0 * `SparkSession` 現在是 Spark 新的切入點,它替代了老的 `SQLContext` 和 `HiveContext`。注意:為了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。可以從 `SparkSession` 獲取一個新的 `catalog` 接口 — 現有的訪問數據庫和表的 API,如 `listTables`,`createExternalTable`,`dropTempView`,`cacheTable` 都被移到該接口。 * Dataset API 和 DataFrame API 進行了統一。在 Scala 中,`DataFrame` 變成了 `Dataset[Row]` 類型的一個別名,而 Java API 使用者必須將 `DataFrame` 替換成 `Dataset&lt;Row&gt;`。Dataset 類既提供了強類型轉換操作(如 `map`,`filter` 以及 `groupByKey`)也提供了非強類型轉換操作(如 `select` 和 `groupBy`)。由于編譯期的類型安全不是 Python 和 R 語言的一個特性,Dataset 的概念并不適用于這些語言的 API。相反,`DataFrame` 仍然是最基本的編程抽象,就類似于這些語言中單節點 data frame 的概念。 * Dataset 和 DataFrame API 中 unionAll 已經過時并且由 `union` 替代。 * Dataset 和 DataFrame API 中 explode 已經過時,作為選擇,可以結合 select 或 flatMap 使用 `functions.explode()`。 * Dataset 和 DataFrame API 中 `registerTempTable` 已經過時并且由 `createOrReplaceTempView` 替代。 * 對 Hive tables `CREATE TABLE ... LOCATION` 行為的更改。 * 從 Spark 2.0 開始,`CREATE TABLE ... LOCATION` 與 `CREATE EXTERNAL TABLE ... LOCATION` 是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現有數據。這意味著,在用戶指定位置的 Spark SQL 中創建的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數據。用戶不能指定 Hive managed tables(管理表)的位置。請注意,這與Hive行為不同。 * 因此,這些表上的 “DROP TABLE” 語句不會刪除數據。 ## 從 Spark SQL 1.5 升級到 1.6 * 從 Spark 1.6 開始,默認情況下服務器在多 session(會話)模式下運行。這意味著每個 JDBC/ODBC 連接擁有一份自己的 SQL 配置和臨時函數注冊。緩存表仍在并共享。如果您希望以舊的單會話模式運行 Thrift server,請設置選項 `spark.sql.hive.thriftServer.singleSession` 為 `true`。您既可以將此選項添加到 `spark-defaults.conf`,或者通過 `--conf` 將它傳遞給 `start-thriftserver.sh`。 ``` ./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ... ``` * 從 1.6.1 開始,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現有列。 * 從 Spark 1.6 開始,LongType 強制轉換為 TimestampType 期望是秒,而不是微秒。這種更改是為了匹配 Hive 1.2 的行為,以便從 numeric(數值)類型進行更一致的類型轉換到 TimestampType。更多詳情請參閱 [SPARK-11724](https://issues.apache.org/jira/browse/SPARK-11724)。 ## 從 Spark SQL 1.4 升級到 1.5 * 使用手動管理的內存優化執行,現在是默認啟用的,以及代碼生成表達式求值。這些功能既可以通過設置 `spark.sql.tungsten.enabled` 為 `false` 來禁止使用。 * Parquet 的模式合并默認情況下不再啟用。它可以通過設置 `spark.sql.parquet.mergeSchema` 到 `true` 以重新啟用。 * 字符串在 Python 列的 columns(列)現在支持使用點(`.`)來限定列或訪問嵌套值。例如 `df['table.column.nestedField']`。但是,這意味著如果你的列名中包含任何圓點,你現在必須避免使用反引號(如 `table.`column.with.dots`.nested`)。 * 在內存中的列存儲分區修剪默認是開啟的。它可以通過設置 `spark.sql.inMemoryColumnarStorage.partitionPruning` 為 `false` 來禁用。 * 無限精度的小數列不再支持,而不是 Spark SQL 最大精度為 38。當從 `BigDecimal` 對象推斷模式時,現在使用(38,18)。在 DDL 沒有指定精度時,則默認保留 `Decimal(10, 0)`。 * 時間戳現在存儲在 1 微秒的精度,而不是 1 納秒的。 * 在 sql 語句中,floating point(浮點數)現在解析為 decimal。HiveQL 解析保持不變。 * SQL / DataFrame 函數的規范名稱現在是小寫(例如 sum vs SUM)。 * JSON 數據源不會自動加載由其他應用程序(未通過 Spark SQL 插入到數據集的文件)創建的新文件。對于 JSON 持久表(即表的元數據存儲在 Hive Metastore),用戶可以使用 `REFRESH TABLE` SQL 命令或 `HiveContext` 的 `refreshTable` 方法,把那些新文件列入到表中。對于代表一個 JSON dataset 的 DataFrame,用戶需要重新創建 DataFrame,同時 DataFrame 中將包括新的文件。 * PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現有的同名列。 ## 從 Spark SQL 1.3 升級到 1.4 #### DataFrame data reader/writer interface 基于用戶反饋,我們創建了一個新的更流暢的 API,用于讀取(`SQLContext.read`)中的數據并寫入數據(`DataFrame.write`),并且舊的 API 將過時(例如,`SQLContext.parquetFile`,`SQLContext.jsonFile`)。 針對 `SQLContext.read`([Scala](api/scala/index.html#org.apache.spark.sql.SQLContext@read:DataFrameReader),[Java](api/java/org/apache/spark/sql/SQLContext.html#read()),[Python](api/python/pyspark.sql.html#pyspark.sql.SQLContext.read)) 和 `DataFrame.write`([Scala](api/scala/index.html#org.apache.spark.sql.DataFrame@write:DataFrameWriter),[Java](api/java/org/apache/spark/sql/DataFrame.html#write()),[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame.write))的更多細節,請看 API 文檔。 #### DataFrame.groupBy 保留 grouping columns(分組的列) 根據用戶的反饋,我們更改了 `DataFrame.groupBy().agg()` 的默認行為以保留 `DataFrame` 結果中的 grouping columns(分組列)。為了在 1.3 中保持該行為,請設置 `spark.sql.retainGroupColumns` 為 `false`。 ``` // In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg($"department", max("age"), sum("expense")) // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")) // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false") ``` ``` // In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg(col("department"), max("age"), sum("expense")); // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")); // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false"); ``` ``` import pyspark.sql.functions as func # In 1.3.x, in order for the grouping column "department" to show up, # it must be included explicitly as part of the agg function call. df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(func.max("age"), func.sum("expense")) # Revert to 1.3.x behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false") ``` #### DataFrame.withColumn 上的行為更改 之前 1.4 版本中,DataFrame.withColumn() 只支持添加列。該列將始終在 DateFrame 結果中被加入作為新的列,即使現有的列可能存在相同的名稱。從 1.4 版本開始,DataFrame.withColumn() 支持添加與所有現有列的名稱不同的列或替換現有的同名列。 請注意,這一變化僅適用于 Scala API,并不適用于 PySpark 和 SparkR。 ## 從 Spark SQL 1.0-1.2 升級到 1.3 在 Spark 1.3 中,我們從 Spark SQL 中刪除了 “Alpha” 的標簽,作為一部分已經清理過的可用的 API。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其他版本的二進制兼容性。這種兼容性保證不包括被明確標記為不穩定的(即 DeveloperApi 類或 Experimental)API。 #### 重命名 DataFrame 的 SchemaRDD 升級到 Spark SQL 1.3 版本時,用戶會發現最大的變化是,`SchemaRDD` 已更名為 `DataFrame`。這主要是因為 DataFrames 不再從 RDD 直接繼承,而是由 RDDS 自己來實現這些功能。DataFrames 仍然可以通過調用 `.rdd` 方法轉換為 RDDS。 在 Scala 中,有一個從 `SchemaRDD` 到 `DataFrame` 類型別名,可以為一些情況提供源代碼兼容性。它仍然建議用戶更新他們的代碼以使用 `DataFrame` 來代替。Java 和 Python 用戶需要更新他們的代碼。 #### Java 和 Scala APIs 的統一 此前 Spark 1.3 有單獨的Java兼容類(`JavaSQLContext` 和 `JavaSchemaRDD`),借鑒于 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已經統一。兩種語言的用戶可以使用 `SQLContext` 和 `DataFrame`。一般來說論文類嘗試使用兩種語言的共有類型(如 `Array` 替代了一些特定集合)。在某些情況下不通用的類型情況下,(例如,passing in closures 或 Maps)使用函數重載代替。 此外,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可以使用存在于 `org.apache.spark.sql.types` 類來描述編程模式。 #### 隔離隱式轉換和刪除 dsl 包(僅Scala) 許多 Spark 1.3 版本以前的代碼示例都以 `import sqlContext._` 開始,這提供了從 sqlContext 范圍的所有功能。在 Spark 1.3 中,我們移除了從 `RDD`s 到 `DateFrame` 再到 `SQLContext` 內部對象的隱式轉換。用戶現在應該寫成 `import sqlContext.implicits._`. 此外,隱式轉換現在只能使用方法 `toDF` 來增加由 `Product`(即 case classes or tuples)構成的 `RDD`,而不是自動應用。 當使用 DSL 內部的函數時(現在使用 `DataFrame` API 來替換),用戶習慣導入 `org.apache.spark.sql.catalyst.dsl`。相反,應該使用公共的 dataframe 函數 API:`import org.apache.spark.sql.functions._`. #### 針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限于 Scala) Spark 1.3 移除存在于基本 SQL 包的 `DataType` 類型別名。開發人員應改為導入類 `org.apache.spark.sql.types`。 #### UDF 注冊遷移到 `sqlContext.udf` 中(Java & Scala) 用于注冊 UDF 的函數,不管是 DataFrame DSL 還是 SQL 中用到的,都被遷移到 `SQLContext` 中的 udf 對象中。 ``` sqlContext.udf.register("strLen", (s: String) => s.length()) ``` ``` sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType); ``` Python UDF 注冊保持不變。 #### Python DataTypes 不再是 Singletons(單例的) 在 Python 中使用 DataTypes 時,你需要先構造它們(如:`StringType()`),而不是引用一個單例對象。 ## 與 Apache Hive 的兼容 Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,并且Spark SQL 可以連接到不同版本的Hive metastore(從 0.12.0 到 1.2.1,可以參考 [與不同版本的 Hive Metastore 交互]((#interacting-with-different-versions-of-hive-metastore))) #### 在現有的 Hive Warehouses 中部署 Spark SQL Thrift JDBC server 采用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不需要修改現有的 Hive Metastore,或者改變數據的位置和表的分區。 ### 所支持的 Hive 特性 Spark SQL 支持絕大部分的 Hive 功能,如: * Hive query(查詢)語句,包括: * `SELECT` * `GROUP BY` * `ORDER BY` * `CLUSTER BY` * `SORT BY` * 所有 Hive 操作,包括: * 關系運算符(`=`,`?`,`==`,`&lt;&gt;`,`&lt;`,`&gt;`,`&gt;=`,`&lt;=`,等等) * 算術運算符(`+`,`-`,`*`,`/`,`%`,等等) * 邏輯運算符(`AND`,`&&`,`OR`,`||`,等等) * 復雜類型的構造 * 數學函數(`sign`,`ln`,`cos`,等等) * String 函數(`instr`,`length`,`printf`,等等) * 用戶定義函數(UDF) * 用戶定義聚合函數(UDAF) * 用戶定義 serialization formats(SerDes) * 窗口函數 * Joins * `JOIN` * `{LEFT|RIGHT|FULL} OUTER JOIN` * `LEFT SEMI JOIN` * `CROSS JOIN` * Unions * Sub-queries(子查詢) * `SELECT col FROM (SELECT a + b AS col from t1) t2` * Sampling * Explain * Partitioned tables including dynamic partition insertion * View * 所有的 Hive DDL 函數,包括: * `CREATE TABLE` * `CREATE TABLE AS SELECT` * `ALTER TABLE` * 大部分的 Hive Data types(數據類型),包括: * `TINYINT` * `SMALLINT` * `INT` * `BIGINT` * `BOOLEAN` * `FLOAT` * `DOUBLE` * `STRING` * `BINARY` * `TIMESTAMP` * `DATE` * `ARRAY&lt;&gt;` * `MAP&lt;&gt;` * `STRUCT&lt;&gt;` ### 未支持的 Hive 函數 以下是目前還不支持的 Hive 函數列表。在 Hive 部署中這些功能大部分都用不到。 **主要的 Hive 功能** * Tables 使用 buckets 的 Tables:bucket 是 Hive table partition 中的 hash partitioning。Spark SQL 還不支持 buckets. **Esoteric Hive 功能** * `UNION` 類型 * Unique join * Column 統計信息的收集:Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore. **Hive Input/Output Formats** * File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat. * Hadoop archive **Hive 優化** 有少數 Hive 優化還沒有包含在 Spark 中。其中一些(比如 indexes 索引)由于 Spark SQL 的這種內存計算模型而顯得不那么重要。另外一些在 Spark SQL 未來的版本中會持續跟蹤。 * Block 級別的 bitmap indexes 和虛擬 columns(用于構建 indexes) * 自動為 join 和 groupBy 計算 reducer 個數:目前在 Spark SQL 中,你需要使用 “`SET spark.sql.shuffle.partitions=[num_tasks];`” 來控制 post-shuffle 的并行度。 * 僅 Meta-data 的 query:對于只使用 metadata 就能回答的查詢,Spark SQL 仍然會啟動計算結果的任務。 * Skew data flag:Spark SQL 不遵循 Hive 中 skew 數據的標記。 * `STREAMTABLE` hint in join:Spark SQL 不遵循 `STREAMTABLE` hint。 * 對于查詢結果合并多個小文件:如果輸出的結果包括多個小文件,Hive 可以可選的合并小文件到一些大文件中去,以避免溢出 HDFS metadata。Spark SQL 還不支持這樣。 # 參考 ## 數據類型 Spark SQL 和 DataFrames 支持下面的數據類型: * Numeric types * `ByteType`: Represents 1-byte signed integer numbers. The range of numbers is from `-128` to `127`. * `ShortType`: Represents 2-byte signed integer numbers. The range of numbers is from `-32768` to `32767`. * `IntegerType`: Represents 4-byte signed integer numbers. The range of numbers is from `-2147483648` to `2147483647`. * `LongType`: Represents 8-byte signed integer numbers. The range of numbers is from `-9223372036854775808` to `9223372036854775807`. * `FloatType`: Represents 4-byte single-precision floating point numbers. * `DoubleType`: Represents 8-byte double-precision floating point numbers. * `DecimalType`: Represents arbitrary-precision signed decimal numbers. Backed internally by `java.math.BigDecimal`. A `BigDecimal` consists of an arbitrary precision integer unscaled value and a 32-bit integer scale. * String type * `StringType`: Represents character string values. * Binary type * `BinaryType`: Represents byte sequence values. * Boolean type * `BooleanType`: Represents boolean values. * Datetime type * `TimestampType`: Represents values comprising values of fields year, month, day, hour, minute, and second. * `DateType`: Represents values comprising values of fields year, month, day. * Complex types * `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of elements with the type of `elementType`. `containsNull` is used to indicate if elements in a `ArrayType` value can have `null` values. * `MapType(keyType, valueType, valueContainsNull)`: Represents values comprising a set of key-value pairs. The data type of keys are described by `keyType` and the data type of values are described by `valueType`. For a `MapType` value, keys are not allowed to have `null` values. `valueContainsNull` is used to indicate if values of a `MapType` value can have `null` values. * `StructType(fields)`: Represents values with the structure described by a sequence of `StructField`s (`fields`). * `StructField(name, dataType, nullable)`: Represents a field in a `StructType`. The name of a field is indicated by `name`. The data type of a field is indicated by `dataType`. `nullable` is used to indicate if values of this fields can have `null` values. Spark SQL 的所有數據類型都在包 `org.apache.spark.sql.types` 中。你可以用下示例示例來訪問它們. ``` import org.apache.spark.sql.types._ ``` <small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small> | Data type(數據類型)| Scala 中的 Value 類型 | 訪問或創建數據類型的 API | | --- | --- | --- | | **ByteType** | Byte | ByteType | | **ShortType** | Short | ShortType | | **IntegerType** | Int | IntegerType | | **LongType** | Long | LongType | | **FloatType** | Float | FloatType | | **DoubleType** | Double | DoubleType | | **DecimalType** | java.math.BigDecimal | DecimalType | | **StringType** | String | StringType | | **BinaryType** | Array[Byte] | BinaryType | | **BooleanType** | Boolean | BooleanType | | **TimestampType** | java.sql.Timestamp | TimestampType | | **DateType** | java.sql.Date | DateType | | **ArrayType** | scala.collection.Seq | ArrayType(_elementType_, [_containsNull_]) **Note(注意):** _containsNull_ 的默認值是 _true_。 | **MapType** | scala.collection.Map | MapType(_keyType_, _valueType_, [_valueContainsNull_]) **Note(注意):** _valueContainsNull_ 的默認值是 _true_。 | **StructType** | org.apache.spark.sql.Row | StructType(_fields_) **Note(注意):** _fields_ 是 StructFields 的 Seq。所有,兩個 fields 擁有相同的名稱是不被允許的。 | **StructField** | 該 field(字段)數據類型的 Scala 中的 value 類型(例如,數據類型為 IntegerType 的 StructField 是 Int) | StructField(_name_, _dataType_, [_nullable_]) **Note:** _nullable_ 的默認值是 _true_。 Spark SQL 的所有數據類型都在 `org.apache.spark.sql.types` 的包中。要訪問或者創建一個數據類型,請使用 `org.apache.spark.sql.types.DataTypes` 中提供的 factory 方法. | Data type | Value type in Java | API to access or create a data type | | --- | --- | --- | | **ByteType** | byte or Byte | DataTypes.ByteType | | **ShortType** | short or Short | DataTypes.ShortType | | **IntegerType** | int or Integer | DataTypes.IntegerType | | **LongType** | long or Long | DataTypes.LongType | | **FloatType** | float or Float | DataTypes.FloatType | | **DoubleType** | double or Double | DataTypes.DoubleType | | **DecimalType** | java.math.BigDecimal | DataTypes.createDecimalType() DataTypes.createDecimalType(_precision_, _scale_). | | **StringType** | String | DataTypes.StringType | | **BinaryType** | byte[] | DataTypes.BinaryType | | **BooleanType** | boolean or Boolean | DataTypes.BooleanType | | **TimestampType** | java.sql.Timestamp | DataTypes.TimestampType | | **DateType** | java.sql.Date | DataTypes.DateType | | **ArrayType** | java.util.List | DataTypes.createArrayType(_elementType_) **Note:** The value of _containsNull_ will be _true_ DataTypes.createArrayType(_elementType_, _containsNull_). | | **MapType** | java.util.Map | DataTypes.createMapType(_keyType_, _valueType_) **Note:** The value of _valueContainsNull_ will be _true_. DataTypes.createMapType(_keyType_, _valueType_, _valueContainsNull_) | | **StructType** | org.apache.spark.sql.Row | DataTypes.createStructType(_fields_) **Note:** _fields_ is a List or an array of StructFields. Also, two fields with the same name are not allowed. | | **StructField** | The value type in Java of the data type of this field (For example, int for a StructField with the data type IntegerType) | DataTypes.createStructField(_name_, _dataType_, _nullable_) | Spark SQL 的所有數據類型都在 `pyspark.sql.types` 的包中。你可以通過如下方式來訪問它們. ``` from pyspark.sql.types import * ``` | Data type | Value type in Python | API to access or create a data type | | --- | --- | --- | | **ByteType** | int or long **Note:** Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | ByteType() | | **ShortType** | int or long **Note:** Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | ShortType() | | **IntegerType** | int or long | IntegerType() | | **LongType** | long **Note:** Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | LongType() | | **FloatType** | float **Note:** Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | FloatType() | | **DoubleType** | float | DoubleType() | | **DecimalType** | decimal.Decimal | DecimalType() | | **StringType** | string | StringType() | | **BinaryType** | bytearray | BinaryType() | | **BooleanType** | bool | BooleanType() | | **TimestampType** | datetime.datetime | TimestampType() | | **DateType** | datetime.date | DateType() | | **ArrayType** | list, tuple, or array | ArrayType(_elementType_, [_containsNull_]) **Note:** The default value of _containsNull_ is _True_. | | **MapType** | dict | MapType(_keyType_, _valueType_, [_valueContainsNull_]) **Note:** The default value of _valueContainsNull_ is _True_. | | **StructType** | list or tuple | StructType(_fields_) **Note:** _fields_ is a Seq of StructFields. Also, two fields with the same name are not allowed. | | **StructField** | The value type in Python of the data type of this field (For example, Int for a StructField with the data type IntegerType) | StructField(_name_, _dataType_, [_nullable_]) **Note:** The default value of _nullable_ is _True_. | | Data type | Value type in R | API to access or create a data type | | --- | --- | --- | | **ByteType** | integer **Note:** Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | "byte" | | **ShortType** | integer **Note:** Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | "short" | | **IntegerType** | integer | "integer" | | **LongType** | integer **Note:** Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | "long" | | **FloatType** | numeric **Note:** Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | "float" | | **DoubleType** | numeric | "double" | | **DecimalType** | Not supported | Not supported | | **StringType** | character | "string" | | **BinaryType** | raw | "binary" | | **BooleanType** | logical | "bool" | | **TimestampType** | POSIXct | "timestamp" | | **DateType** | Date | "date" | | **ArrayType** | vector or list | list(type="array", elementType=_elementType_, containsNull=[_containsNull_]) **Note:** The default value of _containsNull_ is _TRUE_. | | **MapType** | environment | list(type="map", keyType=_keyType_, valueType=_valueType_, valueContainsNull=[_valueContainsNull_]) **Note:** The default value of _valueContainsNull_ is _TRUE_. | | **StructType** | named list | list(type="struct", fields=_fields_) **Note:** _fields_ is a Seq of StructFields. Also, two fields with the same name are not allowed. | | **StructField** | The value type in R of the data type of this field (For example, integer for a StructField with the data type IntegerType) | list(name=_name_, type=_dataType_, nullable=[_nullable_]) **Note:** The default value of _nullable_ is _TRUE_. | ## NaN Semantics 當處理一些不符合標準浮點數語義的 `float` 或 `double` 類型時,對于 Not-a-Number(NaN) 需要做一些特殊處理。具體如下: * NaN = NaN 返回 true。 * 在 aggregations(聚合)操作中,所有的 NaN values 將被分到同一個組中。 * 在 join key 中 NaN 可以當做一個普通的值。 * NaN 值在升序排序中排到最后,比任何其他數值都大。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看