<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # Spark 編程指南 * [概述](#概述) * [Spark 依賴](#spark-依賴) * [初始化 Spark](#初始化-spark) * [使用 Shell](#使用-shell) * [彈性分布式數據集(RDDs)](#彈性分布式數據集-rdds) * [并行集合](#并行集合) * [外部 Datasets(數據集)](#外部-datasets數據集) * [RDD 操作](#rdd-操作) * [基礎](#基礎) * [傳遞 Functions(函數)給 Spark](#傳遞-functions函數給-spark) * [理解閉包](#理解閉包-) * [示例](#示例) * [Local(本地)vs. cluster(集群)模式](#local本地vs-cluster集群模式) * [打印 RDD 的 elements](#打印-rdd-的-elements) * [與 Key-Value Pairs 一起使用](#與--key-value-pairs-一起使用) * [Transformations(轉換)](#transformations轉換) * [Actions(動作)](#actions動作) * [Shuffle 操作](#shuffle-操作) * [Background(幕后)](#background幕后) * [性能影響](#性能影響) * [RDD Persistence(持久化)](#rdd-persistence持久化) * [如何選擇存儲級別 ?](#如何選擇存儲級別-) * [刪除數據](#刪除數據) * [共享變量](#共享變量) * [廣播變量](#廣播變量) * [Accumulators(累加器)](#accumulators累加器) * [部署應用到集群中](#部署應用到集群中) * [從 Java / Scala 啟動 Spark jobs](#從-java--scala-啟動-spark-jobs) * [單元測試](#單元測試) * [快速鏈接](#快速鏈接) # 概述 在一個較高的概念上來說,每一個 Spark 應用程序由一個在集群上運行著用戶的 `main` 函數和執行各種并行操作的 _driver program_(驅動程序)組成。Spark 提供的主要抽象是一個 _彈性分布式數據集_(RDD),它是可以執行并行操作且跨集群節點的元素的集合。RDD 可以從一個 Hadoop 文件系統(或者任何其它 Hadoop 支持的文件系統),或者一個在 driver program(驅動程序)中已存在的 Scala 集合,以及通過 transforming(轉換)來創建一個 RDD。用戶為了讓它在整個并行操作中更高效的重用,也許會讓 Spark persist(持久化)一個 RDD 到內存中。最后,RDD 會自動的從節點故障中恢復。 在 Spark 中的第二個抽象是能夠用于并行操作的 _shared variables_(共享變量),默認情況下,當 Spark 的一個函數作為一組不同節點上的任務運行時,它將每一個變量的副本應用到每一個任務的函數中去。有時候,一個變量需要在整個任務中,或者在任務和 driver program(驅動程序)之間來共享。Spark 支持兩種類型的共享變量:_broadcast variables_(廣播變量),它可以用于在所有節點上緩存一個值,和 _accumulators_(累加器),他是一個只能被 “added(增加)” 的變量,例如 counters 和 sums。 本指南介紹了每一種 Spark 所支持的語言的特性。如果您啟動 Spark 的交互式 shell - 針對 Scala shell 使用 `bin/spark-shell` 或者針對 Python 使用 `bin/pyspark` 是很容易來學習的。 # Spark 依賴 Spark 2.2.0 默認使用 Scala 2.11 來構建和發布直到運行。(當然,Spark 也可以與其它的 Scala 版本一起運行)。為了使用 Scala 編寫應用程序,您需要使用可兼容的 Scala 版本(例如,2.11.X)。 要編寫一個 Spark 的應用程序,您需要在 Spark 上添加一個 Maven 依賴。Spark 可以通過 Maven 中央倉庫獲取: ``` groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0 ``` 此外,如果您想訪問一個 HDFS 集群,則需要針對您的 HDFS 版本添加一個 `hadoop-client`(hadoop 客戶端)依賴。 ``` groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version> ``` 最后,您需要導入一些 Spark classes(類)到您的程序中去。添加下面幾行: ``` import org.apache.spark.SparkContext import org.apache.spark.SparkConf ``` (在 Spark 1.3.0 之前,您需要明確導入 `org.apache.spark.SparkContext._` 來啟用必要的的隱式轉換。) Spark 2.2.0 supports [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) for concisely writing functions, otherwise you can use the classes in the [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package. Note that support for Java 7 was removed in Spark 2.2.0. To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at: ``` groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0 ``` In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS. ``` groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version> ``` Finally, you need to import some Spark classes into your program. Add the following lines: ``` import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; ``` Spark 2.2.0 works with Python 2.6+ or Python 3.4+. It can use the standard CPython interpreter, so C libraries like NumPy can be used. It also works with PyPy 2.3+. Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and may be removed in Spark 2.2.0. To run Spark applications in Python, use the `bin/spark-submit` script located in the Spark directory. This script will load Spark’s Java/Scala libraries and allow you to submit applications to a cluster. You can also use `bin/pyspark` to launch an interactive Python shell. If you wish to access HDFS data, you need to use a build of PySpark linking to your version of HDFS. [Prebuilt packages](http://spark.apache.org/downloads.html) are also available on the Spark homepage for common HDFS versions. Finally, you need to import some Spark classes into your program. Add the following line: ``` from pyspark import SparkContext, SparkConf ``` PySpark requires the same minor version of Python in both driver and workers. It uses the default python version in PATH, you can specify which version of Python you want to use by `PYSPARK_PYTHON`, for example: ``` $ PYSPARK_PYTHON=python3.4 bin/pyspark $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py ``` # 初始化 Spark Spark 程序必須做的第一件事情是創建一個 [SparkContext](api/scala/index.html#org.apache.spark.SparkContext) 對象,它會告訴 Spark 如何訪問集群。要創建一個 `SparkContext`,首先需要構建一個包含應用程序的信息的 [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) 對象。 每一個 JVM 可能只能激活一個 SparkContext 對象。在創新一個新的對象之前,必須調用 `stop()` 該方法停止活躍的 SparkContext。 ``` val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf) ``` The first thing a Spark program must do is to create a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/java/index.html?org/apache/spark/SparkConf.html) object that contains information about your application. ``` SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf); ``` The first thing a Spark program must do is to create a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/python/pyspark.html#pyspark.SparkConf) object that contains information about your application. ``` conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) ``` 這個 `appName` 參數是一個在集群 UI 上展示應用程序的名稱。`master` 是一個 [Spark,Mesos 或 YARN 的 cluster URL](submitting-applications.html#master-urls),或者指定為在 local mode(本地模式)中運行的 “local” 字符串。在實際工作中,當在集群上運行時,您不希望在程序中將 master 給硬編碼,而是用 [使用 `spark-submit` 啟動應用](submitting-applications.html) 并且接收它。然而,對于本地測試和單元測試,您可以通過 “local” 來運行 Spark 進程。 ## 使用 Shell 在 Spark Shell 中,一個特殊的 interpreter-aware(可用的解析器)SparkContext 已經為您創建好了,稱之為 `sc` 的變量。創建您自己的 SparkContext 將不起作用。您可以使用 `--master` 參數設置這個 SparkContext 連接到哪一個 master 上,并且您可以通過 `--jars` 參數傳遞一個逗號分隔的列表來添加 JARs 到 classpath 中。也可以通過 `--packages` 參數應用一個用逗號分隔的 maven coordinates(maven 坐標)方式來添加依賴(例如,Spark 包)到您的 shell session 中去。任何額外存在且依賴的倉庫(例如 Sonatype)可以傳遞到 `--repositories` 參數。例如,要明確使用四個核(CPU)來運行 `bin/spark-shell`,使用: ``` $ ./bin/spark-shell --master local[4] ``` 或者,也可以添加 `code.jar` 到它的 classpath 中去,使用: ``` $ ./bin/spark-shell --master local[4] --jars code.jar ``` 為了包含一個依賴,使用 Maven 坐標: ``` $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" ``` 有關選項的完整列表,請運行 `spark-shell --help`。在幕后,`spark-shell` 調用了常用的 [`spark-submit` 腳本](submitting-applications.html). In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the `--repositories` argument. Any Python dependencies a Spark package has (listed in the requirements.txt of that package) must be manually installed using `pip` when necessary. For example, to run `bin/pyspark` on exactly four cores, use: ``` $ ./bin/pyspark --master local[4] ``` Or, to also add `code.py` to the search path (in order to later be able to `import code`), use: ``` $ ./bin/pyspark --master local[4] --py-files code.py ``` For a complete list of options, run `pyspark --help`. Behind the scenes, `pyspark` invokes the more general [`spark-submit` script](submitting-applications.html). It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running `bin/pyspark`: ``` $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark ``` To use the Jupyter notebook (previously known as the IPython notebook), ``` $ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark ``` You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`. After the Jupyter Notebook server is launched, you can create a new “Python 2” notebook from the “Files” tab. Inside the notebook, you can input the command `%pylab inline` as part of your notebook before you start to try Spark from the Jupyter notebook. # 彈性分布式數據集(RDDs) Spark 主要以一個 _彈性分布式數據集_(RDD)的概念為中心,它是一個容錯且可以執行并行操作的元素的集合。有兩種方法可以創建 RDD:在你的 driver program(驅動程序)中 _parallelizing_ 一個已存在的集合,或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源。 ## 并行集合 可以在您的 driver program(a Scala `Seq`)中已存在的集合上通過調用 `SparkContext` 的 `parallelize` 方法來創建并行集合。該集合的元素從一個可以并行操作的 distributed dataset(分布式數據集)中復制到另一個 dataset(數據集)中去。例如,這里是一個如何去創建一個保存數字 1 ~ 5 的并行集合。 ``` val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) ``` 在創建后,該 distributed dataset(分布式數據集)(`distData`)可以并行的執行操作。例如,我們可以調用 `distData.reduce((a, b) =&gt; a + b`) 來合計數組中的元素。后面我們將介紹 distributed dataset(分布式數據集)上的操作。 Parallelized collections are created by calling `JavaSparkContext`’s `parallelize` method on an existing `Collection` in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5: ``` List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data); ``` Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -&gt; a + b)` to add up the elements of the list. We describe operations on distributed datasets later on. Parallelized collections are created by calling `SparkContext`’s `parallelize` method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5: ``` data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) ``` Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we can call `distData.reduce(lambda a, b: a + b)` to add up the elements of the list. We describe operations on distributed datasets later on. 并行集合中一個很重要參數是 _partitions_(分區)的數量,它可用來切割 dataset(數據集)。Spark 將在集群中的每一個分區上運行一個任務。通常您希望群集中的每一個 CPU 計算 2-4 個分區。一般情況下,Spark 會嘗試根據您的群集情況來自動的設置的分區的數量。當然,您也可以將分區數作為第二個參數傳遞到 `parallelize`(例如,`sc.parallelize(data, 10)`)方法中來手動的設置它。注意:代碼中的一些地方會使用 term slices(a synonym for partitions)以保持向后兼容. ## 外部 Datasets(數據集) Spark 可以從 Hadoop 所支持的任何存儲源中創建 distributed dataset(分布式數據集),包括本地文件系統,HDFS,Cassandra,HBase,[Amazon S3](http://wiki.apache.org/hadoop/AmazonS3) 等等。Spark 支持文本文件,[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),以及任何其它的 Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html)。 可以使用 `SparkContext` 的 `textFile` 方法來創建文本文件的 RDD。此方法需要一個文件的 URI(計算機上的本地路徑,`hdfs://`,`s3n://` 等等的 URI),并且讀取它們作為一個 lines(行)的集合。下面是一個調用示例: ``` scala> val distFile = sc.textFile("data.txt") distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26 ``` 在創建后,`distFile` 可以使用 dataset(數據集)的操作。例如,我們可以使用下面的 map 和 reduce 操作來合計所有行的數量:`distFile.map(s =&gt; s.length).reduce((a, b) =&gt; a + b)`。 使用 Spark 讀取文件時需要注意: * 如果使用本地文件系統的路徑,所工作節點的相同訪問路徑下該文件必須可以訪問。復制文件到所有工作節點上,或著使用共享的網絡掛載文件系統。 * 所有 Spark 基于文件的 input 方法,包括 `textFile`,支持在目錄上運行,壓縮文件,和通配符。例如,您可以使用 `textFile("/my/directory")`,`textFile("/my/directory/*.txt")`,and `textFile("/my/directory/*.gz")`. * `textFile` 方法也可以通過第二個可選的參數來控制該文件的分區數量。默認情況下,Spark 為文件的每一個 block(塊)創建的一 個 partition 分區(HDFS 中塊大小默認是 128MB),當然你也可以通過傳遞一個較大的值來要求一個較高的分區數量。請注意,分區的數量不能夠小于塊的數量。 除了文本文件之外,Spark 的 Scala API 也支持一些其它的數據格式: * `SparkContext.wholeTextFiles` 可以讀取包含多個小文本文件的目錄,并且將它們作為一個 (filename, content) pairs 來返回。這與 `textFile` 相比,它的每一個文件中的每一行將返回一個記錄。分區由數據量來確定,某些情況下,可能導致分區太少。針對這些情況,`wholeTextFiles` 在第二個位置提供了一個可選的參數用戶控制分區的最小數量. * 針對 [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),使用 SparkContext 的 `sequenceFile[K, V]` 方法,其中 `K` 和 `V` 指的是文件中 key 和 values 的類型。這些應該是 Hadoop 的 [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) 接口的子類,像 [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html)。此外,Spark 可以讓您為一些常見的 Writables 指定原生類型; 例如,`sequenceFile[Int, String]` 會自動讀取 IntWritables 和 Texts. * 針對其它的 Hadoop InputFormats,您可以使用 `SparkContext.hadoopRDD` 方法,它接受一個任意的 `JobConf` 和 input format class,key class 和 value class。通過相同的方法你可以設置你的 input source(輸入源)。你還可以針對 InputFormats 使用基于 “new” MapReduce API(`org.apache.hadoop.mapreduce`)的 `SparkContext.newAPIHadoopRDD`. * `RDD.saveAsObjectFile` 和 `SparkContext.objectFile` 支持使用簡單的序列化的 Java objects 來保存 RDD。雖然這不像 Avro 這種專用的格式一樣高效,但其提供了一種更簡單的方式來保存任何的 RDD。. Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html). Text file RDDs can be created using `SparkContext`’s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation: ``` JavaRDD<String> distFile = sc.textFile("data.txt"); ``` Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s -&gt; s.length()).reduce((a, b) -&gt; a + b)`. Some notes on reading files with Spark: * If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system. * All of Spark’s file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`. * The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. Apart from text files, Spark’s Java API also supports several other data formats: * `JavaSparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. * For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext’s `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop’s [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html). * For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for InputFormats based on the “new” MapReduce API (`org.apache.hadoop.mapreduce`). * `JavaRDD.saveAsObjectFile` and `JavaSparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD. PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html). Text file RDDs can be created using `SparkContext`’s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation: ``` >>> distFile = sc.textFile("data.txt") ``` Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)`. Some notes on reading files with Spark: * If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system. * All of Spark’s file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`. * The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. Apart from text files, Spark’s Python API also supports several other data formats: * `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10. * SequenceFile and Hadoop Input/Output Formats **Note** this feature is currently marked `Experimental` and is intended for advanced users. It may be replaced in future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach. **Writable Support** PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following Writables are automatically converted: | Writable Type | Python Type | | --- | --- | | Text | unicode str | | IntWritable | int | | FloatWritable | float | | DoubleWritable | float | | BooleanWritable | bool | | BytesWritable | bytearray | | NullWritable | None | | MapWritable | dict | Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing, users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get Python `array.array` for arrays of primitive types, users need to specify custom converters. **Saving and Loading SequenceFiles** Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required. ``` >>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')] ``` **Saving and Loading Other Hadoop Input/Output Formats** PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs. If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the Elasticsearch ESInputFormat: ``` $ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345}) ``` Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and the key and value classes can easily be converted according to the above table, then this approach should work well for such cases. If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided for this. Simply extend this trait and implement your transformation code in the `convert` method. Remember to ensure that this class, along with any dependencies required to access your `InputFormat`, are packaged into your Spark job jar and included on the PySpark classpath. See the [Python examples](https://github.com/apache/spark/tree/master/examples/src/main/python) and the [Converter examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters) for examples of using Cassandra / HBase `InputFormat` and `OutputFormat` with custom converters. ## RDD 操作 RDDs support 兩種類型的操作:_transformations(轉換)_,它會在一個已存在的 dataset 上創建一個新的 dataset,和 _actions(動作)_,將在 dataset 上運行的計算后返回到 driver 程序。例如,`map` 是一個通過讓每個數據集元素都執行一個函數,并返回的新 RDD 結果的 transformation,`reduce` reduce 通過執行一些函數,聚合 RDD 中所有元素,并將最終結果給返回驅動程序(雖然也有一個并行 `reduceByKey` 返回一個分布式數據集)的 action. Spark 中所有的 transformations 都是 _lazy(懶加載的)_,因此它不會立刻計算出結果。相反,他們只記得應用于一些基本數據集的轉換(例如. 文件)。只有當需要返回結果給驅動程序時,transformations 才開始計算。這種設計使 Spark 的運行更高效。例如,我們可以了解到,`map` 所創建的數據集將被用在 `reduce` 中,并且只有 `reduce` 的計算結果返回給驅動程序,而不是映射一個更大的數據集. 默認情況下,每次你在 RDD 運行一個 action 的時,每個 transformed RDD 都會被重新計算。但是,您也可用 `persist`(或 `cache`)方法將 RDD persist(持久化)到內存中;在這種情況下,Spark 為了下次查詢時可以更快地訪問,會把數據保存在集群上。此外,還支持持續持久化 RDDs 到磁盤,或復制到多個結點。 ### 基礎 為了說明 RDD 基礎,請思考下面這個的簡單程序: ``` val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b) ``` 第一行從外部文件中定義了一個基本的 RDD,但這個數據集并未加載到內存中或即將被行動:`line` 僅僅是一個類似指針的東西,指向該文件。第二行定義了 `lineLengths` 作為 `map` transformation 的結果。請注意,由于 `laziness`(延遲加載)`lineLengths` 不會被立即計算。最后,我們運行 `reduce`,這是一個 action。此時,Spark 分發計算任務到不同的機器上運行,每臺機器都運行在 map 的一部分并本地運行 reduce,僅僅返回它聚合后的結果給驅動程序. 如果我們也希望以后再次使用 `lineLengths`,我們還可以添加: ``` lineLengths.persist() ``` 在 `reduce` 之前,這將導致 `lineLengths` 在第一次計算之后就被保存在 memory 中。 To illustrate RDD basics, consider the simple program below: ``` JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(s -> s.length()); int totalLength = lineLengths.reduce((a, b) -> a + b); ``` The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: `lines` is merely a pointer to the file. The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths` is _not_ immediately computed, due to laziness. Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program. If we also wanted to use `lineLengths` again later, we could add: ``` lineLengths.persist(StorageLevel.MEMORY_ONLY()); ``` before the `reduce`, which would cause `lineLengths` to be saved in memory after the first time it is computed. To illustrate RDD basics, consider the simple program below: ``` lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b) ``` The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: `lines` is merely a pointer to the file. The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths` is _not_ immediately computed, due to laziness. Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program. If we also wanted to use `lineLengths` again later, we could add: ``` lineLengths.persist() ``` before the `reduce`, which would cause `lineLengths` to be saved in memory after the first time it is computed. ### 傳遞 Functions(函數)給 Spark 當 driver 程序在集群上運行時,Spark 的 API 在很大程度上依賴于傳遞函數。有 2 種推薦的方式來做到這一點: * [Anonymous function syntax(匿名函數語法)](http://docs.scala-lang.org/tutorials/tour/anonymous-function-syntax.html),它可以用于短的代碼片斷. * 在全局單例對象中的靜態方法。例如,您可以定義 `object MyFunctions` 然后傳遞 `MyFunctions.func1`,如下: ``` object MyFunctions { def func1(s: String): String = { ... } } myRdd.map(MyFunctions.func1) ``` 請注意,雖然也有可能傳遞一個類的實例(與單例對象相反)的方法的引用,這需要發送整個對象,包括類中其它方法。例如,考慮: ``` class MyClass { def func1(s: String): String = { ... } def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } } ``` 這里,如果我們創建一個 `MyClass` 的實例,并調用 `doStuff`,在 `map` 內有 `MyClass` 實例的 `func1` 方法的引用,所以整個對象需要被發送到集群的。它類似于 `rdd.map(x =&gt; this.func1(x))` 類似的方式,訪問外部對象的字段將引用整個對象: ``` class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } } ``` 相當于寫 `rdd.map(x =&gt; this.field + x)`,它引用 `this` 所有的東西。為了避免這個問題,最簡單的方式是復制 `field` 到一個本地變量,而不是外部訪問它: ``` def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) } ``` Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package. There are two ways to create such functions: * Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark. * Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) to concisely define an implementation. While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs in long-form. For example, we could have written our code above as follows: ``` JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() { public Integer call(String s) { return s.length(); } }); int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer a, Integer b) { return a + b; } }); ``` Or, if writing the functions inline is unwieldy: ``` class GetLength implements Function<String, Integer> { public Integer call(String s) { return s.length(); } } class Sum implements Function2<Integer, Integer, Integer> { public Integer call(Integer a, Integer b) { return a + b; } } JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new GetLength()); int totalLength = lineLengths.reduce(new Sum()); ``` Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked `final`. Spark will ship copies of these variables to each worker node as it does for other languages. Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this: * [Lambda expressions](https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions), for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.) * Local `def`s inside the function calling into Spark, for longer code. * Top-level functions in a module. For example, to pass a longer function than can be supported using a `lambda`, consider the code below: ``` """MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc) ``` Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider: ``` class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func) ``` Here, if we create a `new MyClass` and call `doStuff` on it, the `map` inside there references the `func` method _of that `MyClass` instance_, so the whole object needs to be sent to the cluster. In a similar way, accessing fields of the outer object will reference the whole object: ``` class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s) ``` To avoid this issue, the simplest way is to copy `field` into a local variable instead of accessing it externally: ``` def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s) ``` ### 理解閉包 在集群中執行代碼時,一個關于 Spark 更難的事情是理解變量和方法的范圍和生命周期。修改其范圍之外的變量 RDD 操作可以混淆的常見原因。在下面的例子中,我們將看一下使用的 `foreach()` 代碼遞增累加計數器,但類似的問題,也可能會出現其他操作上. #### 示例 考慮一個簡單的 RDD 元素求和,以下行為可能不同,具體取決于是否在同一個 JVM 中執行。一個常見的例子是當 Spark 運行在 `local` 本地模式(`--master = local[n]`)時,與部署 Spark 應用到群集(例如,通過 spark-submit 到 YARN): ``` var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter) ``` ``` int counter = 0; JavaRDD<Integer> rdd = sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter); ``` ``` counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter) ``` #### Local(本地)vs. cluster(集群)模式 上面的代碼行為是不確定的,并且可能無法按預期正常工作。執行作業時,Spark 會分解 RDD 操作到每個 executor 中的 task 里。在執行之前,Spark 計算任務的 **closure**(閉包)。閉包是指 executor 要在RDD上進行計算時必須對執行節點可見的那些變量和方法(在這里是foreach())。閉包被序列化并被發送到每個 executor。 閉包的變量副本發給每個 **executor**,當 **counter** 被 `foreach` 函數引用的時候,它已經不再是 driver node 的 **counter** 了。雖然在 driver node 仍然有一個 counter 在內存中,但是對 executors 已經不可見。executor 看到的只是序列化的閉包一個副本。所以 **counter** 最終的值還是 0,因為對 `counter` 所有的操作均引用序列化的 closure 內的值。 在 `local` 本地模式,在某些情況下的 `foreach` 功能實際上是同一 JVM 上的驅動程序中執行,并會引用同一個原始的 **counter** 計數器,實際上可能更新. 為了確保這些類型的場景明確的行為應該使用的 [`Accumulator`](#accumulators) 累加器。當一個執行的任務分配到集群中的各個 worker 結點時,Spark 的累加器是專門提供安全更新變量的機制。本指南的累加器的部分會更詳細地討論這些。 在一般情況下,closures - constructs 像循環或本地定義的方法,不應該被用于改動一些全局狀態。Spark 沒有規定或保證突變的行為,以從封閉件的外側引用的對象。一些代碼,這可能以本地模式運行,但是這只是偶然和這樣的代碼如預期在分布式模式下不會表現。如果需要一些全局的聚合功能,應使用 Accumulator(累加器)。 #### 打印 RDD 的 elements 另一種常見的語法用于打印 RDD 的所有元素使用 `rdd.foreach(println)` 或 `rdd.map(println)`。在一臺機器上,這將產生預期的輸出和打印 RDD 的所有元素。然而,在集群 `cluster` 模式下,`stdout` 輸出正在被執行寫操作 executors 的 `stdout` 代替,而不是在一個驅動程序上,因此 `stdout` 的 `driver` 程序不會顯示這些!要打印 `driver` 程序的所有元素,可以使用的 `collect()` 方法首先把 RDD 放到 driver 程序節點上:`rdd.collect().foreach(println)`。這可能會導致 driver 程序耗盡內存,雖說,因為 `collect()` 獲取整個 RDD 到一臺機器; 如果你只需要打印 RDD 的幾個元素,一個更安全的方法是使用 `take()`:`rdd.take(100).foreach(println)`。 ### 與 Key-Value Pairs 一起使用 雖然大多數 Spark 操作工作在包含任何類型對象的 RDDs 上,只有少數特殊的操作可用于 Key-Value 對的 RDDs。最常見的是分布式 “shuffle” 操作,如通過元素的 key 來進行 grouping 或 aggregating 操作. 在 Scala 中,這些操作在 RDD 上是自動可用,它包含了 [Tuple2](http://www.scala-lang.org/api/2.11.8/index.html#scala.Tuple2) objects (the built-in tuples in the language, created by simply writing `(a, b)`)。在 [PairRDDFunctions](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) class 中該 key-value pair 操作是可用的,其中圍繞 tuple 的 RDD 進行自動封裝. 例如,下面的代碼使用的 `Key-Value` 對的 `reduceByKey` 操作統計文本文件中每一行出現了多少次: ``` val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b) ``` 我們也可以使用 `counts.sortByKey()`,例如,在對按字母順序排序,最后 `counts.collect()` 把他們作為一個數據對象返回給 driver 程序。 **Note(注意):** 當在 key-value pair 操作中使用自定義的 objects 作為 key 時,您必須確保有一個自定義的 `equals()` 方法有一個 `hashCode()` 方法相匹配。有關詳情,請參閱 [Object.hashCode() documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()) 中列出的約定. While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key. In Java, key-value pairs are represented using the [scala.Tuple2](http://www.scala-lang.org/api/2.11.8/index.html#scala.Tuple2) class from the Scala standard library. You can simply call `new Tuple2(a, b)` to create a tuple, and access its fields later with `tuple._1()` and `tuple._2()`. RDDs of key-value pairs are represented by the [JavaPairRDD](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html) class. You can construct JavaPairRDDs from JavaRDDs using special versions of the `map` operations, like `mapToPair` and `flatMapToPair`. The JavaPairRDD will have both standard RDD functions and special key-value ones. For example, the following code uses the `reduceByKey` operation on key-value pairs to count how many times each line of text occurs in a file: ``` JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); ``` We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally `counts.collect()` to bring them back to the driver program as an array of objects. **Note:** when using custom objects as the key in key-value pair operations, you must be sure that a custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see the contract outlined in the [Object.hashCode() documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()). While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key. In Python, these operations work on RDDs containing built-in Python tuples such as `(1, 2)`. Simply create such tuples and then call your desired operation. For example, the following code uses the `reduceByKey` operation on key-value pairs to count how many times each line of text occurs in a file: ``` lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) ``` We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally `counts.collect()` to bring them back to the driver program as a list of objects. ### Transformations(轉換) 下表列出了一些 Spark 常用的 transformations(轉換)。詳情請參考 RDD API 文檔([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),[Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html),[Python](api/python/pyspark.html#pyspark.RDD),[R](api/R/index.html))和 pair RDD 函數文檔([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),[Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 | Transformation(轉換)| Meaning(含義)| | --- | --- | | **map**(_func_) | 返回一個新的 distributed dataset(分布式數據集),它由每個 source(數據源)中的元素應用一個函數 _func_ 來生成。 | **filter**(_func_) | 返回一個新的 distributed dataset(分布式數據集),它由每個 source(數據源)中應用一個函數 _func_ 且返回值為 true 的元素來生成。 | **flatMap**(_func_) | 與 map 類似,但是每一個輸入的 item 可以被映射成 0 個或多個輸出的 items(所以 _func_ 應該返回一個 Seq 而不是一個單獨的 item)。 | **mapPartitions**(_func_) | 與 map 類似,但是單獨的運行在在每個 RDD 的 partition(分區,block)上,所以在一個類型為 T 的 RDD 上運行時 _func_ 必須是 Iterator&lt;T&gt; =&gt; Iterator&lt;U&gt; 類型。 | **mapPartitionsWithIndex**(_func_) | 與 mapPartitions 類似,但是也需要提供一個代表 partition 的 index(索引)的 interger value(整型值)作為參數的 _func_,所以在一個類型為 T 的 RDD 上運行時 _func_ 必須是 (Int, Iterator&lt;T&gt;) =&gt; Iterator&lt;U&gt; 類型。 | **sample**(_withReplacement_, _fraction_, _seed_) | 樣本數據,設置是否放回(withReplacement),采樣的百分比(_fraction_)、使用指定的隨機數生成器的種子(seed)。 | **union**(_otherDataset_) | 反回一個新的 dataset,它包含了 source dataset(源數據集)和 otherDataset(其它數據集)的并集。 | **intersection**(_otherDataset_) | 返回一個新的 RDD,它包含了 source dataset(源數據集)和 otherDataset(其它數據集)的交集。 | **distinct**([_numTasks_])) | 返回一個新的 dataset,它包含了 source dataset(源數據集)中去重的元素。 | **groupByKey**([_numTasks_]) | 在一個 (K, V) pair 的 dataset 上調用時,返回一個 (K, Iterable&lt;V&gt;) . **Note:** 如果分組是為了在每一個 key 上執行聚合操作(例如,sum 或 average),此時使用 `reduceByKey` 或 `aggregateByKey` 來計算性能會更好. **Note:** 默認情況下,并行度取決于父 RDD 的分區數。可以傳遞一個可選的 `numTasks` 參數來設置不同的任務數。 | **reduceByKey**(_func_, [_numTasks_]) | 在 (K, V) pairs 的 dataset 上調用時,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是針對每個 key 使用給定的函數 _func_ 來進行聚合的,它必須是 type (V,V) =&gt; V 的類型。像 `groupByKey` 一樣,reduce tasks 的數量是可以通過第二個可選的參數來配置的。 | **aggregateByKey**(_zeroValue_)(_seqOp_, _combOp_, [_numTasks_]) | 在 (K, V) pairs 的 dataset 上調用時,返回 (K, U) pairs 的 dataset,其中的 values 是針對每個 key 使用給定的 combine 函數以及一個 neutral "0" 值來進行聚合的。允許聚合值的類型與輸入值的類型不一樣,同時避免不必要的配置。像 `groupByKey` 一樣,reduce tasks 的數量是可以通過第二個可選的參數來配置的。 | **sortByKey**([_ascending_], [_numTasks_]) | 在一個 (K, V) pair 的 dataset 上調用時,其中的 K 實現了 Ordered,返回一個按 keys 升序或降序的 (K, V) pairs 的 dataset,由 boolean 類型的 `ascending` 參數來指定。 | **join**(_otherDataset_, [_numTasks_]) | 在一個 (K, V) 和 (K, W) 類型的 dataset 上調用時,返回一個 (K, (V, W)) pairs 的 dataset,它擁有每個 key 中所有的元素對。Outer joins 可以通過 `leftOuterJoin`, `rightOuterJoin` 和 `fullOuterJoin` 來實現。 | **cogroup**(_otherDataset_, [_numTasks_]) | 在一個 (K, V) 和的 dataset 上調用時,返回一個 (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples 的 dataset。這個操作也調用了 `groupWith`。 | **cartesian**(_otherDataset_) | 在一個 T 和 U 類型的 dataset 上調用時,返回一個 (T, U) pairs 類型的 dataset(所有元素的 pairs,即笛卡爾積)。 | **pipe**(_command_, _[envVars]_) | 通過使用 shell 命令來將每個 RDD 的分區給 Pipe。例如,一個 Perl 或 bash 腳本。RDD 的元素會被寫入進程的標準輸入(stdin),并且 lines(行)輸出到它的標準輸出(stdout)被作為一個字符串型 RDD 的 string 返回。 | **coalesce**(_numPartitions_) | Decrease(降低)RDD 中 partitions(分區)的數量為 numPartitions。對于執行過濾后一個大的 dataset 操作是更有效的。 | **repartition**(_numPartitions_) | Reshuffle(重新洗牌)RDD 中的數據以創建或者更多的 partitions(分區)并將每個分區中的數據盡量保持均勻。該操作總是通過網絡來 shuffles 所有的數據。 | **repartitionAndSortWithinPartitions**(_partitioner_) | 根據給定的 partitioner(分區器)對 RDD 進行重新分區,并在每個結果分區中,按照 key 值對記錄排序。這比每一個分區中先調用 `repartition` 然后再 sorting(排序)效率更高,因為它可以將排序過程推送到 shuffle 操作的機器上進行。 ### Actions(動作) 下表列出了一些 Spark 常用的 actions 操作。詳細請參考 RDD API 文檔([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),[Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html),[Python](api/python/pyspark.html#pyspark.RDD),[R](api/R/index.html)) 和 pair RDD 函數文檔([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),[Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 | Action(動作)| Meaning(含義)| | --- | --- | | **reduce**(_func_) | 使用函數 _func_ 聚合 dataset 中的元素,這個函數 _func_ 輸入為兩個元素,返回為一個元素。這個函數應該是可交換(commutative)和關聯(associative)的,這樣才能保證它可以被并行地正確計算。 | **collect**() | 在 driver 程序中,以一個 array 數組的形式返回 dataset 的所有元素。這在過濾器(filter)或其他操作(other operation)之后返回足夠小(sufficiently small)的數據子集通常是有用的。 | **count**() | 返回 dataset 中元素的個數。 | **first**() | 返回 dataset 中的第一個元素(類似于 take(1)。 | **take**(_n_) | 將數據集中的前 _n_ 個元素作為一個 array 數組返回。 | **takeSample**(_withReplacement_, _num_, [_seed_]) | 對一個 dataset 進行隨機抽樣,返回一個包含 _num_ 個隨機抽樣(random sample)元素的數組,參數 withReplacement 指定是否有放回抽樣,參數 seed 指定生成隨機數的種子。 | **takeOrdered**(_n_, _[ordering]_) | 返回 RDD 按自然順序(natural order)或自定義比較器(custom comparator)排序后的前 _n_ 個元素。 | **saveAsTextFile**(_path_) | 將 dataset 中的元素以文本文件(或文本文件集合)的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中的給定目錄中。Spark 將對每個元素調用 toString 方法,將數據元素轉換為文本文件中的一行記錄。 | **saveAsSequenceFile**(_path_) (Java and Scala) | 將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地文件系統、HDFS 或其它 Hadoop 支持的文件系統指定的路徑中。該操作可以在實現了 Hadoop 的 Writable 接口的鍵值對(key-value pairs)的 RDD 上使用。在 Scala 中,它還可以隱式轉換為 Writable 的類型(Spark 包括了基本類型的轉換,例如 Int,Double,String 等等)。 | **saveAsObjectFile**(_path_) (Java and Scala) | 使用 Java 序列化(serialization)以簡單的格式(simple format)編寫數據集的元素,然后使用 `SparkContext.objectFile()` 進行加載。 | **countByKey**() | 僅適用于(K,V)類型的 RDD。返回具有每個 key 的計數的(K , Int)pairs 的 hashmap。 | **foreach**(_func_) | 對 dataset 中每個元素運行函數 _func_。這通常用于副作用(side effects),例如更新一個 [Accumulator](#accumulators)(累加器)或與外部存儲系統(external storage systems)進行交互。**Note**:修改除 `foreach()`之外的累加器以外的變量(variables)可能會導致未定義的行為(undefined behavior)。詳細介紹請閱讀 [Understanding closures(理解閉包)](#understanding-closures-a-nameclosureslinka) 部分。 該 Spark RDD API 還暴露了一些 actions(操作)的異步版本,例如針對 `foreach` 的 `foreachAsync`,它們會立即返回一個`FutureAction` 到調用者,而不是在完成 action 時阻塞。這可以用于管理或等待 action 的異步執行。. ### Shuffle 操作 Spark 里的某些操作會觸發 shuffle。shuffle 是spark 重新分配數據的一種機制,使得這些數據可以跨不同的區域進行分組。這通常涉及在 executors 和 機器之間拷貝數據,這使得 shuffle 成為一個復雜的、代價高的操作。 #### Background(幕后) 為了明白 [`reduceByKey`](#ReduceByLink) 操作的過程,我們以 `reduceByKey` 為例。reduceBykey 操作產生一個新的 RDD,其中 key 所有相同的的值組合成為一個 tuple - key 以及與 key 相關聯的所有值在 reduce 函數上的執行結果。面臨的挑戰是,一個 key 的所有值不一定都在一個同一個 paritition 分區里,甚至是不一定在同一臺機器里,但是它們必須共同被計算。 在 spark 里,特定的操作需要數據不跨分區分布。在計算期間,一個任務在一個分區上執行,為了所有數據都在單個 `reduceByKey` 的 reduce 任務上運行,我們需要執行一個 all-to-all 操作。它必須從所有分區讀取所有的 key 和 key對應的所有的值,并且跨分區聚集去計算每個 key 的結果 - 這個過程就叫做 **shuffle**。 盡管每個分區新 shuffle 的數據集將是確定的,分區本身的順序也是這樣,但是這些數據的順序是不確定的。如果希望 shuffle 后的數據是有序的,可以使用: * `mapPartitions` 對每個 partition 分區進行排序,例如,`.sorted` * `repartitionAndSortWithinPartitions` 在分區的同時對分區進行高效的排序. * `sortBy` 對 RDD 進行全局的排序 觸發的 shuffle 操作包括 **repartition** 操作,如 [`repartition`](#RepartitionLink) 和 [`coalesce`](#CoalesceLink),**‘ByKey** 操作(除了 counting 之外)像 [`groupByKey`](#GroupByLink) 和 [`reduceByKey`](#ReduceByLink),和 **join** 操作,像 [`cogroup`](#CogroupLink) 和 [`join`](#JoinLink). #### 性能影響 該 **Shuffle** 是一個代價比較高的操作,它涉及磁盤 I/O、數據序列化、網絡 I/O。為了準備 shuffle 操作的數據,Spark 啟動了一系列的任務,_map_ 任務組織數據,_reduce_ 完成數據的聚合。這些術語來自 MapReduce,跟 Spark 的 `map` 操作和 `reduce` 操作沒有關系。 在內部,一個 map 任務的所有結果數據會保存在內存,直到內存不能全部存儲為止。然后,這些數據將基于目標分區進行排序并寫入一個單獨的文件中。在 reduce 時,任務將讀取相關的已排序的數據塊。 某些 shuffle 操作會大量消耗堆內存空間,因為 shuffle 操作在數據轉換前后,需要在使用內存中的數據結構對數據進行組織。需要特別說明的是,`reduceByKey` 和 `aggregateByKey` 在 map 時會創建這些數據結構,`'ByKey` 操作在 reduce 時創建這些數據結構。當內存滿的時候,Spark 會把溢出的數據存到磁盤上,這將導致額外的磁盤 I/O 開銷和垃圾回收開銷的增加。 shuffle 操作還會在磁盤上生成大量的中間文件。在 Spark 1.3 中,這些文件將會保留至對應的 RDD 不在使用并被垃圾回收為止。這么做的好處是,如果在 Spark 重新計算 RDD 的血統關系(lineage)時,shuffle 操作產生的這些中間文件不需要重新創建。如果 Spark 應用長期保持對 RDD 的引用,或者垃圾回收不頻繁,這將導致垃圾回收的周期比較長。這意味著,長期運行 Spark 任務可能會消耗大量的磁盤空間。臨時數據存儲路徑可以通過 SparkContext 中設置參數 `spark.local.dir` 進行配置。 shuffle 操作的行為可以通過調節多個參數進行設置。詳細的說明請看 [Spark 配置指南](configuration.html) 中的 “Shuffle 行為” 部分。 ## RDD Persistence(持久化) Spark 中一個很重要的能力是將數據 _persisting_ 持久化(或稱為 _caching_ 緩存),在多個操作間都可以訪問這些持久化的數據。當持久化一個 RDD 時,每個節點的其它分區都可以使用 RDD 在內存中進行計算,在該數據上的其他 action 操作將直接使用內存中的數據。這樣會讓以后的 action 操作計算速度加快(通常運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。 RDD 可以使用 `persist()` 方法或 `cache()` 方法進行持久化。數據將會在第一次 action 操作時進行計算,并緩存在節點的內存中。Spark 的緩存具有容錯機制,如果一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動重新計算并進行緩存。 另外,每個持久化的 RDD 可以使用不同的 _storage level_ 存儲級別進行緩存,例如,持久化到磁盤、已序列化的 Java 對象形式持久化到內存(可以節省空間)、跨節點間復制、以 off-heap 的方式存儲在 Tachyon。這些存儲級別通過傳遞一個 `StorageLevel` 對象([Scala](api/scala/index.html#org.apache.spark.storage.StorageLevel),[Java](api/java/index.html?org/apache/spark/storage/StorageLevel.html),[Python](api/python/pyspark.html#pyspark.StorageLevel))給 `persist()` 方法進行設置。`cache()` 方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是 `StorageLevel.MEMORY_ONLY`(將反序列化的對象存儲到內存中)。詳細的存儲級別介紹如下: | Storage Level(存儲級別)| Meaning(含義)| | --- | --- | | MEMORY_ONLY | 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。如果內存空間不夠,部分數據分區將不再緩存,在每次需要用到這些數據時重新進行計算。這是默認的級別。 | MEMORY_AND_DISK | 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。如果內存空間不夠,將未緩存的數據分區存儲到磁盤,在需要使用這些分區時從磁盤讀取。 | MEMORY_ONLY_SER (Java and Scala) | 將 RDD 以序列化的 Java 對象的形式進行存儲(每個分區為一個 byte 數組)。這種方式會比反序列化對象的方式節省很多空間,尤其是在使用 [fast serializer](tuning.html) 時會節省更多的空間,但是在讀取時會增加 CPU 的計算負擔。 | MEMORY_AND_DISK_SER (Java and Scala) | 類似于 MEMORY_ONLY_SER,但是溢出的分區會存儲到磁盤,而不是在用到它們時重新計算。 | DISK_ONLY | 只在磁盤上緩存 RDD。 | MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc。 與上面的級別功能相同,只不過每個分區在集群中兩個節點上建立副本。 | OFF_HEAP(experimental 實驗性) | 類似于 MEMORY_ONLY_SER,但是將數據存儲在 [off-heap memory](configuration.html#memory-management) 中。這需要啟用 off-heap 內存。 **Note:** _在 Python 中,stored objects will 總是使用 [Pickle](https://docs.python.org/2/library/pickle.html) library 來序列化對象,所以無論你選擇序列化級別都沒關系。在 Python 中可用的存儲級別有 `MEMORY_ONLY`,`MEMORY_ONLY_2`,`MEMORY_AND_DISK`,`MEMORY_AND_DISK_2`,`DISK_ONLY`,和 `DISK_ONLY_2`。_ 在 shuffle 操作中(例如 `reduceByKey`),即便是用戶沒有調用 `persist` 方法,Spark 也會自動緩存部分中間數據.這么做的目的是,在 shuffle 的過程中某個節點運行失敗時,不需要重新計算所有的輸入數據。如果用戶想多次使用某個 RDD,強烈推薦在該 RDD 上調用 persist 方法. ### 如何選擇存儲級別 ? Spark 的存儲級別的選擇,核心問題是在 memory 內存使用率和 CPU 效率之間進行權衡。建議按下面的過程進行存儲級別的選擇: * 如果您的 RDD 適合于默認存儲級別(`MEMORY_ONLY`),leave them that way。這是 CPU 效率最高的選項,允許 RDD 上的操作盡可能快地運行. * 如果不是,試著使用 `MEMORY_ONLY_SER` 和 [selecting a fast serialization library](tuning.html) 以使對象更加節省空間,但仍然能夠快速訪問。(Java和Scala) * 不要溢出到磁盤,除非計算您的數據集的函數是昂貴的,或者它們過濾大量的數據。否則,重新計算分區可能與從磁盤讀取分區一樣快. * 如果需要快速故障恢復,請使用復制的存儲級別(例如,如果使用 Spark 來服務 來自網絡應用程序的請求)。_All_ 存儲級別通過重新計算丟失的數據來提供完整的容錯能力,但復制的數據可讓您繼續在 RDD 上運行任務,而無需等待重新計算一個丟失的分區. ### 刪除數據 Spark 會自動監視每個節點上的緩存使用情況,并使用 least-recently-used(LRU)的方式來丟棄舊數據分區。如果您想手動刪除 RDD 而不是等待它掉出緩存,使用 `RDD.unpersist()` 方法。 # 共享變量 通常情況下,一個傳遞給 Spark 操作(例如 `map` 或 `reduce`)的函數 func 是在遠程的集群節點上執行的。該函數 func 在多個節點執行過程中使用的變量,是同一個變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,并且各個遠程機器上變量的更新并不會傳播回 driver program(驅動程序)。通用且支持 read-write(讀-寫)的共享變量在任務間是不能勝任的。所以,Spark 提供了兩種特定類型的共享變量:broadcast variables(廣播變量)和 accumulators(累加器)。 ## 廣播變量 Broadcast variables(廣播變量)允許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。它們是如何來使用呢,例如,廣播變量可以用一種高效的方式給每個節點傳遞一份比較大的 input dataset(輸入數據集)副本。在使用廣播變量時,Spark 也嘗試使用高效廣播算法分發 broadcast variables(廣播變量)以降低通信成本。 Spark 的 action(動作)操作是通過一系列的 stage(階段)進行執行的,這些 stage(階段)是通過分布式的 “shuffle” 操作進行拆分的。Spark 會自動廣播出每個 stage(階段)內任務所需要的公共數據。這種情況下廣播的數據使用序列化的形式進行緩存,并在每個任務運行前進行反序列化。這也就意味著,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的情況下,使用廣播變量會有比較好的效果。 廣播變量通過在一個變量 `v` 上調用 `SparkContext.broadcast(v)` 方法來進行創建。廣播變量是 `v` 的一個 wrapper(包裝器),可以通過調用 `value` 方法來訪問它的值。代碼示例如下: ``` scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) ``` ``` Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3] ``` ``` >>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3] ``` 在創建廣播變量之后,在集群上執行的所有的函數中,應該使用該廣播變量代替原來的 `v` 值,所以節點上的 `v` 最多分發一次。另外,對象 `v` 在廣播后不應該再被修改,以保證分發到所有的節點上的廣播變量具有同樣的值(例如,如果以后該變量會被運到一個新的節點)。 ## Accumulators(累加器) Accumulators(累加器)是一個僅可以執行 “added”(添加)的變量來通過一個關聯和交換操作,因此可以高效地執行支持并行。累加器可以用于實現 counter(計數,類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數值型的累加器,并且程序員可以添加新的支持類型。 作為一個用戶,您可以創建 accumulators(累加器)并且重命名。如下圖所示,一個命名的 accumulator 累加器(在這個例子中是 `counter`)將顯示在 web UI 中,用于修改該累加器的階段。Spark 在 “Tasks” 任務表中顯示由任務修改的每個累加器的值. ![Accumulators in the Spark UI](https://img.kancloud.cn/19/39/1939919adecb38da01a67ec454907fce_1280x556.jpg "Accumulators in the Spark UI") 在 UI 中跟蹤累加器可以有助于了解運行階段的進度(注:這在 Python 中尚不支持). 可以通過調用 `SparkContext.longAccumulator()` 或 `SparkContext.doubleAccumulator()` 方法創建數值類型的 `accumulator`(累加器)以分別累加 Long 或 Double 類型的值。集群上正在運行的任務就可以使用 `add` 方法來累計數值。然而,它們不能夠讀取它的值。只有 driver program(驅動程序)才可以使用 `value` 方法讀取累加器的值。 下面的代碼展示了一個 accumulator(累加器)被用于對一個數組中的元素求和: ``` scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10 ``` 雖然此代碼使用 Long 類型的累加器的內置支持,但是開發者通過 [AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2) 它的子類來創建自己的類型。AccumulatorV2 抽象類有幾個需要 override(重寫)的方法:`reset` 方法可將累加器重置為 0,`add` 方法可將其它值添加到累加器中,`merge` 方法可將其他同樣類型的累加器合并為一個。其他需要重寫的方法可參考 [API documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2)。例如,假設我們有一個表示數學上 vectors(向量)的 `MyVector` 類,我們可以寫成: ``` class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { private val myVector: MyVector = MyVector.createZeroVector def reset(): Unit = { myVector.reset() } def add(v: MyVector): Unit = { myVector.add(v) } ... } // Then, create an Accumulator of this type: val myVectorAcc = new VectorAccumulatorV2 // Then, register it into spark context: sc.register(myVectorAcc, "MyVectorAcc1") ``` 注意,在開發者定義自己的 AccumulatorV2 類型時,resulting type(返回值類型)可能與添加的元素的類型不一致。 A numeric accumulator can be created by calling `SparkContext.longAccumulator()` or `SparkContext.doubleAccumulator()` to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the `add` method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its `value` method. 可以通過調用 `SparkContext.longAccumulator()` 或 `SparkContext.doubleAccumulator()` 來創建一個數值累加器分別累積 Long 或 Double 類型的值。然后可以使用群集中的任務進行添加 `add` 方法。但是,他們看不到它的 value(值)。只有 driver 可以讀取累加器的值,使用它的 `value` 方法. 下面的代碼顯示了一個累加器用于將數組的元素相加: ``` LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10 ``` While this code used the built-in support for accumulators of type Long, programmers can also create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2). The AccumulatorV2 abstract class has several methods which one has to override: `reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, `merge` for merging another same-type accumulator into this one. Other methods that must be overridden are contained in the [API documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2). For example, supposing we had a `MyVector` class representing mathematical vectors, we could write: ``` class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> { private MyVector myVector = MyVector.createZeroVector(); public void reset() { myVector.reset(); } public void add(MyVector v) { myVector.add(v); } ... } // Then, create an Accumulator of this type: VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2(); // Then, register it into spark context: jsc.sc().register(myVectorAcc, "MyVectorAcc1"); ``` Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added. An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on a cluster can then add to it using the `add` method or the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its `value` method. The code below shows an accumulator being used to add up the elements of an array: ``` >>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10 ``` While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing [AccumulatorParam](api/python/pyspark.html#pyspark.AccumulatorParam). The AccumulatorParam interface has two methods: `zero` for providing a “zero value” for your data type, and `addInPlace` for adding two values together. For example, supposing we had a `Vector` class representing mathematical vectors, we could write: ``` class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam()) ``` 累加器的更新只發生在 **action** 操作中,Spark 保證每個任務只更新累加器一次,例如,重啟任務不會更新值。在 transformations(轉換)中,用戶需要注意的是,如果 task(任務)或 job stages(階段)重新執行,每個任務的更新操作可能會執行多次。 累加器不會改變 Spark lazy evaluation(懶加載)的模式。如果累加器在 RDD 中的一個操作中進行更新,它們的值僅被更新一次,RDD 被作為 action 的一部分來計算。因此,在一個像 `map()` 這樣的 transformation(轉換)時,累加器的更新并沒有執行。下面的代碼片段證明了這個特性: ``` val accum = sc.longAccumulator data.map { x => accum.add(x); x } // Here, accum is still 0 because no actions have caused the map operation to be computed. ``` ``` LongAccumulator accum = jsc.sc().longAccumulator(); data.map(x -> { accum.add(x); return f(x); }); // Here, accum is still 0 because no actions have caused the `map` to be computed. ``` ``` accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed. ``` # 部署應用到集群中 該 [應用提交指南](submitting-applications.html) 描述了如何將應用提交到集群中。簡單的說,在您將應用打包成一個 JAR(針對 Java/Scala)或者一組 `.py` 或 `.zip` 文件(針對 Python),該 `bin/spark-submit` 腳本可以讓你提交它到任何所支持的 cluster manager 上去. # 從 Java / Scala 啟動 Spark jobs 該 [org.apache.spark.launcher](api/java/index.html?org/apache/spark/launcher/package-summary.html) package 提供了 classes 用于使用簡單的 Java API 來作為一個子進程啟動 Spark jobs. # 單元測試 Spark 可以友好的使用流行的單元測試框架進行單元測試。在將 master URL 設置為 `local` 來測試時會簡單的創建一個 `SparkContext`,運行您的操作,然后調用 `SparkContext.stop()` 將該作業停止。因為 Spark 不支持在同一個程序中并行的運行兩個 contexts,所以需要確保使用 finally 塊或者測試框架的 `tearDown` 方法將 context 停止。 # 快速鏈接 您可以在 Spark 網站上看一下 [Spark 程序示例](http://spark.apache.org/examples.html)。此外,Spark 在 `examples` 目錄中包含了許多示例([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples),[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples),[Python](https://github.com/apache/spark/tree/master/examples/src/main/python),[R](https://github.com/apache/spark/tree/master/examples/src/main/r))。您可以通過傳遞 class name 到 Spark 的 bin/run-example 腳本以運行 Java 和 Scala 示例; 例如: ``` ./bin/run-example SparkPi ``` 針對 Python 示例,使用 `spark-submit` 來代替: ``` ./bin/spark-submit examples/src/main/python/pi.py ``` 針對 R 示例,使用 `spark-submit` 來代替: ``` ./bin/spark-submit examples/src/main/r/dataframe.R ``` 針對應用程序的優化,該 [配置](configuration.html) 和 [優化](tuning.html) 指南一些最佳實踐的信息。這些優化建議在確保你的數據以高效的格式存儲在內存中尤其重要。針對部署參考,該 [集群模式概述](cluster-overview.html) 描述了分布式操作和支持的 cluster managers 集群管理器的組件. 最后,所有的 API 文檔可在 [Scala](api/scala/#org.apache.spark.package),[Java](api/java/),[Python](api/python/) and [R](api/R/) 中獲取.
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看