# 快速入門
- [安全](https://spark.apache.org/docs/latest/quick-start.html#security)
* [使用 Spark Shell 進行交互式分析](#使用-spark-shell-進行交互式分析)
* [基礎](#基礎)
* [Dataset 上的更多操作](#dataset-上的更多操作)
* [緩存](#緩存)
* [獨立的應用](#獨立的應用)
* [快速跳轉](#快速跳轉)
本教程提供了如何使用 Spark 的快速入門介紹。首先通過運行 Spark 交互式的 shell(在 Python 或 Scala 中)來介紹 API,然后展示如何使用 Java,Scala 和 Python 來編寫應用程序。
為了繼續閱讀本指南,首先從 [Spark 官網](http://spark.apache.org/downloads.html) 下載 Spark 的發行包。因為我們不使用 HDFS,所以你可以下載一個任何 Hadoop 版本的軟件包。
請注意,在 Spark 2.0 之前,Spark 的主要編程接口是彈性分布式數據集(RDD)。 在 Spark 2.0 之后,RDD 被 Dataset 替換,它是像RDD 一樣的 strongly-typed(強類型),但是在引擎蓋下更加優化。 RDD 接口仍然受支持,您可以在 [RDD 編程指南](rdd-programming-guide.html) 中獲得更完整的參考。 但是,我們強烈建議您切換到使用 Dataset(數據集),其性能要更優于 RDD。 請參閱 [SQL 編程指南](sql-programming-guide.html) 獲取更多有關 Dataset 的信息。
# 安全
默認情況下,Spark中的安全性處于關閉狀態。這意味著您默認情況下容易受到攻擊。在下載和運行Spark之前,請參閱[Spark Security](https://spark.apache.org/docs/latest/security.html)。
# 使用 Spark Shell 進行交互式分析
## 基礎
Spark shell 提供了一種來學習該 API 比較簡單的方式,以及一個強大的來分析數據交互的工具。在 Scala(運行于 Java 虛擬機之上,并能很好的調用已存在的 Java 類庫)或者 Python 中它是可用的。通過在 Spark 目錄中運行以下的命令來啟動它:
```
./bin/spark-shell
```
Spark 的主要抽象是一個稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉換來創建。讓我們從 Spark 源目錄中的 README 文件來創建一個新的 Dataset:
```
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
```
您可以直接從 Dataset 中獲取 values(值),通過調用一些 actions(動作),或者 transform(轉換)Dataset 以獲得一個新的。更多細節,請參閱 _[API doc](api/scala/index.html#org.apache.spark.sql.Dataset)_。
```
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
```
現在讓我們 transform 這個 Dataset 以獲得一個新的 。我們調用 `filter` 以返回一個新的 Dataset,它是文件中的 items 的一個子集。
```
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
```
我們可以鏈式操作 transformation(轉換)和 action(動作):
```
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
```
```
./bin/pyspark
```
Spark有一個主要的抽象概念叫做 Dataset 的分布式集合類。Dataset 可以從Hadoop InputFormats(例如HDFS文件)或通過 transforming 其他數據集來創建數據集。 由于Python的動態特性,我們不需要在Python中定義強類型的 Dataset。 因此,Python中的所有數據集都是 Dataset[Row],我們稱之為"DataFrame" 來與 Pandas 和R中的數據框概念一致。讓我們從Spark源文件中的README文件中創建一個新的 DataFrame 目錄:
```
>>> textFile = spark.read.text("README.md")
```
您可以通過調用某些action直接從DataFrame獲取值,也可以transform DataFrame以獲取新的DataFrame。 有關詳細信息,請閱讀 _[API doc](api/python/index.html#pyspark.sql.DataFrame)_.
```
>>> textFile.count() # 這個DataFrame 有多少行
126
>>> textFile.first() # DataFrame的第一行
Row(value=u'# Apache Spark')
```
現在讓我們 transform(轉換) 這個DataFrame來獲得一個新的DataFrame. 我們調用 `filter` 方法來返回文件中的一個子集.
```
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
```
我們可以把 transform 和 acition 連在一起用:
```
>>> textFile.filter(textFile.value.contains("Spark")).count() # 統計文件中 "Spark" 字符串有多少個
15
```
## Dataset 上的更多操作
Dataset actions(操作)和 transformations(轉換)可以用于更復雜的計算。例如,統計出現次數最多的行 :
```
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
```
第一個 map 操作創建一個新的 Dataset,將一行數據 map 為一個整型值。在 Dataset 上調用 `reduce` 來找到最大的行計數。參數 `map` 與 `reduce` 是 Scala 函數(closures),并且可以使用 Scala/Java 庫的任何語言特性。例如,我們可以很容易地調用函數聲明,我們將定義一個 max 函數來使代碼更易于理解 :
```
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
```
一種常見的數據流模式是被 Hadoop 所推廣的 MapReduce。Spark 可以很容易實現 MapReduce:
```
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
```
在這里,我們調用了 `flatMap` 以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset,然后結合 `groupByKey` 和 `count` 來計算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts,我們可以調用 `collect`:
```
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
```
```
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]
```
這首先將一行映射為一個整數值并且別名為 "numWords" ,從中創建一個新的DataFrame. 在該 DataFrame 上調用 `agg` 函數是為了找到最大詞數(word count) . `select` 和 `agg` 的參數都是 _[Column](api/python/index.html#pyspark.sql.Column)_ 里的,我們也可以通過 `df.colName` 來獲得該 DataFrame 的一列. 我們也可以導入 pyspark.sql.functions 來提供了許多方便的功能來從舊的列構建一個新的列,
一個常見的數據流模式是MapReduce,由Hadoop推廣。 Spark可以輕松實現MapReduce流程:
```
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count()
```
在這里我們在 `select` 中使用 `explode` 函數來將一個 Dataset 的所有行轉換成一個詞的數據集,然后組合使用 `groupBy` 和 `count` 來計算文件中各個單詞的計數作為 DataFrame 的兩列: "word" 和 "count".要在我們的shell中統計單詞的詞頻,我們可以調用 `collect`:
```
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
```
## 緩存
Spark 還支持 Pulling(拉取)數據集到一個群集范圍的內存緩存中。例如當查詢一個小的 “hot” 數據集或運行一個像 PageRANK 這樣的迭代算法時,在數據被重復訪問時是非常高效的。舉一個簡單的例子,讓我們標記我們的 `linesWithSpark` 數據集到緩存中:
```
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
```
使用 Spark 來探索和緩存一個 100 行的文本文件看起來比較愚蠢。有趣的是,即使在他們跨越幾十或者幾百個節點時,這些相同的函數也可以用于非常大的數據集。您也可以像 [編程指南](rdd-programming-guide.html#using-the-shell). 中描述的一樣通過連接 `bin/spark-shell` 到集群中,使用交互式的方式來做這件事情。
```
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
```
使用Spark探索和緩存100行文本文件似乎很愚蠢. T有趣的是,這些相同的功能可用于非常大的數據集,即使它們跨越數十個或數百個節點交錯著, 你也可以通過`bin/pyspark` 連接到集群來進行交互,詳細描述在 [RDD programming guide](rdd-programming-guide.html#using-the-shell).
# 獨立的應用
假設我們希望使用 Spark API 來創建一個獨立的應用程序。我們在 Scala(SBT),Java(Maven)和 Python 中練習一個簡單應用程序。
我們將在 Scala 中創建一個非常簡單的 Spark 應用程序 - 很簡單的,事實上,它名為 `SimpleApp.scala`:
```
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
```
注意,這個應用程序我們應該定義一個 `main()` 方法而不是去擴展 `scala.App`。使用 `scala.App` 的子類可能不會正常運行。
該程序僅僅統計了 Spark README 文件中每一行包含 ‘a’ 的數量和包含 ‘b’ 的數量。注意,您需要將 YOUR_SPARK_HOME 替換為您 Spark 安裝的位置。不像先前使用 spark shell 操作的示例,它們初始化了它們自己的 SparkContext,我們初始化了一個 SparkContext 作為應用程序的一部分。
我們調用 `SparkSession.builder` 以構造一個 [[SparkSession]],然后設置 application name(應用名稱),最終調用 `getOrCreate` 以獲得 [[SparkSession]] 實例。
我們的應用依賴了 Spark API,所以我們將包含一個名為 `build.sbt` 的 sbt 配置文件,它描述了 Spark 的依賴。該文件也會添加一個 Spark 依賴的 repository:
```
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
```
為了讓 sbt 正常的運行,我們需要根據經典的目錄結構來布局 `SimpleApp.scala` 和 `build.sbt` 文件。在成功后,我們可以創建一個包含應用程序代碼的 JAR 包,然后使用 `spark-submit` 腳本來運行我們的程序。
```
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
```
這個例子使用Maven來編譯成一個jar應用程序,其他的構建系統(如Ant、Gradle,譯者注)也可以。
我們會創建一個非常簡單的Spark應用,`SimpleApp.java`:
```
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read.textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
```
這個程序計算Spark README文檔中包含字母’a’和字母’b’的行數。注意把YOUR_SPARK_HOME修改成你的Spark的安裝目錄。 跟之前的Spark shell不同,我們需要初始化SparkSession。
把Spark依賴添加到Maven的`pom.xml`文件里。 注意Spark的artifacts使用Scala版本進行標記。
```
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
```
我們按照Maven經典的目錄結構組織這些文件:
```
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
```
現在我們用Maven打包這個應用,然后用`./bin/spark-submit`執行它。
```
# 打包包含應用程序的JAR
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# 用spark-submit來運行程序
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
```
現在我們來展示如何用python API 來寫一個應用 (pyspark).
如果要構建打包的PySpark應用程序或庫,則可以添加以下內容到setup.py文件中:
```
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
```
我們以一個簡單的例子為例,創建一個簡單的pyspark 應用 `SimpleApp.py`:
```
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder().appName(appName).master(master).getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
```
該程序只是統計計算在該文本中包含a字母和包含b字母的行數. 請注意你需要將 YOUR_SPARK_HOME 替換成你的spark路徑.就像scala 示例和java示例一樣,我們使用 SparkSession 來創建數據集, 對于使用自定義類護著第三方庫的應用程序,我們還可以通過 `spark-submit` 帶著 `--py-files` 來添加代碼依賴 , 我們也可以通過把代碼打成zip包來進行依賴添加 (詳細請看 `spark-submit --help` ). `SimpleApp` 是個簡單的例子我們不需要添加特別的代碼或自定義類.
我們可以通過 `bin/spark-submit` 腳本來運行應用:
```
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
```
如果您的環境中已安裝PySpark pip(例如pip install pyspark),則可以使用常規Python解釋器運行應用程序,也可以根據需要使用前面的“ spark-submit”。
```
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
```
# 快速跳轉
恭喜您成功的運行了您的第一個 Spark 應用程序!
* 更多 API 的深入概述,從 [RDD programming guide](rdd-programming-guide.html) 和 [SQL programming guide](sql-programming-guide.html) 這里開始,或者看看 “編程指南” 菜單中的其它組件。
* 為了在集群上運行應用程序,請前往 [deployment overview](cluster-overview.html).
* 最后,在 Spark 的 `examples` 目錄中包含了一些 ([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples),[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples),[Python](https://github.com/apache/spark/tree/master/examples/src/main/python),[R](https://github.com/apache/spark/tree/master/examples/src/main/r)) 示例。您可以按照如下方式來運行它們:
```
# 針對 Scala 和 Java,使用 run-example:
./bin/run-example SparkPi
# 針對 Python 示例,直接使用 spark-submit:
./bin/spark-submit examples/src/main/python/pi.py
# 針對 R 示例,直接使用 spark-submit:
./bin/spark-submit examples/src/main/r/dataframe.R
```
- Spark 概述
- 編程指南
- 快速入門
- Spark 編程指南
- 構建在 Spark 之上的模塊
- Spark Streaming 編程指南
- Spark SQL, DataFrames and Datasets Guide
- MLlib
- GraphX Programming Guide
- API 文檔
- 部署指南
- 集群模式概述
- Submitting Applications
- 部署模式
- Spark Standalone Mode
- 在 Mesos 上運行 Spark
- Running Spark on YARN
- 其它
- 更多
- Spark 配置
- Monitoring and Instrumentation
- Tuning Spark
- 作業調度
- Spark 安全
- 硬件配置
- Accessing OpenStack Swift from Spark
- 構建 Spark
- 其它
- 外部資源
- Spark RDD(Resilient Distributed Datasets)論文
- 翻譯進度