# DStreams上的輸出操作
輸出操作允許DStream的操作推到如數據庫、文件系統等外部系統中。因為輸出操作實際上是允許外部系統消費轉換后的數據,它們觸發的實際操作是DStream轉換。目前,定義了下面幾種輸出操作:
| Output Operation | Meaning |
|-----|-----|
| print() | 在DStream的每個批數據中打印前10條元素,這個操作在開發和調試中都非常有用。在Python API中調用`pprint()`。 |
| saveAsObjectFiles(prefix, [suffix]) | 保存DStream的內容為一個序列化的文件`SequenceFile`。每一個批間隔的文件的文件名基于`prefix`和`suffix`生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 |
| saveAsTextFiles(prefix, [suffix]) | 保存DStream的內容為一個文本文件。每一個批間隔的文件的文件名基于`prefix`和`suffix`生成。"prefix-TIME_IN_MS[.suffix]" |
| saveAsHadoopFiles(prefix, [suffix]) | 保存DStream的內容為一個hadoop文件。每一個批間隔的文件的文件名基于`prefix`和`suffix`生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。 |
| foreachRDD(func) | 在從流中生成的每個RDD上應用函數`func`的最通用的輸出操作。這個函數應該推送每個RDD的數據到外部系統,例如保存RDD到文件或者通過網絡寫到數據庫中。需要注意的是,`func`函數在驅動程序中執行,并且通常都有RDD action在里面推動RDD流的計算。 |
### 利用foreachRDD的設計模式
dstream.foreachRDD是一個強大的原語,發送數據到外部系統中。然而,明白怎樣正確地、有效地用這個原語是非常重要的。下面幾點介紹了如何避免一般錯誤。
- 經常寫數據到外部系統需要建一個連接對象(例如到遠程服務器的TCP連接),用它發送數據到遠程系統。為了達到這個目的,開發人員可能不經意的在Spark驅動中創建一個連接對象,但是在Spark worker中嘗試調用這個連接對象保存記錄到RDD中,如下:
~~~
dstream.foreachRDD(rdd => {
val connection = createNewConnection() // executed at the driver
rdd.foreach(record => {
connection.send(record) // executed at the worker
})
})
~~~
這是不正確的,因為這需要先序列化連接對象,然后將它從driver發送到worker中。這樣的連接對象在機器之間不能傳送。它可能表現為序列化錯誤(連接對象不可序列化)或者初始化錯誤(連接對象應該在worker中初始化)等等。正確的解決辦法是在worker中創建連接對象。
- 然而,這會造成另外一個常見的錯誤-為每一個記錄創建了一個連接對象。例如:
~~~
dstream.foreachRDD(rdd => {
rdd.foreach(record => {
val connection = createNewConnection()
connection.send(record)
connection.close()
})
})
~~~
通常,創建一個連接對象有資源和時間的開支。因此,為每個記錄創建和銷毀連接對象會導致非常高的開支,明顯的減少系統的整體吞吐量。一個更好的解決辦法是利用`rdd.foreachPartition`方法。為RDD的partition創建一個連接對象,用這個兩件對象發送partition中的所有記錄。
~~~
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
})
})
~~~
這就將連接對象的創建開銷分攤到了partition的所有記錄上了。
- 最后,可以通過在多個RDD或者批數據間重用連接對象做更進一步的優化。開發者可以保有一個靜態的連接對象池,重復使用池中的對象將多批次的RDD推送到外部系統,以進一步節省開支。
~~~
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
})
})
~~~
需要注意的是,池中的連接對象應該根據需要延遲創建,并且在空閑一段時間后自動超時。這樣就獲取了最有效的方式發生數據到外部系統。
其它需要注意的地方:
- 輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD actions和DStreams輸出操作接收數據的處理。因此,如果你的應用程序沒有任何輸出操作或者用于輸出操作`dstream.foreachRDD()`,但是沒有任何RDD action操作在`dstream.foreachRDD()`里面,那么什么也不會執行。系統僅僅會接收輸入,然后丟棄它們。
- 默認情況下,DStreams輸出操作是分時執行的,它們按照應用程序的定義順序按序執行。
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置