<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                與 RDDs 類似,轉換允許修改輸入 DStream 中的數據。DStreams 支持許多在普通 Spark RDD 上可用的轉換。一些常見的轉換操作定義如下。 <br/> **map(func):** 通過函數 func 傳遞源 DStream 的每個元素來返回一個新的DStream。 **flatMap(func):** 與 map 類似,但是每個輸入項可以映射到 0 或多個輸出項。 **filter(func):** 通過只選擇 func 返回 true 的源 DStream 的記錄來返回一個新的 DStream。 **repartition(numPartitions):** 通過創建更多或更少的分區來改變 DStream 中的并行度。 **union(otherStream):** 返回一個新的 DStream,它包含源 DStream 和otherDStream 中元素的并集。 **count():** 通過計算源 DStream 的每個 RDD 中的元素數量,返回一個新的單元素 RDDs DStream。 **reduce(func):** 通過使用函數 func(接受兩個參數并返回一個參數)聚合源DStream 的每個 RDD 中的元素,返回一個新的單元素 RDDs DStream。這個函數應該是結合律和交換律,這樣才能并行計算。 **countByValue():** 當對類型為 K 的元素的 DStream 調用時,返回一個新的 DStream (K,Long)對,其中每個鍵的值是它在源 DStream 的每個 RDD中的頻率。 **reduceByKey(func, [numTasks]):** 當在(K, V)對的 DStream 上調用時,返回一個新的(K, V)對的 DStream,其中每個鍵的值使用給定的 reduce 函數進行聚合。注意:在默認情況下,這將使用 Spark 的默認并行任務數(本地模式為 2,而在集群模式下,該數量由配置屬性 spark.default.parallelism決定)來進行分組。可以傳遞一個可選的 numTasks 參數來設置不同數量的任務。 **join(otherStream, [numTasks]):** 當調用兩個 DStream (K, V)和(K, W)對時,返回一個新的 DStream (K,(V, W))對,每個鍵的所有對的元素。 **cogroup(otherStream, [numTasks]):** 當調用(K, V)和(K, W)對的 DStream 時,返回一個新的(K, Seq[V],Seq[W])元組 DStream。 **transform(func):** 通過對源 DStream 的每個 RDD 應用一個 RDD-to-RDD函數來返回一個新的 DStream。這可以用來在 DStream 上執行任意的RDD 操作。 **updateStateByKey(func):** 返回一個新的“狀態”DStream,其中通過對鍵的前一個狀態和鍵的新值應用給定的函數來更新每個鍵的狀態。這可以用來維護每個鍵的任意狀態數據。 <br/> 相比RDD轉換,DStream有兩個特殊操作:updateStateByKey操作和window操作。 <br/> # 1. updateStateByKey 操作 updateStateByKey 操作允許維護任意狀態,同時不斷地用新信息更新它。要使用它,必須執行兩個步驟。 (1)定義狀態——狀態可以是任意的數據類型。 (2)定義狀態更新函數——使用一個函數指定如何使用輸入流中的前一個狀態和新值來更新狀態。 <br/> 在每個批處理中,Spark 將對所有現有 keys 應用<mark>狀態更新</mark>功能,而不管它們在批處理中是否有新數據。如果更新函數返回 None,則鍵值對將被刪除。 <br/> 例如:需要維護在整個文本數據流中看到的每個單詞的運行計數。這里,運行計數是狀態,它是一個整數。將更新函數定義如下: ```scala // 將更新函數 def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... //使用前一個運行的計數添加新值以獲得新計數 Some(newCount) } // 函數調用示例: val runningCounts = pairs.updateStateByKey[Int](updateFunction _) ``` <br/> 完整代碼: ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用 Spark Streaming 處理有狀態的數據 */ object StatefulWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf sparkConf.setAppName("StatefulWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint(".") val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_, 1)) val state = result.updateStateByKey(updateFunction) state.print() ssc.start() ssc.awaitTermination() } def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = { val curr = currentValues.sum val pre = preValues.getOrElse(0) Some(curr + pre) } } ``` <br/> # 2. window操作 Spark Streaming 還提供了窗口計算,它允許在數據的滑動窗口上應用轉換。下圖演示了這個滑動窗口。 :-: ![](https://img.kancloud.cn/8c/3c/8c3c7d032f06fb980f7980296a3caf26_1137x342.jpg) 滑動窗口 <br/> 如圖所示,每當窗口在源 DStream 上滑動時,位于窗口內的源 RDDs 就會被合并并操作,以生成窗口化的 DStream 的 RDDs。在本例中,操作應用于數據的最后 3 個時間單位,幻燈片應用于 2 個時間單位。這表明任何窗口操作都需要指定兩個參數。 * 窗口長度—窗口的持續時間(圖中為 3)。 * 滑動間隔—窗口操作執行的間隔(圖中為 2)。 這兩個參數必須是源 DStream 的批處理間隔的倍數(圖中為 1)。 <br/> 例如,希望通過每 10 秒在最后 30 秒的數據中生成單詞計數來擴展前面的示例。為此,必須在最后 30 秒的數據中對(word, 1)的 DStream 應用 reduceByKey操作,可以使用 reduceByKeyAndWindow 操作完成。 ```scala //每 10 秒對最后 30 秒的數據進行 reduceByKey val windowedWordCounts = pairs.reduceByKeyAndWindow( (a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) ``` <br/> 還有一種更通用的方式:`window(windowLength, slideInterval)`。下述代碼效果與上面相同。 ```scala val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost",6789) val words = lines.window(Seconds(30),Seconds(10)).flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看