# DStream中的轉換(transformation)
和RDD類似,transformation允許從輸入DStream來的數據被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:
| Transformation | Meaning |
|-----|-----|
| map(func) | 利用函數`func`處理原DStream的每個元素,返回一個新的DStream |
| flatMap(func) | 與map相似,但是每個輸入項可用被映射為0個或者多個輸出項 |
| filter(func) | 返回一個新的DStream,它僅僅包含源DStream中滿足函數func的項 |
| repartition(numPartitions) | 通過創建更多或者更少的partition改變這個DStream的并行級別(level of parallelism) |
| union(otherStream) | 返回一個新的DStream,它包含源DStream和otherStream的聯合元素 |
| count() | 通過計算源DStream中每個RDD的元素數量,返回一個包含單元素(single-element)RDDs的新DStream |
| reduce(func) | 利用函數func聚集源DStream中每個RDD的元素,返回一個包含單元素(single-element)RDDs的新DStream。函數應該是相關聯的,以使計算可以并行化 |
| countByValue() | 這個算子應用于元素類型為K的DStream上,返回一個(K,long)對的新DStream,每個鍵的值是在原DStream的每個RDD中的頻率。 |
| reduceByKey(func, [numTasks]) | 當在一個由(K,V)對組成的DStream上調用這個算子,返回一個新的由(K,V)對組成的DStream,每一個key的值均由給定的reduce函數聚集起來。注意:在默認情況下,這個算子利用了Spark默認的并發任務數去分組。你可以用`numTasks`參數設置不同的任務數 |
| join(otherStream, [numTasks]) | 當應用于兩個DStream(一個包含(K,V)對,一個包含(K,W)對),返回一個包含(K, (V, W))對的新DStream |
| cogroup(otherStream, [numTasks]) | 當應用于兩個DStream(一個包含(K,V)對,一個包含(K,W)對),返回一個包含(K, Seq[V], Seq[W])的元組 |
| transform(func) | 通過對源DStream的每個RDD應用RDD-to-RDD函數,創建一個新的DStream。這個可以在DStream中的任何RDD操作中使用 |
| updateStateByKey(func) | 利用給定的函數更新DStream的狀態,返回一個新"state"的DStream。 |
最后兩個transformation算子需要重點介紹一下:
### UpdateStateByKey操作
updateStateByKey操作允許不斷用新信息更新它的同時保持任意狀態。你需要通過兩步來使用它
- 定義狀態-狀態可以是任何的數據類型
- 定義狀態更新函數-怎樣利用更新前的狀態和從輸入流里面獲取的新值更新狀態
讓我們舉個例子說明。在例子中,你想保持一個文本數據流中每個單詞的運行次數,運行次數用一個state表示,它的類型是整數
~~~
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
~~~
這個函數被用到了DStream包含的單詞上
~~~
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
~~~
更新函數將會被每個單詞調用,`newValues`擁有一系列的1(從 (詞, 1)對而來),runningCount擁有之前的次數。要看完整的代碼,見[例子](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala)
### Transform操作
`transform`操作(以及它的變化形式如`transformWith`)允許在DStream運行任何RDD-to-RDD函數。它能夠被用來應用任何沒在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,連接數據流中的每個批(batch)和另外一個數據集的功能并沒有在DStream API中提供,然而你可以簡單的利用`transform`方法做到。如果你想通過連接帶有預先計算的垃圾郵件信息的輸入數據流來清理實時數據,然后過了它們,你可以按如下方法來做:
~~~
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
~~~
事實上,你也可以在`transform`方法中用[機器學習](https://spark.apache.org/docs/latest/mllib-guide.html)和[圖計算](https://spark.apache.org/docs/latest/graphx-programming-guide.html)算法
### 窗口(window)操作
Spark Streaming也支持窗口計算,它允許你在一個滑動窗口數據上應用transformation算子。下圖闡明了這個滑動窗口。

如上圖顯示,窗口在源DStream上滑動,合并和操作落入窗內的源RDDs,產生窗口化的DStream的RDDs。在這個具體的例子中,程序在三個時間單元的數據上進行窗口操作,并且每兩個時間單元滑動一次。這說明,任何一個窗口操作都需要指定兩個參數:
- 窗口長度:窗口的持續時間
- 滑動的時間間隔:窗口操作執行的時間間隔
這兩個參數必須是源DStream的批時間間隔的倍數。
下面舉例說明窗口操作。例如,你想擴展前面的[例子](#)用來計算過去30秒的詞頻,間隔時間是10秒。為了達到這個目的,我們必須在過去30秒的`pairs` DStream上應用`reduceByKey`操作。用方法`reduceByKeyAndWindow`實現。
~~~
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
~~~
一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個參數:窗口長度和滑動的時間間隔
| Transformation | Meaning |
|-----|-----|
| window(windowLength, slideInterval) | 基于源DStream產生的窗口化的批數據計算一個新的DStream |
| countByWindow(windowLength, slideInterval) | 返回流中元素的一個滑動窗口數 |
| reduceByWindow(func, windowLength, slideInterval) | 返回一個單元素流。利用函數func聚集滑動時間間隔的流的元素創建這個單元素流。函數必須是相關聯的以使計算能夠正確的并行計算。 |
| reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值均由給定的reduce函數聚集起來。注意:在默認情況下,這個算子利用了Spark默認的并發任務數去分組。你可以用`numTasks`參數設置不同的任務數 |
| reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. |
| countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值都是它們在滑動窗口中出現的頻率。 |
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置