<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Spark Streaming 編程指南 * [概述](#概述) * [一個入門示例](#一個入門示例) * [基礎概念](#基礎概念) * [依賴](#依賴) * [初始化 StreamingContext](#初始化-streamingcontext) * [Discretized Streams (DStreams)(離散化流)](#discretized-streams-dstreams離散化流) * [Input DStreams 和 Receivers(接收器)](#input-dstreams-和-receivers接收器) * [DStreams 上的 Transformations(轉換)](#dstreams-上的-transformations轉換) * [DStreams 上的輸出操作](#dstreams-上的輸出操作) * [DataFrame 和 SQL 操作](#dataframe-和-sql-操作) * [MLlib 操作](#mllib-操作) * [緩存 / 持久性](#緩存--持久性) * [Checkpointing](#checkpointing) * [Accumulators,Broadcast 變量,和 Checkpoint](#accumulators-broadcast-變量-和-checkpoint) * [應用程序部署](#應用程序部署) * [Monitoring Applications(監控應用程序)](#monitoring-applications-監控應用程序) * [Performance Tuning(性能調優)](#performance-tuning-性能調優) * [Reducing the Batch Processing Times(減少批處理時間)](#reducing-the-batch-processing-times-減少批處理時間) * [Setting the Right Batch Interval(設置正確的批次間隔)](#setting-the-right-batch-interval-設置正確的批次間隔) * [Memory Tuning(內存調優)](#memory-tuning-內存調優) * [Fault-tolerance Semantics(容錯語義)](#fault-tolerance-semantics-容錯語義) * [快速鏈接](#快速鏈接) # 概述 Spark Streaming 是 Spark Core API 的擴展,它支持彈性的,高吞吐的,容錯的實時數據流的處理。數據可以通過多種數據源獲取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通過例如 `map`,`reduce`,`join`,`window` 等的高級函數組成的復雜算法處理。最終,處理后的數據可以輸出到文件系統,數據庫以及實時儀表盤中。事實上,你還可以在 data streams(數據流)上使用 [機器學習](ml-guide.html) 以及 [圖計算](graphx-programming-guide.html) 算法。 ![Spark Streaming](https://img.kancloud.cn/64/5c/645c18615c3b60f98b473dbf2eefa589_1172x438.jpg "Spark Streaming architecture") 在內部,它工作原理如下,Spark Streaming 接收實時輸入數據流并將數據切分成多個 batch(批)數據,然后由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果)。 ![Spark Streaming](https://img.kancloud.cn/1a/cd/1acdb841d036bec190db33dddcec8650_1071x239.jpg "Spark Streaming data flow") Spark Streaming 提供了一個名為 _discretized stream_ 或 _DStream_ 的高級抽象,它代表一個連續的數據流。DStream 可以從數據源的輸入數據流創建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上進行高層次的操作以創建。在內部,一個 DStream 是通過一系列的 [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD) 來表示。 本指南告訴你如何使用 DStream 來編寫一個 Spark Streaming 程序。你可以使用 Scala,Java 或者 Python(Spark 1.2 版本后引進)來編寫 Spark Streaming 程序。所有這些都在本指南中介紹。您可以在本指南中找到標簽,讓您可以選擇不同語言的代碼段。 **Note(注意):** 在 Python 有些 API 可能會有不同或不可用。在本指南,您將找到 Python API 的標簽來高亮顯示不同的地方。 * * * # 一個入門示例 在我們詳細介紹如何編寫你自己的 Spark Streaming 程序的細節之前,讓我們先來看一看一個簡單的 Spark Streaming 程序的樣子。比方說,我們想要計算從一個監聽 TCP socket 的數據服務器接收到的文本數據(text data)中的字數。你需要做的就是照著下面的步驟做。 首先,我們導入了 Spark Streaming 類和部分從 StreamingContext 隱式轉換到我們的環境的名稱,目的是添加有用的方法到我們需要的其他類(如 DStream)。[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) 是所有流功能的主要入口點。我們創建了一個帶有 2 個執行線程和間歇時間為 1 秒的本地 StreamingContext。 ``` import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // 自從 Spark 1.3 開始,不再是必要的了 // 創建一個具有兩個工作線程(working thread)并且批次間隔為 1 秒的本地 StreamingContext。 // master 需要 2 個核,以防止饑餓情況(starvation scenario)。 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) ``` Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`)。使用該 context,我們可以創建一個代表從 TCP 源流數據的離散流(DStream),指定主機名(hostname)(例如 localhost)和端口(例如 9999)。 ``` // 創建一個將要連接到 hostname:port 的 DStream,如 localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) ``` 上一步的這個 `lines` DStream 表示將要從數據服務器接收到的數據流。在這個離散流(DStream)中的每一條記錄都是一行文本(text)。接下來,我們想要通過空格字符(space characters)拆分這些數據行(lines)成單詞(words)。 ``` // 將每一行拆分成 words(單詞) val words = lines.flatMap(_.split(" ")) ``` `flatMap` 是一種 one-to-many(一對多)的離散流(DStream)操作,它會通過在源離散流(source DStream)中根據每個記錄(record)生成多個新紀錄的形式創建一個新的離散流(DStream)。在這種情況下,在這種情況下,每一行(each line)都將被拆分成多個單詞(`words`)和代表單詞離散流(words DStream)的單詞流。接下來,我們想要計算這些單詞。 ``` import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // 計算每一個 batch(批次)中的每一個 word(單詞) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // 在控制臺打印出在這個離散流(DStream)中生成的每個 RDD 的前十個元素 // 注意:必需要觸發 action(很多初學者會忘記觸發 action 操作,導致報錯:No output operations registered, so nothing to execute) wordCounts.print() ``` 上一步的 `words` DStream 進行了進一步的映射(一對一的轉換)為一個 (word, 1) paris 的離散流(DStream),這個 DStream 然后被規約(reduce)來獲得數據中每個批次(batch)的單詞頻率。最后,`wordCounts.print()` 將會打印一些每秒生成的計數。 Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call 請注意,當這些行(lines)被執行的時候,Spark Streaming 僅僅設置了計算,只有在啟動時才會執行,并沒有開始真正地處理。為了在所有的轉換都已經設置好之后開始處理,我們在最后調用: ``` ssc.start() // 開始計算 ssc.awaitTermination() // 等待計算被中斷 ``` 該部分完整的代碼可以在 Spark Streaming 示例 [NetworkWordCount](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala) 中找到。 First, we create a [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second. ``` import org.apache.spark.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import scala.Tuple2; // Create a local StreamingContext with two working thread and batch interval of 1 second SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); ``` Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`). ``` // Create a DStream that will connect to hostname:port, like localhost:9999 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999); ``` This `lines` DStream represents the stream of data that will be received from the data server. Each record in this stream is a line of text. Then, we want to split the lines by space into words. ``` // Split each line into words JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); ``` `flatMap` is a DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the `words` DStream. Note that we defined the transformation using a [FlatMapFunction](api/scala/index.html#org.apache.spark.api.java.function.FlatMapFunction) object. As we will discover along the way, there are a number of such convenience classes in the Java API that help define DStream transformations. Next, we want to count these words. ``` // Count each word in each batch JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1)); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2); // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print(); ``` The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word, 1)` pairs, using a [PairFunction](api/scala/index.html#org.apache.spark.api.java.function.PairFunction) object. Then, it is reduced to get the frequency of words in each batch of data, using a [Function2](api/scala/index.html#org.apache.spark.api.java.function.Function2) object. Finally, `wordCounts.print()` will print a few of the counts generated every second. Note that when these lines are executed, Spark Streaming only sets up the computation it will perform after it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call `start` method. ``` jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate ``` The complete code can be found in the Spark Streaming example [JavaNetworkWordCount](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java). First, we import [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext), which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and batch interval of 1 second. ``` from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 1) ``` Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`). ``` # Create a DStream that will connect to hostname:port, like localhost:9999 lines = ssc.socketTextStream("localhost", 9999) ``` This `lines` DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space into words. ``` # Split each line into words words = lines.flatMap(lambda line: line.split(" ")) ``` `flatMap` is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the `words` DStream. Next, we want to count these words. ``` # Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # Print the first ten elements of each RDD generated in this DStream to the console wordCounts.pprint() ``` The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word, 1)` pairs, which is then reduced to get the frequency of words in each batch of data. Finally, `wordCounts.pprint()` will print a few of the counts generated every second. Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call ``` ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate ``` The complete code can be found in the Spark Streaming example [NetworkWordCount](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/network_wordcount.py). 如果你已經 [下載](index.html#downloading) 并且 [構建](index.html#building) Spark,您可以使用如下方式來運行該示例。你首先需要運行 Netcat(一個在大多數類 Unix 系統中的小工具)作為我們使用的數據服務器。 ``` $ nc -lk 9999 ``` 然后,在另一個不同的終端,你可以通過執行如下命令來運行該示例: ``` $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ``` ``` $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 ``` ``` $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 ``` 然后,在運行在 netcat 服務器上的終端輸入的任何行(lines),都將被計算,并且每一秒都顯示在屏幕上,它看起來就像下面這樣: | &lt;figure class="highlight"&gt; ``` # TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... ``` &lt;/figure&gt; | | &lt;figure class="highlight"&gt; ``` # TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... ``` &lt;/figure&gt; &lt;figure class="highlight"&gt; ``` # TERMINAL 2: RUNNING JavaNetworkWordCount $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... ``` &lt;/figure&gt; &lt;figure class="highlight"&gt; ``` # TERMINAL 2: RUNNING network_wordcount.py $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 ... ------------------------------------------- Time: 2014-10-14 15:25:21 ------------------------------------------- (hello,1) (world,1) ... ``` &lt;/figure&gt; | * * * * * * # 基礎概念 接下來,我們了解完了簡單的例子,開始闡述 Spark Streaming 的基本知識。 ## 依賴 與 Spark 類似,Spark Streaming 可以通過 Maven 來管理依賴。為了編寫你自己的 Spark Streaming 程序,你必須添加以下的依賴到你的 SBT 或者 Maven 項目中。 ``` <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> ``` ``` libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0" ``` 針對從 Spark Streaming Core API 中不存在的數據源中獲取數據,如 Kafka,Flume,Kinesis,你必須添加相應的坐標 `spark-streaming-xyz_2.11` 到依賴中。例如,有一些常見的依賴如下。 | Source(數據源)| Artifact(坐標)| | --- | --- | | Kafka | spark-streaming-kafka-0-8_2.11 | | Flume | spark-streaming-flume_2.11 | | Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] | | | | 想要查看一個實時更新的列表,請參閱 [Maven repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%222.2.0%22) 來了解支持的 sources(數據源)和 artifacts(坐標)的完整列表。 * * * ## 初始化 StreamingContext 為了初始化一個 Spark Streaming 程序,一個 **StreamingContext** 對象必須要被創建出來,它是所有的 Spark Streaming 功能的主入口點。 一個 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) 對象可以從一個 [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) 對象中來創建。 ``` import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) ``` 這個 `appName` 參數是展示在集群 UI 界面上的應用程序的名稱。`master` 是一個 [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),或者一個特殊的 **“local[\*]”** 字符串以使用 local mode(本地模式)來運行。在實踐中,當在集群上運行時,你不會想在應用程序中硬編碼 `master`,而是 [使用 `spark-submit` 來啟動應用程序](submitting-applications.html),并且接受該參數。然而,對于本地測試和單元測試,你可以傳遞 “local[\*]” 來運行 Spark Streaming 進程(檢測本地系統中內核的個數)。請注意,做個內部創建了一個 [SparkContext](api/scala/index.html#org.apache.spark.SparkContext)(所有 Spark 功能的出發點),它可以像 ssc.sparkContext 這樣被訪問。 這個 batch interval(批間隔)必須根據您的應用程序和可用的集群資源的等待時間要求進行設置。更多詳情請參閱 [優化指南](#setting-the-right-batch-interval) 部分。 一個 `StreamingContext` 對象也可以從一個現有的 `SparkContext` 對象來創建。 ``` import org.apache.spark.streaming._ val sc = ... // 已存在的 SparkContext val ssc = new StreamingContext(sc, Seconds(1)) ``` A [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object can be created from a [SparkConf](api/java/index.html?org/apache/spark/SparkConf.html) object. ``` import org.apache.spark.*; import org.apache.spark.streaming.api.java.*; SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); ``` The `appName` parameter is a name for your application to show on the cluster UI. `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls), or a special **“local[\*]”** string to run in local mode. In practice, when running on a cluster, you will not want to hardcode `master` in the program, but rather [launch the application with `spark-submit`](submitting-applications.html) and receive it there. However, for local testing and unit tests, you can pass “local[\*]” to run Spark Streaming in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`. The batch interval must be set based on the latency requirements of your application and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-interval) section for more details. A `JavaStreamingContext` object can also be created from an existing `JavaSparkContext`. ``` import org.apache.spark.streaming.api.java.*; JavaSparkContext sc = ... //existing JavaSparkContext JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); ``` A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object. ``` from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext(master, appName) ssc = StreamingContext(sc, 1) ``` The `appName` parameter is a name for your application to show on the cluster UI. `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls), or a special **“local[\*]”** string to run in local mode. In practice, when running on a cluster, you will not want to hardcode `master` in the program, but rather [launch the application with `spark-submit`](submitting-applications.html) and receive it there. However, for local testing and unit tests, you can pass “local[\*]” to run Spark Streaming in-process (detects the number of cores in the local system). The batch interval must be set based on the latency requirements of your application and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-interval) section for more details. 在定義一個 context 之后,您必須執行以下操作。 1. 通過創建輸入 DStreams 來定義輸入源。 2. 通過應用轉換和輸出操作 DStreams 定義流計算(streaming computations)。 3. 開始接收輸入并且使用 `streamingContext.start()` 來處理數據。 4. 使用 `streamingContext.awaitTermination()` 等待處理被終止(手動或者由于任何錯誤)。 5. 使用 `streamingContext.stop()` 來手動的停止處理。 ##### 需要記住的幾點: * 一旦一個 context 已經啟動,將不會有新的數據流的計算可以被創建或者添加到它。 * 一旦一個 context 已經停止,它不會被重新啟動。 * 同一時間內在 JVM 中只有一個 StreamingContext 可以被激活。 * 在 StreamingContext 上的 stop() 同樣也停止了 SparkContext。為了只停止 StreamingContext,設置 `stop()` 的可選參數,名叫 `stopSparkContext` 為 false。 * 一個 SparkContext 就可以被重用以創建多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被創建之前停止(不停止 SparkContext)。 * * * ## Discretized Streams (DStreams)(離散化流) **Discretized Stream** or **DStream** 是 Spark Streaming 提供的基本抽象。它代表了一個連續的數據流,無論是從 source(數據源)接收到的輸入數據流,還是通過轉換輸入流所產生的處理過的數據流。在內部,一個 DStream 被表示為一系列連續的 RDDs,它是 Spark 中一個不可改變的抽象,distributed dataset (的更多細節請看 [Spark 編程指南](programming-guide.html#resilient-distributed-datasets-rdds)。在一個 DStream 中的每個 RDD 包含來自一定的時間間隔的數據,如下圖所示。 ![Spark Streaming](https://img.kancloud.cn/fa/bd/fabdf95b1b46cf3851c9ef858aecab87_1091x239.jpg "Spark Streaming data flow") 應用于 DStream 的任何操作轉化為對于底層的 RDDs 的操作。例如,在 [先前的示例](#一個入門示例),轉換一個行(lines)流成為單詞(words)中,flatMap 操作被應用于在行離散流(lines DStream)中的每個 RDD 來生成單詞離散流(words DStream)的 RDDs。如下所示。 ![Spark Streaming](https://img.kancloud.cn/ec/c8/ecc85f8411796f4b77cb70f6ddf6b278_1091x388.jpg "Spark Streaming data flow") 這些底層的 RDD 變換由 Spark 引擎(engine)計算。DStream 操作隱藏了大多數這些細節并為了方便起見,提供給了開發者一個更高級別的 API。這些操作細節會在后邊的章節中討論。 * * * ## Input DStreams 和 Receivers(接收器) 輸入 DStreams 是代表輸入數據是從流的源數據(streaming sources)接收到的流的 DStream。在 [一個入門示例](#一個入門示例) 中,`lines` 是一個 input DStream,因為它代表著從 netcat 服務器接收到的數據的流。每一個 input DStream(除了 file stream 之外,會在本章的后面來討論)與一個 **Receiver** ([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)) 對象關聯,它從 source(數據源)中獲取數據,并且存儲它到 Sparl 的內存中用于處理。 Spark Streaming 提供了兩種內置的 streaming source(流的數據源)。 * _Basic sources(基礎的數據源)_:在 StreamingContext API 中直接可以使用的數據源。例如:file systems 和 socket connections。 * _Advanced sources(高級的數據源)_:像 Kafka,Flume,Kinesis,等等這樣的數據源。可以通過額外的 utility classes 來使用。像在 [依賴](#依賴) 中討論的一樣,這些都需要額外的外部依賴。 在本節的后邊,我們將討論每種類別中的現有的一些數據源。 請注意,如果你想要在你的流處理程序中并行的接收多個數據流,你可以創建多個 input DStreams(在 [性能優化](#level-of-parallelism-in-data-receiving) 部分進一步討論)。這將創建同時接收多個數據流的多個 receivers(接收器)。但需要注意,一個 Spark 的 worker/executor 是一個長期運行的任務(task),因此它將占用分配給 Spark Streaming 的應用程序的所有核中的一個核(core)。因此,要記住,一個 Spark Streaming 應用需要分配足夠的核(core)(或線程(threads),如果本地運行的話)來處理所接收的數據,以及來運行接收器(receiver(s))。 ##### 要記住的幾點 * 當在本地運行一個 Spark Streaming 程序的時候,不要使用 “local” 或者 “local[1]” 作為 master 的 URL。這兩種方法中的任何一個都意味著只有一個線程將用于運行本地任務。如果你正在使用一個基于接收器(receiver)的輸入離散流(input DStream)(例如,sockets,Kafka,Flume 等),則該單獨的線程將用于運行接收器(receiver),而沒有留下任何的線程用于處理接收到的數據。因此,在本地運行時,總是用 “local[n]” 作為 master URL,其中的 n &gt; 運行接收器的數量(查看 [Spark 屬性](configuration.html#spark-properties) 來了解怎樣去設置 master 的信息)。 * 將邏輯擴展到集群上去運行,分配給 Spark Streaming 應用程序的內核(core)的內核數必須大于接收器(receiver)的數量。否則系統將接收數據,但是無法處理它。 ### 基礎的 Sources(數據源) 我們已經簡單地了解過了在 [入門示例](#一個入門示例) 中 `ssc.socketTextStream(...)` 的例子,例子中是通過從一個 TCP socket 連接接收到的文本數據來創建了一個離散流(DStream)。除了 sockets 之外,StreamingContext API 也提供了根據文件作為輸入來源創建離散流(DStreams)的方法。 * **File Streams:** 用于從文件中讀取數據,在任何與 HDFS API 兼容的文件系統中(即,HDFS,S3,NFS 等),一個 DStream 可以像下面這樣創建: ``` streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) ``` ``` streamingContext.fileStream&lt;KeyClass, ValueClass, InputFormatClass&gt;(dataDirectory); ``` ``` streamingContext.textFileStream(dataDirectory) ``` Spark Streaming 將監控`dataDirectory` 目錄并且該目錄中任何新建的文件 (寫在嵌套目錄中的文件是不支持的)。注意 * 文件必須具有相同的數據格式。 * 文件必須被創建在 `dataDirectory` 目錄中,通過 atomically(原子的)_moving(移動)_ 或 _renaming(重命名)_ 它們到數據目錄。 * 一旦移動,這些文件必須不能再更改,因此如果文件被連續地追加,新的數據將不會被讀取。 對于簡單的文本文件,還有一個更加簡單的方法 `streamingContext.textFileStream(dataDirectory)`。并且文件流(file streams)不需要運行一個接收器(receiver),因此,不需要分配內核(core)。 Python API 在 Python API 中 `fileStream` 是不可用的,只有 `textFileStream` 是可用的。 * **Streams based on Custom Receivers(基于自定義的接收器的流):** DStreams 可以使用通過自定義的 receiver(接收器)接收到的數據來創建。更多細節請參閱 [自定義 Receiver 指南](streaming-custom-receivers.html)。 * **Queue of RDDs as a Stream(RDDs 隊列作為一個流):** 為了使用測試數據測試 Spark Streaming 應用程序,還可以使用 `streamingContext.queueStream(queueOfRDDs)` 創建一個基于 RDDs 隊列的 DStream,每個進入隊列的 RDD 都將被視為 DStream 中的一個批次數據,并且就像一個流進行處理。 想要了解更多的關于從 sockets 和文件(files)創建的流的細節,請參閱相關函數的 API文檔,它們在 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for Scala,[JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) for Java 以及 [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) for Python 中。 ### 高級 Sources(數據源) Python API 從 Spark 2.2.0 開始,在 Python API 中的 Kafka,Kinesis 和 Flume 這樣的外部數據源都是可用的。 這一類別的 sources(數據源)需要使用非 Spark 庫中的外部接口,它們中的其中一些還需要比較復雜的依賴關系(例如,Kafka 和 Flume)。因此,為了最小化有關的依賴關系的版本沖突的問題,這些資源本身不能創建 DStream 的功能,它是通過 [依賴](#依賴) 單獨的類庫實現創建 DStream 的功能。 請注意,這些高級 sources(數據源)不能再 Spark shell 中使用,因此,基于這些高級 sources(數據源)的應用程序不能在 shell 中被測試。如果你真的想要在 Spark shell 中使用它們,你必須下載帶有它的依賴的相應的 Maven 組件的 JAR,并且將其添加到 classpath。 一些高級的 sources(數據源)如下。 * **Kafka:** Spark Streaming 2.2.0 與 Kafka broker 版本 0.8.2.1 或更高是兼容的。更多細節請參閱 [Kafka 集成指南](streaming-kafka-integration.html)。 * **Flume:** Spark Streaming 2.2.0 與 Flume 1.6.0 相兼容。更多細節請參閱 [Flume 集成指南](streaming-flume-integration.html)。 * **Kinesis:** Spark Streaming 2.2.0 與 Kinesis Client Library 1.2.1 相兼容。更多細節請參閱 [Kinesis 集成指南](streaming-kinesis-integration.html)。 ### 自定義 Sources(數據源) Python API 在 Python 中還不支持這一功能。 Input DStreams 也可以從自定義數據源中創建。如果您想這樣做,需要實現一個用戶自定義的 **receiver**(看下一節以了解它是什么),它可以從自定義的 sources(數據源)中接收數據并且推送它到 Spark。更多細節請參閱 [自定義 Receiver 指南](streaming-custom-receivers.html)。 ### Receiver Reliability(接收器的可靠性) 可以有兩種基于他們的 _reliability可靠性_ 的數據源。數據源(如 Kafka 和 Flume)允許傳輸的數據被確認。如果系統從這些可靠的數據來源接收數據,并且被確認(acknowledges)正確地接收數據,它可以確保數據不會因為任何類型的失敗而導致數據丟失。這樣就出現了 2 種接收器(receivers): 1. _Reliable Receiver(可靠的接收器)_ - 當數據被接收并存儲在 Spark 中并帶有備份副本時,一個可靠的接收器(reliable receiver)正確地發送確認(acknowledgment)給一個可靠的數據源(reliable source)。 2. _Unreliable Receiver(不可靠的接收器)_ - 一個不可靠的接收器(unreliable receiver)不發送確認(acknowledgment)到數據源。這可以用于不支持確認的數據源,或者甚至是可靠的數據源當你不想或者不需要進行復雜的確認的時候。 在 [自定義 Receiver 指南](streaming-custom-receivers.html) 中描述了關于如何去編寫一個 reliable receiver(可靠的接收器)的細節。 * * * ## DStreams 上的 Transformations(轉換) 與 RDD 類似,transformation 允許從 input DStream 輸入的數據做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的如下所示 : 與RDD類似,類似,transformation 允許修改來自 input DStream 的數據。DStreams 支持標準的 Spark RDD 上可用的許多轉換。一些常見的如下。 | Transformation(轉換)| Meaning(含義)| | --- | --- | | **map**(_func_) | 利用函數 _func_ 處理原 DStream 的每個元素,返回一個新的 DStream。 | **flatMap**(_func_) | 與 map 相似,但是每個輸入項可用被映射為 0 個或者多個輸出項。。 | **filter**(_func_) | 返回一個新的 DStream,它僅僅包含原 DStream 中函數 _func_ 返回值為 true 的項。 | **repartition**(_numPartitions_) | 通過創建更多或者更少的 partition 以改變這個 DStream 的并行級別(level of parallelism)。 | **union**(_otherStream_) | 返回一個新的 DStream,它包含源 DStream 和 _otherDStream_ 的所有元素。 | **count**() | 通過 count 源 DStream 中每個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream。 | **reduce**(_func_) | 利用函數 _func_ 聚集源 DStream 中每個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函數應該是相關聯的,以使計算可以并行化。 | **countByValue**() | 在元素類型為 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每個 key 的值是在原 DStream 的每個 RDD 中的次數。 | **reduceByKey**(_func_, [_numTasks_]) | 當在一個由 (K,V) pairs 組成的 DStream 上調用這個算子時,返回一個新的,由 (K,V) pairs 組成的 DStream,每一個 key 的值均由給定的 reduce 函數聚合起來。**注意**:在默認情況下,這個算子利用了 Spark 默認的并發任務數去分組。你可以用 numTasks 參數設置不同的任務數。 | | **join**(_otherStream_, [_numTasks_]) | 當應用于兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream。 | **cogroup**(_otherStream_, [_numTasks_]) | 當應用于兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組)。 | **transform**(_func_) | 通過對源 DStream 的每個 RDD 應用 RDD-to-RDD 函數,創建一個新的 DStream。這個可以在 DStream 中的任何 RDD 操作中使用。 | **updateStateByKey**(_func_) | 返回一個新的 "狀態" 的 DStream,其中每個 key 的狀態通過在 key 的先前狀態應用給定的函數和 key 的新 valyes 來更新。這可以用于維護每個 key 的任意狀態數據。 | | | 其中一些轉換值得深入討論。 #### UpdateStateByKey 操作 該 `updateStateByKey` 操作允許您維護任意狀態,同時不斷更新新信息。你需要通過兩步來使用它。 1. 定義 state - state 可以是任何的數據類型。 2. 定義 state update function(狀態更新函數)- 使用函數指定如何使用先前狀態來更新狀態,并從輸入流中指定新值。 在每個 batch 中,Spark 會使用狀態更新函數為所有已有的 key 更新狀態,不管在 batch 中是否含有新的數據。如果這個更新函數返回一個 none,這個 key-value pair 也會被消除。 讓我們舉個例子來說明。在例子中,假設你想保持在文本數據流中看到的每個單詞的運行計數,運行次數用一個 state 表示,它的類型是整數,我們可以使用如下方式來定義 update 函數: ``` def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) } ``` 這里是一個應用于包含 words(單詞)的 DStream 上(也就是說,在 [先前的示例](#一個入門示例)中,該 `pairs` DStream 包含了 (word, 1) pair)。 ``` val runningCounts = pairs.updateStateByKey[Int](updateFunction _) ``` update 函數將會被每個單詞調用,`newValues` 擁有一系列的 1(來自 (word, 1) pairs),runningCount 擁有之前的次數。 ``` Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (values, state) -> { Integer newSum = ... // add the new values with the previous running count to get the new count return Optional.of(newSum); }; ``` This is applied on a DStream containing words (say, the `pairs` DStream containing `(word, 1)` pairs in the [quick example](#a-quick-example)). ``` JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction); ``` The update function will be called for each word, with `newValues` having a sequence of 1’s (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete Java code, take a look at the example [JavaStatefulNetworkWordCount.java](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/java/org/apache/spark/examples/streaming /JavaStatefulNetworkWordCount.java). ``` def updateFunction(newValues, runningCount): if runningCount is None: runningCount = 0 return sum(newValues, runningCount) # add the new values with the previous running count to get the new count ``` This is applied on a DStream containing words (say, the `pairs` DStream containing `(word, 1)` pairs in the [earlier example](#a-quick-example)). ``` runningCounts = pairs.updateStateByKey(updateFunction) ``` The update function will be called for each word, with `newValues` having a sequence of 1’s (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete Python code, take a look at the example [stateful_network_wordcount.py](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/stateful_network_wordcount.py). 請注意,使用 `updateStateByKey` 需要配置的 `checkpoint`(檢查點)的目錄,這里是更詳細關于討論 [checkpointing](#checkpointing) 的部分。 #### Transform Operation(轉換操作) transform 操作(以及它的變化形式如 `transformWith`)允許在 DStream 運行任何 RDD-to-RDD 函數。它能夠被用來應用任何沒在 DStream API 中提供的 RDD 操作。例如,連接數據流中的每個批(batch)和另外一個數據集的功能并沒有在 DStream API 中提供,然而你可以簡單的利用 `transform` 方法做到。這使得有非常強大的可能性。例如,可以通過將輸入數據流與預先計算的垃圾郵件信息(也可以使用 Spark 一起生成)進行實時數據清理,然后根據它進行過濾。 ``` val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... } ``` ``` import org.apache.spark.streaming.api.java.*; // RDD containing spam information JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> { rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning ... }); ``` ``` spamInfoRDD = sc.pickleFile(...) # RDD containing spam information # join data stream with spam information to do data cleaning cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...)) ``` 請注意,每個 batch interval(批間隔)提供的函數被調用。這允許你做隨時間變動的 RDD 操作,即 RDD 操作,分區的數量,廣播變量,等等。batch 之間等可以改變。 #### Window Operations(窗口操作) Spark Streaming 也支持 _windowed computations(窗口計算)_,它允許你在數據的一個滑動窗口上應用 transformation(轉換)。下圖說明了這個滑動窗口。 ![Spark Streaming](https://img.kancloud.cn/ca/21/ca213fa24a34cfdacd05808391d5b0e9_994x388.jpg "Spark Streaming data flow") 如上圖顯示,窗口在源 DStream 上 _slides(滑動)_,合并和操作落入窗內的源 RDDs,產生窗口化的 DStream 的 RDDs。在這個具體的例子中,程序在三個時間單元的數據上進行窗口操作,并且每兩個時間單元滑動一次。這說明,任何一個窗口操作都需要指定兩個參數。 * _window length(窗口長度)_ - 窗口的持續時間(圖 3)。 * _sliding interval(滑動間隔)_ - 執行窗口操作的間隔(圖 2)。 這兩個參數必須是 source DStream 的 batch interval(批間隔)的倍數(圖 1)。 讓我們舉例以說明窗口操作。例如,你想擴展前面的例子用來計算過去 30 秒的詞頻,間隔時間是 10 秒。為了達到這個目的,我們必須在過去 30 秒的 `(wrod, 1)` pairs 的 `pairs` DStream 上應用 `reduceByKey` 操作。用方法 `reduceByKeyAndWindow` 實現。 ``` // Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) ``` ``` // Reduce last 30 seconds of data, every 10 seconds JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10)); ``` ``` # Reduce last 30 seconds of data, every 10 seconds windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10) ``` 一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個參數 - _windowLength(窗口長度)_ 和 _slideInterval(滑動的時間間隔)_。 | Transformation(轉換)| Meaning(含義)| | --- | --- | | **window**(_windowLength_, _slideInterval_) | 返回一個新的 DStream,它是基于 source DStream 的窗口 batch 進行計算的。 | **countByWindow**(_windowLength_, _slideInterval_) | 返回 stream(流)中滑動窗口元素的數 | | **reduceByWindow**(_func_, _windowLength_, _slideInterval_) | 返回一個新的單元素 stream(流),它通過在一個滑動間隔的 stream 中使用 _func_ 來聚合以創建。該函數應該是 associative(關聯的)且 commutative(可交換的),以便它可以并行計算 | | **reduceByKeyAndWindow**(_func_, _windowLength_, _slideInterval_, [_numTasks_]) | 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, V) pairs 的 Stream,其中的每個 key 的 values 是在滑動窗口上的 batch 使用給定的函數 _func_ 來聚合產生的。**Note(注意):** 默認情況下,該操作使用 Spark 的默認并行任務數量(local model 是 2,在 cluster mode 中的數量通過 `spark.default.parallelism` 來確定)來做 grouping。您可以通過一個可選的 `numTasks` 參數來設置一個不同的 tasks(任務)數量。 | **reduceByKeyAndWindow**(_func_, _invFunc_, _windowLength_, _slideInterval_, [_numTasks_]) | 上述 `reduceByKeyAndWindow()` 的更有效的一個版本,其中使用前一窗口的 reduce 值逐漸計算每個窗口的 reduce值。這是通過減少進入滑動窗口的新數據,以及 “inverse reducing(逆減)” 離開窗口的舊數據來完成的。一個例子是當窗口滑動時”添加” 和 “減” keys 的數量。然而,它僅適用于 “invertible reduce functions(可逆減少函數)”,即具有相應 “inverse reduce(反向減少)” 函數的 reduce 函數(作為參數 _invFunc &lt;/ i&gt;)。像在 `reduceByKeyAndWindow` 中的那樣,reduce 任務的數量可以通過可選參數進行配置。請注意,針對該操作的使用必須啟用 [checkpointing](#checkpointing)._ | | **countByValueAndWindow**(_windowLength_, _slideInterval_, [_numTasks_]) | 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, Long) pairs 的 DStream,其中每個 key 的 value 是它在一個滑動窗口之內的頻次。像 code&gt;reduceByKeyAndWindow&lt;/code&gt; 中的那樣,reduce 任務的數量可以通過可選參數進行配置。 | | | #### Join 操作 最后,它值得強調的是,您可以輕松地在 Spark Streaming 中執行不同類型的 join。 ##### Stream-stream joins Streams(流)可以非常容易地與其他流進行 join。 ``` val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) ``` ``` JavaPairDStream<String, String> stream1 = ... JavaPairDStream<String, String> stream2 = ... JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2); ``` ``` stream1 = ... stream2 = ... joinedStream = stream1.join(stream2) ``` 這里,在每個 batch interval(批間隔)中,由 `stream1` 生成的 RDD 將與 `stream2` 生成的 RDD 進行 jion。你也可以做 `leftOuterJoin`,`rightOuterJoin`,`fullOuterJoin`。此外,在 stream(流)的窗口上進行 join 通常是非常有用的。這也很容易做到。 ``` val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) ``` ``` JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20)); JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1)); JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2); ``` ``` windowedStream1 = stream1.window(20) windowedStream2 = stream2.window(60) joinedStream = windowedStream1.join(windowedStream2) ``` ##### Stream-dataset joins 這在解釋 `DStream.transform` 操作時已經在前面演示過了。這是另一個 join window stream(窗口流)與 dataset 的例子。 ``` val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } ``` ``` JavaPairRDD<String, String> dataset = ... JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20)); JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset)); ``` ``` dataset = ... # some RDD windowedStream = stream.window(20) joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset)) ``` 實際上,您也可以動態更改要加入的 dataset。提供給 `transform` 的函數是每個 batch interval(批次間隔)進行評估,因此將使用 `dataset` 引用指向當前的 dataset。 DStream 轉換的完整列表可在 API 文檔中找到。針對 Scala API,請看 [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) 和 [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions)。針對 Java API,請看 [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) 和 [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html)。針對 Python API,請看 [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)。 * * * ## DStreams 上的輸出操作 輸出操作允許將 DStream 的數據推送到外部系統,如數據庫或文件系統。由于輸出操作實際上允許外部系統使用變換后的數據,所以它們觸發所有 DStream 變換的實際執行(類似于RDD的動作)。目前,定義了以下輸出操作: | Output Operation | Meaning | | --- | --- | | **print**() | 在運行流應用程序的 driver 節點上的DStream中打印每批數據的前十個元素。這對于開發和調試很有用。 Python API 這在 Python API 中稱為 **pprint()**。 | **saveAsTextFiles**(_prefix_, [_suffix_]) | 將此 DStream 的內容另存為文本文件。每個批處理間隔的文件名是根據 _前綴_ 和 _后綴_:_"prefix-TIME_IN_MS[.suffix]"_ 生成的。 | **saveAsObjectFiles**(_prefix_, [_suffix_]) | 將此 DStream 的內容另存為序列化 Java 對象的 `SequenceFiles`。每個批處理間隔的文件名是根據 _前綴_ 和 _后綴_:_"prefix-TIME_IN_MS[.suffix]"_ 生成的。 Python API 這在Python API中是不可用的。 | **saveAsHadoopFiles**(_prefix_, [_suffix_]) | 將此 DStream 的內容另存為 Hadoop 文件。每個批處理間隔的文件名是根據 _前綴_ 和 _后綴_:_"prefix-TIME_IN_MS[.suffix]"_ 生成的。 Python API 這在Python API中是不可用的。 | **foreachRDD**(_func_) | 對從流中生成的每個 RDD 應用函數 _func_ 的最通用的輸出運算符。此功能應將每個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或將其通過網絡寫入數據庫。請注意,函數 _func_ 在運行流應用程序的 driver 進程中執行,通常會在其中具有 RDD 動作,這將強制流式傳輸 RDD 的計算。 | | | ### foreachRDD 設計模式的使用 `dstream.foreachRDD` 是一個強大的原語,允許將數據發送到外部系統。但是,了解如何正確有效地使用這個原語很重要。避免一些常見的錯誤如下。 通常向外部系統寫入數據需要創建連接對象(例如與遠程服務器的 TCP 連接),并使用它將數據發送到遠程系統。為此,開發人員可能會無意中嘗試在Spark driver 中創建連接對象,然后嘗試在Spark工作人員中使用它來在RDD中保存記錄。例如(在 Scala 中): ``` dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } } ``` ``` dstream.foreachRDD(rdd -> { Connection connection = createNewConnection(); // executed at the driver rdd.foreach(record -> { connection.send(record); // executed at the worker }); }); ``` ``` def sendRecord(rdd): connection = createNewConnection() # executed at the driver rdd.foreach(lambda record: connection.send(record)) connection.close() dstream.foreachRDD(sendRecord) ``` 這是不正確的,因為這需要將連接對象序列化并從 driver 發送到 worker。這種連接對象很少能跨機器轉移。此錯誤可能會顯示為序列化錯誤(連接對象不可序列化),初始化錯誤(連接對象需要在 worker 初始化)等。正確的解決方案是在 worker 創建連接對象。 但是,這可能會導致另一個常見的錯誤 - 為每個記錄創建一個新的連接。例如: ``` dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } } ``` ``` dstream.foreachRDD(rdd -> { rdd.foreach(record -> { Connection connection = createNewConnection(); connection.send(record); connection.close(); }); }); ``` ``` def sendRecord(record): connection = createNewConnection() connection.send(record) connection.close() dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord)) ``` 通常,創建連接對象具有時間和資源開銷。因此,創建和銷毀每個記錄的連接對象可能會引起不必要的高開銷,并可顯著降低系統的總體吞吐量。一個更好的解決方案是使用 `rdd.foreachPartition` - 創建一個連接對象,并使用該連接在 RDD 分區中發送所有記錄。 ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } } ``` ``` dstream.foreachRDD(rdd -> { rdd.foreachPartition(partitionOfRecords -> { Connection connection = createNewConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } connection.close(); }); }); ``` ``` def sendPartition(iter): connection = createNewConnection() for record in iter: connection.send(record) connection.close() dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) ``` 這樣可以在多個記錄上分攤連接創建開銷。 最后,可以通過跨多個RDD /批次重用連接對象來進一步優化。可以維護連接對象的靜態池,而不是將多個批次的 RDD 推送到外部系統時重新使用,從而進一步減少開銷。 ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } ``` ``` dstream.foreachRDD(rdd -> { rdd.foreachPartition(partitionOfRecords -> { // ConnectionPool is a static, lazily initialized pool of connections Connection connection = ConnectionPool.getConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } ConnectionPool.returnConnection(connection); // return to the pool for future reuse }); }); ``` ``` def sendPartition(iter): # ConnectionPool is a static, lazily initialized pool of connections connection = ConnectionPool.getConnection() for record in iter: connection.send(record) # return to the pool for future reuse ConnectionPool.returnConnection(connection) dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) ``` 請注意,池中的連接應根據需要懶惰創建,如果不使用一段時間,則會超時。這實現了最有效地將數據發送到外部系統. ##### 其他要記住的要點: * DStreams 通過輸出操作進行延遲執行,就像 RDD 由 RDD 操作懶惰地執行。具體來說,DStream 輸出操作中的 RDD 動作強制處理接收到的數據。因此,如果您的應用程序沒有任何輸出操作,或者具有 `dstream.foreachRDD()` 等輸出操作,而在其中沒有任何 RDD 操作,則不會執行任何操作。系統將簡單地接收數據并將其丟棄。 * 默認情況下,輸出操作是 one-at-a-time 執行的。它們按照它們在應用程序中定義的順序執行。 * * * ## DataFrame 和 SQL 操作 您可以輕松地在流數據上使用 [DataFrames and SQL](sql-programming-guide.html) 和 SQL 操作。您必須使用 StreamingContext 正在使用的 SparkContext 創建一個 SparkSession。此外,必須這樣做,以便可以在 driver 故障時重新啟動。這是通過創建一個簡單實例化的 SparkSession 單例實例來實現的。這在下面的示例中顯示。它使用 DataFrames 和 SQL 來修改早期的字數 [示例以生成單詞計數](#a-quick-example)。將每個 RDD 轉換為 DataFrame,注冊為臨時表,然后使用 SQL 進行查詢。 ``` /** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala). ``` /** Java Bean class for converting RDD to DataFrame */ public class JavaRow implements java.io.Serializable { private String word; public String getWord() { return word; } public void setWord(String word) { this.word = word; } } ... /** DataFrame operations inside your streaming program */ JavaDStream<String> words = ... words.foreachRDD((rdd, time) -> { // Get the singleton instance of SparkSession SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate(); // Convert RDD[String] to RDD[case class] to DataFrame JavaRDD<JavaRow> rowRDD = rdd.map(word -> { JavaRow record = new JavaRow(); record.setWord(word); return record; }); DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class); // Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words"); // Do word count on table using SQL and print it DataFrame wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word"); wordCountsDataFrame.show(); }); ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java)。 ``` # Lazily instantiated global instance of SparkSession def getSparkSessionInstance(sparkConf): if ("sparkSessionSingletonInstance" not in globals()): globals()["sparkSessionSingletonInstance"] = SparkSession \ .builder \ .config(conf=sparkConf) \ .getOrCreate() return globals()["sparkSessionSingletonInstance"] ... # DataFrame operations inside your streaming program words = ... # DStream of strings def process(time, rdd): print("========= %s =========" % str(time)) try: # Get the singleton instance of SparkSession spark = getSparkSessionInstance(rdd.context.getConf()) # Convert RDD[String] to RDD[Row] to DataFrame rowRdd = rdd.map(lambda w: Row(word=w)) wordsDataFrame = spark.createDataFrame(rowRdd) # Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words") # Do word count on table using SQL and print it wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() except: pass words.foreachRDD(process) ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/sql_network_wordcount.py)。 您還可以對來自不同線程的流數據(即異步運行的 StreamingContext)上定義的表運行 SQL 查詢。只需確保您將 StreamingContext 設置為記住足夠數量的流數據,以便查詢可以運行。否則,不知道任何異步 SQL 查詢的 StreamingContext 將在查詢完成之前刪除舊的流數據。例如,如果要查詢最后一個批次,但是您的查詢可能需要5分鐘才能運行,則可以調用 `streamingContext.remember(Minutes(5))`(以 Scala 或其他語言的等價物)。 有關DataFrames的更多信息,請參閱 [DataFrames 和 SQL 指南](sql-programming-guide.html)。 * * * ## MLlib 操作 您還可以輕松使用 [MLlib](ml-guide.html) 提供的機器學習算法。首先,有 streaming 機器學習算法(例如:[Streaming 線性回歸](mllib-linear-methods.html#streaming-linear-regression),[Streaming KMeans](mllib-clustering.html#streaming-k-means) 等),其可以同時從 streaming 數據中學習,并將該模型應用于 streaming 數據。除此之外,對于更大類的機器學習算法,您可以離線學習一個學習模型(即使用歷史數據),然后將該模型在線應用于流數據。有關詳細信息,請參閱 [MLlib指南](ml-guide.html)。 * * * ## 緩存 / 持久性 與 RDD 類似,DStreams 還允許開發人員將流的數據保留在內存中。也就是說,在 DStream 上使用 `persist()` 方法會自動將該 DStream 的每個 RDD 保留在內存中。如果 DStream 中的數據將被多次計算(例如,相同數據上的多個操作),這將非常有用。對于基于窗口的操作,如 `reduceByWindow` 和 `reduceByKeyAndWindow` 以及基于狀態的操作,如 `updateStateByKey`,這是隱含的。因此,基于窗口的操作生成的 DStream 會自動保存在內存中,而不需要開發人員調用 `persist()`。 對于通過網絡接收數據(例如:Kafka,Flume,sockets 等)的輸入流,默認持久性級別被設置為將數據復制到兩個節點進行容錯。 請注意,與 RDD 不同,DStreams 的默認持久性級別將數據序列化在內存中。這在 [性能調優](#memory-tuning) 部分進一步討論。有關不同持久性級別的更多信息,請參見 [Spark編程指南](programming-guide.html#rdd-persistence)。 * * * ## Checkpointing streaming 應用程序必須 24/7 運行,因此必須對應用邏輯無關的故障(例如,系統故障,JVM 崩潰等)具有彈性。為了可以這樣做,Spark Streaming 需要 _checkpoint_ 足夠的信息到容錯存儲系統,以便可以從故障中恢復。_checkpoint_ 有兩種類型的數據。 * _Metadata checkpointing_ - 將定義 streaming 計算的信息保存到容錯存儲(如 HDFS)中。這用于從運行 streaming 應用程序的 driver 的節點的故障中恢復(稍后詳細討論)。元數據包括: * _Configuration_ - 用于創建流應用程序的配置。 * _DStream operations_ - 定義 streaming 應用程序的 DStream 操作集。 * _Incomplete batches_ - 批量的job 排隊但尚未完成。 * _Data checkpointing_ - 將生成的 RDD 保存到可靠的存儲。這在一些將多個批次之間的數據進行組合的 _狀態_ 變換中是必需的。在這種轉換中,生成的 RDD 依賴于先前批次的 RDD,這導致依賴鏈的長度隨時間而增加。為了避免恢復時間的這種無限增加(與依賴關系鏈成比例),有狀態轉換的中間 RDD 會定期 _checkpoint_ 到可靠的存儲(例如 HDFS)以切斷依賴關系鏈。 總而言之,元數據 checkpoint 主要用于從 driver 故障中恢復,而數據或 RDD checkpoint 對于基本功能(如果使用有狀態轉換)則是必需的。 #### 何時啟用 checkpoint 對于具有以下任一要求的應用程序,必須啟用 checkpoint: * _使用狀態轉換_ - 如果在應用程序中使用 `updateStateByKey`或 `reduceByKeyAndWindow`(具有反向功能),則必須提供 checkpoint 目錄以允許定期的 RDD checkpoint。 * _從運行應用程序的 driver 的故障中恢復_ - 元數據 checkpoint 用于使用進度信息進行恢復。 請注意,無需進行上述有狀態轉換的簡單 streaming 應用程序即可運行,無需啟用 checkpoint。在這種情況下,驅動器故障的恢復也將是部分的(一些接收但未處理的數據可能會丟失)。這通常是可以接受的,許多運行 Spark Streaming 應用程序。未來對非 Hadoop 環境的支持預計會有所改善。 #### 如何配置 checkpoint 可以通過在保存 checkpoint 信息的容錯,可靠的文件系統(例如,HDFS,S3等)中設置目錄來啟用 checkpoint。這是通過使用 `streamingContext.checkpoint(checkpointDirectory)` 完成的。這將允許您使用上述有狀態轉換。另外,如果要使應用程序從 driver 故障中恢復,您應該重寫 streaming 應用程序以具有以下行為。 * 當程序第一次啟動時,它將創建一個新的 StreamingContext,設置所有流,然后調用 start()。 * 當程序在失敗后重新啟動時,它將從 checkpoint 目錄中的 checkpoint 數據重新創建一個 StreamingContext。 使用 `StreamingContext.getOrCreate` 可以簡化此行為。這樣使用如下。 ``` // Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination() ``` If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function `functionToCreateContext` will be called to create a new context and set up the DStreams. See the Scala example [RecoverableNetworkWordCount](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). This example appends the word counts of network data into a file. This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows. ``` // Create a factory object that can create and setup a new JavaStreamingContext JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); // new context JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams ... jssc.checkpoint(checkpointDirectory); // set checkpoint directory return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start(); context.awaitTermination(); ``` If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function `contextFactory` will be called to create a new context and set up the DStreams. See the Java example [JavaRecoverableNetworkWordCount](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java). This example appends the word counts of network data into a file. This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows. ``` # Function to create and setup a new StreamingContext def functionToCreateContext(): sc = SparkContext(...) # new context ssc = StreamingContext(...) lines = ssc.socketTextStream(...) # create DStreams ... ssc.checkpoint(checkpointDirectory) # set checkpoint directory return ssc # Get StreamingContext from checkpoint data or create a new one context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) # Do additional setup on context that needs to be done, # irrespective of whether it is being started or restarted context. ... # Start the context context.start() context.awaitTermination() ``` If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function `functionToCreateContext` will be called to create a new context and set up the DStreams. See the Python example [recoverable_network_wordcount.py](https://github.com/apache/spark/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). This example appends the word counts of network data into a file. You can also explicitly create a `StreamingContext` from the checkpoint data and start the computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`. 除了使用 `getOrCreate` 之外,還需要確保在失敗時自動重新啟動 driver 進程。這只能由用于運行應用程序的部署基礎架構完成。這在 [部署](#deploying-applications) 部分進一步討論。 請注意,RDD 的 checkpoint 會導致保存到可靠存儲的成本。這可能會導致 RDD 得到 checkpoint 的批次的處理時間增加。因此,需要仔細設置 checkpoint 的間隔。在小批量大小(例如:1秒),檢查每個批次可能會顯著降低操作吞吐量。相反,checkpoint 太少會導致譜系和任務大小增長,這可能會產生不利影響。對于需要 RDD checkpoint 的狀態轉換,默認間隔是至少10秒的批間隔的倍數。它可以通過使用 `dstream.checkpoint(checkpointInterval)` 進行設置。通常,DStream 的5到10個滑動間隔的 checkpoint 間隔是一個很好的設置。 * * * ## Accumulators,Broadcast 變量,和 Checkpoint 在Spark Streaming中,無法從 checkpoint 恢復 [Accumulators](programming-guide.html#accumulators) 和 [Broadcast 變量](programming-guide.html#broadcast-variables)。如果啟用 checkpoint 并使用 [Accumulators](programming-guide.html#accumulators) 或 [Broadcast 變量](programming-guide.html#broadcast-variables),則必須為 [Accumulators](programming-guide.html#accumulators) 和 [Broadcast 變量](programming-guide.html#broadcast-variables) 創建延遲實例化的單例實例,以便在 driver 重新啟動失敗后重新實例化。這在下面的示例中顯示: ``` object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }) ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). ``` class JavaWordBlacklist { private static volatile Broadcast<List<String>> instance = null; public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) { if (instance == null) { synchronized (JavaWordBlacklist.class) { if (instance == null) { List<String> wordBlacklist = Arrays.asList("a", "b", "c"); instance = jsc.broadcast(wordBlacklist); } } } return instance; } } class JavaDroppedWordsCounter { private static volatile LongAccumulator instance = null; public static LongAccumulator getInstance(JavaSparkContext jsc) { if (instance == null) { synchronized (JavaDroppedWordsCounter.class) { if (instance == null) { instance = jsc.sc().longAccumulator("WordsInBlacklistCounter"); } } } return instance; } } wordCounts.foreachRDD((rdd, time) -> { // Get or register the blacklist Broadcast Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); // Get or register the droppedWordsCounter Accumulator LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); // Use blacklist to drop words and use droppedWordsCounter to count them String counts = rdd.filter(wordCount -> { if (blacklist.value().contains(wordCount._1())) { droppedWordsCounter.add(wordCount._2()); return false; } else { return true; } }).collect().toString(); String output = "Counts at time " + time + " " + counts; } ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java). ``` def getWordBlacklist(sparkContext): if ("wordBlacklist" not in globals()): globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"]) return globals()["wordBlacklist"] def getDroppedWordsCounter(sparkContext): if ("droppedWordsCounter" not in globals()): globals()["droppedWordsCounter"] = sparkContext.accumulator(0) return globals()["droppedWordsCounter"] def echo(time, rdd): # Get or register the blacklist Broadcast blacklist = getWordBlacklist(rdd.context) # Get or register the droppedWordsCounter Accumulator droppedWordsCounter = getDroppedWordsCounter(rdd.context) # Use blacklist to drop words and use droppedWordsCounter to count them def filterFunc(wordCount): if wordCount[0] in blacklist.value: droppedWordsCounter.add(wordCount[1]) False else: True counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) wordCounts.foreachRDD(echo) ``` 請參閱完整的 [源代碼](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/recoverable_network_wordcount.py)。 * * * ## 應用程序部署 本節討論部署 Spark Streaming 應用程序的步驟。 ### 要求 要運行 Spark Streaming 應用程序,您需要具備以下功能。 * _集群管理器集群_ - 這是任何 Spark 應用程序的一般要求,并在 [部署指南](cluster-overview.html) 中詳細討論。 * _打包應用程序 JAR_ - 您必須將 streaming 應用程序編譯為 JAR。如果您正在使用 [`spark-submit`](submitting-applications.html) 啟動應用程序,則不需要在 JAR 中提供 Spark 和 Spark Streaming。但是,如果您的應用程序使用高級資源(例如:Kafka,Flume),那么您將必須將他們鏈接的額外工件及其依賴項打包在用于部署應用程序的 JAR 中。例如,使用 `KafkaUtils` 的應用程序必須在應用程序 JAR 中包含 `spark-streaming-kafka-0-8_2.11` 及其所有傳遞依賴關系。 * _為 executor 配置足夠的內存_ - 由于接收到的數據必須存儲在內存中,所以 executor 必須配置足夠的內存來保存接收到的數據。請注意,如果您正在進行10分鐘的窗口操作,系統必須至少保留最近10分鐘的內存中的數據。因此,應用程序的內存要求取決于其中使用的操作。 * _配置 checkpoint_ - 如果 streaming 應用程序需要它,則 Hadoop API 兼容容錯存儲(例如:HDFS,S3等)中的目錄必須配置為 checkpoint 目錄,并且流程應用程序以 checkpoint 信息的方式編寫 用于故障恢復。有關詳細信息,請參閱 [checkpoint](#checkpointing) 部分。 * _配置應用程序 driver 的自動重新啟動_ - 要從 driver 故障自動恢復,用于運行流應用程序的部署基礎架構必須監視 driver 進程,并在 driver 發生故障時重新啟動 driver。不同的 [集群管理者](cluster-overview.html#cluster-manager-types) 有不同的工具來實現這一點。 * _Spark Standalone_ - 可以提交 Spark 應用程序 driver 以在Spark Standalone集群中運行(請參閱 [集群部署模式](spark-standalone.html#launching-spark-applications)),即應用程序 driver 本身在其中一個工作節點上運行。此外,可以指示獨立的群集管理器來監督 driver,如果由于非零退出代碼而導致 driver 發生故障,或由于運行 driver 的節點發生故障,則可以重新啟動它。有關詳細信息,請參閱 [Spark Standalone 指南]](spark-standalone.html) 中的群集模式和監督。 * _YARN_ - Yarn 支持類似的機制來自動重新啟動應用程序。有關詳細信息,請參閱 YARN文檔。 * _Mesos_ - [Marathon](https://github.com/mesosphere/marathon) 已被用來實現這一點與Mesos。 * _配置預寫日志_ - 自 Spark 1.2 以來,我們引入了寫入日志來實現強大的容錯保證。如果啟用,則從 receiver 接收的所有數據都將寫入配置 checkpoint 目錄中的寫入日志。這可以防止 driver 恢復時的數據丟失,從而確保零數據丟失(在 [容錯語義](#fault-tolerance-semantics) 部分中詳細討論)。可以通過將 [配置參數](configuration.html#spark-streaming) `spark.streaming.receiver.writeAheadLog.enable` 設置為 `true`來啟用此功能。然而,這些更強的語義可能以單個 receiver 的接收吞吐量為代價。通過 [并行運行更多的 receiver](#level-of-parallelism-in-data-receiving) 可以糾正這一點,以增加總吞吐量。另外,建議在啟用寫入日志時,在日志已經存儲在復制的存儲系統中時,禁用在 Spark 中接收到的數據的復制。這可以通過將輸入流的存儲級別設置為 `StorageLevel.MEMORY_AND_DISK_SER` 來完成。使用 S3(或任何不支持刷新的文件系統)寫入日志時,請記住啟用 `spark.streaming.driver.writeAheadLog.closeFileAfterWrite` 和`spark.streaming.receiver.writeAheadLog.closeFileAfterWrite`。有關詳細信息,請參閱 [Spark Streaming配](configuration.html#spark-streaming)。請注意,啟用 I/O 加密時,Spark 不會將寫入寫入日志的數據加密。如果需要對提前記錄數據進行加密,則應將其存儲在本地支持加密的文件系統中。 * _設置最大接收速率_ - 如果集群資源不夠大,streaming 應用程序能夠像接收到的那樣快速處理數據,則可以通過設置 記錄/秒 的最大速率限制來對 receiver 進行速率限制。請參閱 receiver 的 `spark.streaming.receiver.maxRate` 和用于 Direct Kafka 方法的 `spark.streaming.kafka.maxRatePerPartition` 的 [配置參數](configuration.html#spark-streaming)。在Spark 1.5中,我們引入了一個稱為背壓的功能,無需設置此速率限制,因為Spark Streaming會自動計算速率限制,并在處理條件發生變化時動態調整速率限制。可以通過將 [配置參數](configuration.html#spark-streaming) `spark.streaming.backpressure.enabled` 設置為 `true` 來啟用此 backpressure。 ### 升級應用程序代碼 如果運行的 Spark Streaming 應用程序需要使用新的應用程序代碼進行升級,則有兩種可能的機制。 * 升級后的 Spark Streaming 應用程序與現有應用程序并行啟動并運行。一旦新的(接收與舊的數據相同的數據)已經升溫并準備好黃金時段,舊的可以被關掉。請注意,這可以用于支持將數據發送到兩個目的地(即較早和已升級的應用程序)的數據源。 * 現有應用程序正常關閉(請參閱 [`StreamingContext.stop(...)`](api/scala/index.html#org.apache.spark.streaming.StreamingContext) 或 [`JavaStreamingContext.stop(...)`](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) 以獲取正常的關閉選項),以確保已關閉的數據在關閉之前被完全處理。然后可以啟動升級的應用程序,這將從較早的應用程序停止的同一點開始處理。請注意,只有在支持源端緩沖的輸入源(如:Kafka 和 Flume)時才可以進行此操作,因為數據需要在先前的應用程序關閉并且升級的應用程序尚未啟動時進行緩沖。從升級前代碼的早期 checkpoint 信息重新啟動不能完成。checkpoint 信息基本上包含序列化的 Scala/Java/Python 對象,并嘗試使用新的修改的類反序列化對象可能會導致錯誤。在這種情況下,可以使用不同的 checkpoint 目錄啟動升級的應用程序,也可以刪除以前的 checkpoint 目錄。 * * * ## Monitoring Applications(監控應用程序) 除了 Spark 的 [monitoring capabilities(監控功能)](monitoring.html),還有其他功能特定于 Spark Streaming。當使用 StreamingContext 時,[Spark web UI](monitoring.html#web-interfaces) 顯示一個額外的 `Streaming` 選項卡,顯示 running receivers(運行接收器)的統計信息(無論是 receivers(接收器)是否處于 active(活動狀態),接收到的 records(記錄)數,receiver error(接收器錯誤)等)并完成 batches(批次)(batch processing times(批處理時間),queueing delays(排隊延遲)等)。這可以用來監視 streaming application(流應用程序)的進度。 web UI 中的以下兩個 metrics(指標)特別重要: * _Processing Time(處理時間)_ - 處理每 batch(批)數據的時間。 * _Scheduling Delay(調度延遲)_ - batch(批處理)在 queue(隊列)中等待處理 previous batches(以前批次)完成的時間。 如果 batch processing time(批處理時間)始終 more than(超過)batch interval(批間隔)and/or queueing delay(排隊延遲)不斷增加,表示系統是無法快速 process the batches(處理批次),并且正在 falling behind(落后)。在這種情況下,請考慮 [reducing(減少)](#reducing-the-batch-processing-times) batch processing time(批處理時間)。 Spark Streaming 程序的進展也可以使用 [StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) 接口,這允許您獲得 receiver status(接收器狀態)和 processing times(處理時間)。請注意,這是一個開發人員 API 并且將來可能會改善(即,更多的信息報告)。 * * * * * * # Performance Tuning(性能調優) 在集群上的 Spark Streaming application 中獲得最佳性能需要一些調整。本節介紹了可調整的多個 parameters(參數)和 configurations(配置)提高你的應用程序性能。在高層次上,你需要考慮兩件事情: 1. 通過有效利用集群資源,Reducing the processing time of each batch of data(減少每批數據的處理時間)。 2. 設置正確的 batch size(批量大小),以便 batches of data(批量的數據)可以像 received(被接收)處理一樣快(即 data processing(數據處理)與 data ingestion(數據攝取)保持一致)。 ## Reducing the Batch Processing Times(減少批處理時間) 在 Spark 中可以進行一些優化,以 minimize the processing time of each batch(最小化每批處理時間)。這些已在 [Tuning Guide(調優指南)](tuning.html) 中詳細討論過。本節突出了一些最重要的。 ### Level of Parallelism in Data Receiving(數據接收中的并行級別) 通過網絡接收數據(如Kafka,Flume,socket 等)需要 deserialized(反序列化)數據并存儲在 Spark 中。如果數據接收成為系統的瓶頸,那么考慮一下 parallelizing the data receiving(并行化數據接收)。注意每個 input DStream 創建接收 single stream of data(單個數據流)的 single receiver(單個接收器)(在 work machine 上運行)。因此,可以通過創建多個 input DStreams 來實現 Receiving multiple data streams(接收多個數據流)并配置它們以從 source(s) 接收 data stream(數據流)的 different partitions(不同分區)。例如,接收 two topics of data(兩個數據主題)的單個Kafka input DStream 可以分為兩個 Kafka input streams(輸入流),每個只接收一個 topic(主題)。這將運行兩個 receivers(接收器),允許 in parallel(并行)接收數據,從而提高 overall throughput(總體吞吐量)。這些 multiple DStreams 可以 unioned(聯合起來)創建一個 single DStream。然后 transformations(轉化)為應用于 single input DStream 可以應用于 unified stream。如下這樣做。 ``` val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print() ``` ``` int numStreams = 5; List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams); for (int i = 0; i < numStreams; i++) { kafkaStreams.add(KafkaUtils.createStream(...)); } JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); unifiedStream.print(); ``` ``` numStreams = 5 kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)] unifiedStream = streamingContext.union(*kafkaStreams) unifiedStream.pprint() ``` 應考慮的另一個參數是 receiver’s block interval(接收器的塊間隔),這由[configuration parameter(配置參數)](configuration.html#spark-streaming) 的 `spark.streaming.blockInterval` 決定。對于大多數 receivers(接收器),接收到的數據 coalesced(合并)在一起存儲在 Spark 內存之前的 blocks of data(數據塊)。每個 batch(批次)中的 blocks(塊)數確定將用于處理接收到的數據以 map-like(類似與 map 形式的)transformation(轉換)的 task(任務)的數量。每個 receiver(接收器)每 batch(批次)的任務數量將是大約(batch interval(批間隔)/ block interval(塊間隔))。例如,200 ms的 block interval(塊間隔)每 2 秒 batches(批次)創建 10 個 tasks(任務)。如果 tasks(任務)數量太少(即少于每個機器的內核數量),那么它將無效,因為所有可用的內核都不會被使用處理數據。要增加 given batch interval(給定批間隔)的 tasks(任務)數量,請減少 block interval(塊間??隔)。但是,推薦的 block interval(塊間隔)最小值約為 50ms,低于此任務啟動開銷可能是一個問題。 使用 multiple input streams(多個輸入流)/ receivers(接收器)接收數據的替代方法是明確 repartition(重新分配)input data stream(輸入數據流)(使用 `inputStream.repartition(&lt;number of partitions&gt;)`)。這會在 further processing(進一步處理)之前將 received batches of data(收到的批次數據)distributes(分發)到集群中指定數量的計算機。 ### Level of Parallelism in Data Processing(數據處理中的并行度水平) 如果在任何 computation(計算)階段中使用 number of parallel tasks(并行任務的數量),則 Cluster resources(集群資源)可能未得到充分利用。例如,對于 distributed reduce(分布式 reduce)操作,如 `reduceByKey` 和 `reduceByKeyAndWindow`,默認并行任務的數量由 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 控制。您 可以通過 parallelism(并行度)作為參數(見 [`PairDStreamFunctions`](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions) 文檔),或設置 `spark.default.parallelism` [configuration property](configuration.html#spark-properties) 更改默認值。 ### Data Serialization(數據序列化) 可以通過調優 serialization formats(序列化格式)來減少數據 serialization(序列化)的開銷。在 streaming 的情況下,有兩種類型的數據被 serialized(序列化)。 * **Input data(輸入數據)**:默認情況下,通過 Receivers 接收的 input data(輸入數據)通過 [StorageLevel。MEMORY_AND_DISK_SER_2](api/scala/index.html#org.apache.spark.storage.StorageLevel$) 存儲在 executors 的內存中。也就是說,將數據 serialized(序列化)為 bytes(字節)以減少 GC 開銷,并復制以容忍 executor failures(執行器故障)。此外,數據首先保留在內存中,并且只有在內存不足以容納 streaming computation(流計算)所需的所有輸入數據時才會 spilled over(溢出)到磁盤。這個 serialization(序列化)顯然具有開銷 - receiver(接收器)必須使接收的數據 deserialize(反序列化),并使用 Spark 的 serialization format(序列化格式)重新序列化它。 * **Persisted RDDs generated by Streaming Operations(流式操作生成的持久 RDDs)**:通過 streaming computations(流式計算)生成的 RDD 可能會持久存儲在內存中。例如,window operations(窗口操作)會將數據保留在內存中,因為它們將被處理多次。但是,與 [StorageLevel.MEMORY_ONLY](api/scala/index.html#org.apache.spark.storage.StorageLevel$) 的 Spark Core 默認情況不同,通過流式計算生成的持久化 RDD 將以 [StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$)(即序列化),以最小化 GC 開銷。 在這兩種情況下,使用 Kryo serialization(Kryo 序列化)可以減少 CPU 和內存開銷。有關詳細信息,請參閱 [Spark Tuning Guide](tuning.html#data-serialization)。對于 Kryo,請考慮 registering custom classes,并禁用對象引用跟蹤(請參閱 [Configuration Guide](configuration.html#compression-and-serialization) 中的 Kryo 相關配置)。 在 streaming application 需要保留的數據量不大的特定情況下,可以將數據(兩種類型)作為 deserialized objects(反序列化對象)持久化,而不會導致過多的 GC 開銷。例如,如果您使用幾秒鐘的 batch intervals(批次間隔)并且沒有 window operations(窗口操作),那么可以通過明確地相應地設置 storage level(存儲級別)來嘗試禁用 serialization in persisted data(持久化數據中的序列化)。這將減少由于序列化造成的 CPU 開銷,潛在地提高性能,而不需要太多的 GC 開銷。 ### Task Launching Overheads(任務啟動開銷) 如果每秒啟動的任務數量很高(比如每秒 50 個或更多),那么這個開銷向 slaves 發送任務可能是重要的,并且將難以實現 sub-second latencies(次要的延遲)。可以通過以下更改減少開銷: * **Execution mode(執行模式)**:以 Standalone mode(獨立模式)或 coarse-grained Mesos 模式運行 Spark 比 fine-grained Mesos 模式更好的任務啟動時間。有關詳細信息,請參閱 [Running on Mesos guide](running-on-mesos.html)。 這些更改可能會將 batch processing time(批處理時間)縮短 100 毫秒,從而允許 sub-second batch size(次秒批次大小)是可行的。 * * * ## Setting the Right Batch Interval(設置正確的批次間隔) 對于在集群上穩定地運行的 Spark Streaming application,該系統應該能夠處理數據盡可能快地被接收。換句話說,應該處理批次的數據就像生成它們一樣快。這是否適用于 application 可以在 [monitoring](#monitoring-applications) streaming web UI 中的 processing times 中被找到,processing time(批處理處理時間)應小于 batch interval(批間隔)。 取決于 streaming computation(流式計算)的性質,使用的 batch interval(批次間隔)可能對處理由應用程序持續一組固定的 cluster resources(集群資源)的數據速率有重大的影響。例如,讓我們考慮早期的 WordCountNetwork 示例。對于特定的 data rate(數據速率),系統可能能夠跟蹤每 2 秒報告 word counts(即 2 秒的 batch interval(批次間隔)),但不能每 500 毫秒。因此,需要設置 batch interval(批次間隔),使預期的數據速率在生產可以持續。 為您的應用程序找出正確的 batch size(批量大小)的一個好方法是使用進行測試 conservative batch interval(保守的批次間隔)(例如 5-10 秒)和 low data rate(低數據速率)。驗證是否系統能夠跟上 data rate(數據速率),可以檢查遇到的 end-to-end delay(端到端延遲)的值通過每個 processed batch(處理的批次)(在 Spark driver log4j 日志中查找 “Total delay”,或使用 [StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) 接口)。如果 delay(延遲)保持與 batch size(批量大小)相當,那么系統是穩定的。除此以外,如果延遲不斷增加,則意味著系統無法跟上,因此不穩定。一旦你有一個 stable configuration(穩定的配置)的想法,你可以嘗試增加 data rate and/or 減少 batch size。請注意,momentary increase(瞬時增加)由于延遲暫時增加只要延遲降低到 low value(低值),臨時數據速率增加就可以很好(即,小于 batch size(批量大小))。 * * * ## Memory Tuning(內存調優) 調整 Spark 應用程序的內存使用情況和 GC behavior 已經有很多的討論在 [Tuning Guide](tuning.html#memory-tuning) 中。我們強烈建議您閱讀一下。在本節中,我們將在 Spark Streaming applications 的上下文中討論一些 tuning parameters(調優參數)。 Spark Streaming application 所需的集群內存量在很大程度上取決于所使用的 transformations 類型。例如,如果要在最近 10 分鐘的數據中使用 window operation(窗口操作),那么您的集群應該有足夠的內存來容納內存中 10 分鐘的數據。或者如果要使用大量 keys 的 `updateStateByKey`,那么必要的內存將會很高。相反,如果你想做一個簡單的 map-filter-store 操作,那么所需的內存就會很低。 一般來說,由于通過 receivers(接收器)接收的數據與 StorageLevel.MEMORY_AND_DISK_SER_2 一起存儲,所以不適合內存的數據將會 spill over(溢出)到磁盤上。這可能會降低 streaming application(流式應用程序)的性能,因此建議您提供足夠的 streaming application(流量應用程序)所需的內存。最好仔細查看內存使用量并相應地進行估算。 memory tuning(內存調優)的另一個方面是 garbage collection(垃圾收集)。對于需要低延遲的 streaming application,由 JVM Garbage Collection 引起的大量暫停是不希望的。 有幾個 parameters(參數)可以幫助您調整 memory usage(內存使用量)和 GC 開銷: * **Persistence Level of DStreams(DStreams 的持久性級別)**:如前面在 [Data Serialization](#data-serialization) 部分中所述,input data 和 RDD 默認保持為 serialized bytes(序列化字節)。與 deserialized persistence(反序列化持久性)相比,這減少了內存使用量和 GC 開銷。啟用 Kryo serialization 進一步減少了 serialized sizes(序列化大小)和 memory usage(內存使用)。可以通過 compression(壓縮)來實現內存使用的進一步減少(參見Spark配置 `spark.rdd.compress`),代價是 CPU 時間。 * **Clearing old data(清除舊數據)**:默認情況下,DStream 轉換生成的所有 input data 和 persisted RDDs 將自動清除。Spark Streaming 決定何時根據所使用的 transformations(轉換)來清除數據。例如,如果您使用 10 分鐘的 window operation(窗口操作),則 Spark Streaming 將保留最近 10 分鐘的數據,并主動丟棄舊數據。數據可以通過設置 `streamingContext.remember` 保持更長的持續時間(例如交互式查詢舊數據)。 * **CMS Garbage Collector(CMS垃圾收集器)**:強烈建議使用 concurrent mark-and-sweep GC,以保持 GC 相關的暫停始終如一。即使 concurrent GC 已知可以減少 系統的整體處理吞吐量,其使用仍然建議實現更多一致的 batch processing times(批處理時間)。確保在 driver(使用 `--driver-java-options` 在 `spark-submit` 中)和 executors(使用 [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`)中設置 CMS GC。 * **Other tips(其他提示)**:為了進一步降低 GC 開銷,以下是一些更多的提示。 * 使用 `OFF_HEAP` 存儲級別的保持 RDDs。在 [Spark Programming Guide](programming-guide.html#rdd-persistence) 中查看更多詳細信息。 * 使用更小的 heap sizes 的 executors。這將降低每個 JVM heap 內的 GC 壓力。 * * * ##### Important points to remember(要記住的要點): * DStream 與 single receiver(單個接收器)相關聯。為了獲得讀取并行性,需要創建多個 receivers,即 multiple DStreams。receiver 在一個 executor 中運行。它占據一個 core(內核)。確保在 receiver slots are booked 后有足夠的內核進行處理,即 `spark.cores.max` 應該考慮 receiver slots。receivers 以循環方式分配給 executors。 * 當從 stream source 接收到數據時,receiver 創建數據 blocks(塊)。每個 blockInterval 毫秒生成一個新的數據塊。在 N = batchInterval/blockInterval 的 batchInterval 期間創建 N 個數據塊。這些塊由當前 executor 的 BlockManager 分發給其他執行程序的 block managers。之后,在驅動程序上運行的 Network Input Tracker(網絡輸入跟蹤器)通知有關進一步處理的塊位置 * 在驅動程序中為在 batchInterval 期間創建的塊創建一個 RDD。在 batchInterval 期間生成的塊是 RDD 的 partitions。每個分區都是一個 spark 中的 task。blockInterval == batchinterval 意味著創建 single partition(單個分區),并且可能在本地進行處理。 * 除非 non-local scheduling(非本地調度)進行,否則塊上的 map tasks(映射任務)將在 executors(接收 block,復制塊的另一個塊)中進行處理。具有更大的 block interval(塊間隔)意味著更大的塊。`spark.locality.wait` 的高值增加了處理 local node(本地節點)上的塊的機會。需要在這兩個參數之間找到平衡,以確保在本地處理較大的塊。 * 而不是依賴于 batchInterval 和 blockInterval,您可以通過調用 `inputDstream.repartition(n)` 來定義 number of partitions(分區數)。這樣可以隨機重新組合 RDD 中的數據,創建 n 個分區。是的,為了更大的 parallelism(并行性)。雖然是 shuffle 的代價。RDD 的處理由 driver’s jobscheduler 作為一項工作安排。在給定的時間點,只有一個 job 是 active 的。因此,如果一個作業正在執行,則其他作業將排隊。 * 如果您有兩個 dstream,將會有兩個 RDD 形成,并且將創建兩個將被安排在另一個之后的作業。為了避免這種情況,你可以聯合兩個 dstream。這將確保為 dstream 的兩個 RDD 形成一個 unionRDD。這個 unionRDD 然后被認為是一個 single job(單一的工作)。但 RDD 的 partitioning(分區)不受影響。 * 如果 batch processing time(批處理時間)超過 batchinterval(批次間隔),那么顯然 receiver 的內存將會開始填滿,最終會拋出 exceptions(最可能是 BlockNotFoundException)。目前沒有辦法暫停 receiver。使用 SparkConf 配置 `spark.streaming.receiver.maxRate`,receiver 的 rate 可以受到限制。 * * * * * * # Fault-tolerance Semantics(容錯語義) 在本節中,我們將討論 Spark Streaming applications 在該 event 中的行為的失敗。 ## Background(背景) 要了解 Spark Streaming 提供的語義,請記住 Spark 的 RDD 的基本 fault-tolerance semantics(容錯語義)。 1. RDD 是一個不可變的,確定性地可重新計算的分布式數據集。每個RDD 記住在容錯輸入中使用的確定性操作的 lineage 數據集創建它。 2. 如果 RDD 的任何 partition 由于工作節點故障而丟失,則該分區可以是 從 original fault-tolerant dataset(原始容錯數據集)中使用業務流程重新計算。 3. 假設所有的 RDD transformations 都是確定性的,最后的數據被轉換,無論 Spark 集群中的故障如何,RDD 始終是一樣的。 Spark 運行在容錯文件系統(如 HDFS 或 S3)中的數據上。因此,從容錯數據生成的所有 RDD 也都是容錯的。但是,這不是在大多數情況下,Spark Streaming 作為數據的情況通過網絡接收(除非 `fileStream` 被使用)。為了為所有生成的 RDD 實現相同的 fault-tolerance properties(容錯屬性),接收的數據在集群中的工作節點中的多個 Spark executors 之間進行復制(默認 replication factor(備份因子)為 2)。這導致了發生故障時需要恢復的系統中的兩種數據: 1. _Data received and replicated(數據接收和復制)_ - 這個數據在單個工作節點作為副本的故障中幸存下來,它存在于其他節點之一上。 2. _Data received but buffered for replication(接收數據但緩沖進行復制)_ - 由于不復制,恢復此數據的唯一方法是從 source 重新獲取。 此外,我們應該關注的有兩種 failures: 1. _Failure of a Worker Node(工作節點的故障)_ - 運行 executors 的任何工作節點都可能會故障,并且這些節點上的所有內存中數據將丟失。如果任何 receivers 運行在失敗節點,則它們的 buffered(緩沖)數據將丟失。 2. _Failure of the Driver Node(Driver 節點的故障)_ - 如果運行 Spark Streaming application 的 driver node 發生了故障,那么顯然 SparkContext 丟失了,所有的 executors 和其內存中的數據也一起丟失了。 有了這個基礎知識,讓我們了解 Spark Streaming 的 fault-tolerance semantics(容錯語義)。 ## Definitions(定義) streaming systems(流系統)的語義通常是通過系統可以處理每個記錄的次數來捕獲的。系統可以在所有可能的操作條件下提供三種類型的保證(盡管有故障等)。 1. _At most once(最多一次)_:每個 record(記錄)將被處理一次或根本不處理。 2. _At least once(至少一次)_:每個 record(記錄)將被處理一次或多次。這比_at-most once_,因為它確保沒有數據將丟失。但可能有重復。 3. _Exactly once(有且僅一次)_:每個 record(記錄)將被精確處理一次 - 沒有數據丟失,數據不會被多次處理。這顯然是三者的最強保證。 ## Basic Semantics(基本語義) 在任何 stream processing system(流處理系統)中,廣義上說,處理數據有三個步驟。 1. _Receiving the data(接收數據)_:使用 Receivers 或其他方式從數據源接收數據。 2. _Transforming the data(轉換數據)_:使用 DStream 和 RDD transformations 來 transformed(轉換)接收到的數據。 3. _Pushing out the data(推出數據)_:最終的轉換數據被推出到 external systems(外部系統),如 file systems(文件系統),databases(數據庫),dashboards(儀表板)等。 如果 streaming application 必須實現 end-to-end exactly-once guarantees(端到端的一次且僅一次性保證),那么每個步驟都必須提供 exactly-once guarantee。也就是說,每個記錄必須被精確地接收一次,轉換完成一次,并被推送到下游系統一次。讓我們在 Spark Streaming 的上下文中了解這些步驟的語義。 1. _Receiving the data(接收數據)_:不同的 input sources 提供不同的保證。這將在下一小節中詳細討論。 2. _Transforming the data(轉換數據)_:所有已收到的數據都將被處理 _exactly once_,這得益于 RDD 提供的保證。即使存在故障,只要接收到的輸入數據可訪問,最終變換的 RDD 將始終具有相同的內容。 3. _Pushing out the data(推出數據)_:默認情況下的輸出操作確保 _at-least once_ 語義,因為它取決于輸出操作的類型(idempotent(冪等))或 downstream system(下游系統)的語義(是否支持 transactions(事務))。但用戶可以實現自己的事務機制來實現 _exactly-once_ 語義。這將在本節后面的更多細節中討論。 ## Semantics of Received Data(接收數據的語義) 不同的 input sources(輸入源)提供不同的保證,范圍從 _at-least once_ 到 _exactly once_。 ### With Files 如果所有的 input data(輸入數據)都已經存在于 fault-tolerant file system(容錯文件系統)中 HDFS,Spark Streaming 可以隨時從任何故障中恢復并處理所有數據。這給了 _exactly-once_ 語義,意味著無論什么故障,所有的數據將被精確處理一次。 ### With Receiver-based Sources(使用基于接收器的數據源) 對于基于 receivers(接收器)的 input sources(輸入源),容錯語義取決于故障場景和接收器的類型。正如我們 [earlier](#receiver-reliability) 討論的,有兩種類型的 receivers(接收器): 1. _Reliable Receiver(可靠的接收器)_ - 這些 receivers(接收機)只有在確認收到的數據已被復制之后確認 reliable sources(可靠的源)。如果這樣的接收器出現故障,source 將不會被接收對于 buffered (unreplicated) data(緩沖(未復制)數據)的確認。因此,如果 receiver 是重新啟動,source 將重新發送數據,并且不會由于故障而丟失數據。 2. _Unreliable Receiver(不可靠的接收器)_ - 這樣的接收器 _不會_ 發送確認,因此_可能_ 丟失數據,由于 worker 或 driver 故障。 根據使用的 receivers 類型,我們實現以下語義。如果 worker node 出現故障,則 reliable receivers 沒有數據丟失。unreliable receivers,收到但未復制的數據可能會丟失。如果 driver node 失敗,那么除了這些損失之外,在內存中接收和復制的所有過去的數據將丟失。這將影響 stateful transformations(有狀態轉換)的結果。 為避免過去收到的數據丟失,Spark 1.2 引入了 _write ahead logs_ 將接收到的數據保存到 fault-tolerant storage(容錯存儲)。用[write ahead logs enabled](#deploying-applications) 和 reliable receivers,數據沒有丟失。在語義方面,它提供 at-least once guarantee(至少一次保證)。 下表總結了失敗的語義: | Deployment Scenario(部署場景)| Worker Failure(Worker 故障)| Driver Failure(Driver 故障)| | --- | --- | --- | | _Spark 1.1 或更早版本,_ 或者 <br> _Spark 1.2 或者沒有 write ahead logs 的更高的版本_ | Buffered data lost with unreliable receivers(unreliable receivers 的緩沖數據丟失)<br> Zero data loss with reliable receivers(reliable receivers 的零數據丟失)<br> At-least once semantics(至少一次性語義)| Buffered data lost with unreliable receivers(unreliable receivers 的緩沖數據丟失)<br> Past data lost with all receivers(所有的 receivers 的過去的數據丟失)<br> Undefined semantics(未定義語義)| | _Spark 1.2 或者帶有 write ahead logs 的更高版本_ | Zero data loss with reliable receivers(reliable receivers 的零數據丟失)<br> At-least once semantics(至少一次性語義)| Zero data loss with reliable receivers and files(reliable receivers 和 files 的零數據丟失)<br> At-least once semantics(至少一次性語義)| | | | | ### With Kafka Direct API(使用 Kafka Direct API) 在 Spark 1.3 中,我們引入了一個新的 Kafka Direct API,可以確保所有的 Kafka 數據都被 Spark Streaming exactly once(一次)接收。與此同時,如果您實現 exactly-once output operation(一次性輸出操作),您可以實現 end-to-end exactly-once guarantees(端到端的一次且僅一次性保證)。在 [Kafka Integration Guide](streaming-kafka-integration.html) 中進一步討論了這種方法。 ## Semantics of output operations(輸出操作的語義) Output operations(輸出操作)(如 `foreachRDD`)具有 _at-least once_ 語義,也就是說,transformed data(變換后的數據)可能會不止一次寫入 external entity(外部實體)在一個 worker 故障事件中。雖然這是可以接受的使用 `saveAs***Files`操作(因為文件將被相同的數據簡單地覆蓋)保存到文件系統,可能需要額外的努力來實現 exactly-once(一次且僅一次)語義。有兩種方法。 * _Idempotent updates(冪等更新)_:多次嘗試總是寫入相同的數據。例如,`saveAs***Files` 總是將相同的數據寫入生成的文件。 * _Transactional updates(事務更新)_:所有更新都是事務性的,以便更新完全按原子進行。這樣做的一個方法如下。 * 使用批處理時間(在 `foreachRDD` 中可用)和 RDD 的 partition index(分區索引)來創建 identifier(標識符)。該標識符唯一地標識 streaming application 中的 blob 數據。 * 使用該 identifier(標識符)blob transactionally(blob 事務地)更新 external system(外部系統)(即,exactly once,atomically(一次且僅一次,原子性地))。也就是說,如果 identifier(標識符)尚未提交,則以 atomically(原子方式)提交 partition data(分區數據)和 identifier(標識符)。否則,如果已經提交,請跳過更新。 ``` dstream.foreachRDD { (rdd, time) =&gt; rdd.foreachPartition { partitionIterator =&gt; val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } } ``` * * * * * * # 快速鏈接 * 附加指南 * [Kafka 集成指南](streaming-kafka-integration.html) * [Kinesis 集成指南](streaming-kinesis-integration.html) * [自定義 Receiver(接收器)指南](streaming-custom-receivers.html) * 第三方 DStream 數據源可以在 [第三方項目](http://spark.apache.org/third-party-projects.html) 上查看。 * API 文檔 * Scala 文檔 * [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) 和 [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) * [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$), [FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$), [KinesisUtils](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$), * Java 文檔 * [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html), [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) 和 [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html) * [KafkaUtils](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html), [FlumeUtils](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html), [KinesisUtils](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) * Python 文檔 * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) 和 [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream) * [KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils) * 更多的示例在 [Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming) 和 [Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming) 和 [Python](https://github.com/apache/spark/tree/master/examples/src/main/python/streaming) * 描述 Spark Streaming 的 [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) 和 [video](http://youtu.be/g171ndOHgJ0).
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看