# Checkpointing
一個流應用程序必須全天候運行,所有必須能夠解決應用程序邏輯無關的故障(如系統錯誤,JVM崩潰等)。為了使這成為可能,Spark Streaming需要checkpoint足夠的信息到容錯存儲系統中,以使系統從故障中恢復。
- Metadata checkpointing:保存流計算的定義信息到容錯存儲系統如HDFS中。這用來恢復應用程序中運行worker的節點的故障。元數據包括
- Configuration :創建Spark Streaming應用程序的配置信息
- DStream operations :定義Streaming應用程序的操作集合
- Incomplete batches:操作存在隊列中的未完成的批
- Data checkpointing :保存生成的RDD到可靠的存儲系統中,這在有狀態transformation(如結合跨多個批次的數據)中是必須的。在這樣一個transformation中,生成的RDD依賴于之前批的RDD,隨著時間的推移,這個依賴鏈的長度會持續增長。在恢復的過程中,為了避免這種無限增長。有狀態的transformation的中間RDD將會定時地存儲到可靠存儲系統中,以截斷這個依賴鏈。
元數據checkpoint主要是為了從driver故障中恢復數據。如果transformation操作被用到了,數據checkpoint即使在簡單的操作中都是必須的。
### 何時checkpoint
應用程序在下面兩種情況下必須開啟checkpoint
- 使用有狀態的transformation。如果在應用程序中用到了`updateStateByKey`或者`reduceByKeyAndWindow`,checkpoint目錄必需提供用以定期checkpoint RDD。
- 從運行應用程序的driver的故障中恢復過來。使用元數據checkpoint恢復處理信息。
注意,沒有前述的有狀態的transformation的簡單流應用程序在運行時可以不開啟checkpoint。在這種情況下,從driver故障的恢復將是部分恢復(接收到了但是還沒有處理的數據將會丟失)。這通常是可以接受的,許多運行的Spark Streaming應用程序都是這種方式。
### 怎樣配置Checkpointing
在容錯、可靠的文件系統(HDFS、s3等)中設置一個目錄用于保存checkpoint信息。著可以通過`streamingContext.checkpoint(checkpointDirectory)`方法來做。這運行你用之前介紹的有狀態transformation。另外,如果你想從driver故障中恢復,你應該以下面的方式重寫你的Streaming應用程序。
- 當應用程序是第一次啟動,新建一個StreamingContext,啟動所有Stream,然后調用`start()`方法
- 當應用程序因為故障重新啟動,它將會從checkpoint目錄checkpoint數據重新創建StreamingContext
~~~
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
~~~
如果`checkpointDirectory`存在,上下文將會利用checkpoint數據重新創建。如果這個目錄不存在,將會調用`functionToCreateContext`函數創建一個新的上下文,建立DStreams。請看[RecoverableNetworkWordCount](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala)例子。
除了使用`getOrCreate`,開發者必須保證在故障發生時,driver處理自動重啟。只能通過部署運行應用程序的基礎設施來達到該目的。在部署章節將有更進一步的討論。
注意,RDD的checkpointing有存儲成本。這會導致批數據(包含的RDD被checkpoint)的處理時間增加。因此,需要小心的設置批處理的時間間隔。在最小的批容量(包含1秒的數據)情況下,checkpoint每批數據會顯著的減少操作的吞吐量。相反,checkpointing太少會導致譜系以及任務大小增大,這會產生有害的影響。因為有狀態的transformation需要RDD checkpoint。默認的間隔時間是批間隔時間的倍數,最少10秒。它可以通過`dstream.checkpoint`來設置。典型的情況下,設置checkpoint間隔是DStream的滑動間隔的5-10大小是一個好的嘗試。
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置