# 減少批數據的執行時間
在Spark中有幾個優化可以減少批處理的時間。這些可以在[優化指南](#)中作了討論。這節重點討論幾個重要的。
### 數據接收的并行水平
通過網絡(如kafka,flume,socket等)接收數據需要這些數據反序列化并被保存到Spark中。如果數據接收成為系統的瓶頸,就要考慮并行地接收數據。注意,每個輸入DStream創建一個`receiver`(運行在worker機器上)接收單個數據流。創建多個輸入DStream并配置它們可以從源中接收不同分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream可以被切分為兩個kafka輸入流,每個接收一個topic。這將在兩個worker上運行兩個`receiver`,因此允許數據并行接收,提高整體的吞吐量。多個DStream可以被合并生成單個DStream,這樣運用在單個輸入DStream的transformation操作可以運用在合并的DStream上。
~~~
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
~~~
另外一個需要考慮的參數是`receiver`的阻塞時間。對于大部分的`receiver`,在存入Spark內存之前,接收的數據都被合并成了一個大數據塊。每批數據中塊的個數決定了任務的個數。這些任務是用類似map的transformation操作接收的數據。阻塞間隔由配置參數`spark.streaming.blockInterval`決定,默認的值是200毫秒。
多輸入流或者多`receiver`的可選的方法是明確地重新分配輸入數據流(利用`inputStream.repartition(<number of partitions>)`),在進一步操作之前,通過集群的機器數分配接收的批數據。
### 數據處理的并行水平
如果運行在計算stage上的并發任務數不足夠大,就不會充分利用集群的資源。例如,對于分布式reduce操作如`reduceByKey`和`reduceByKeyAndWindow`,默認的并發任務數通過配置屬性來確定(configuration.html#spark-properties)`spark.default.parallelism`。你可以通過參數(`PairDStreamFunctions` (api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))傳遞并行度,或者設置參數`spark.default.parallelism`修改默認值。
### 數據序列化
數據序列化的總開銷是平常大的,特別是當sub-second級的批數據被接收時。下面有兩個相關點:
- Spark中RDD數據的序列化。關于數據序列化請參照[Spark優化指南](#)。注意,與Spark不同的是,默認的RDD會被持久化為序列化的字節數組,以減少與垃圾回收相關的暫停。
- 輸入數據的序列化。從外部獲取數據存到Spark中,獲取的byte數據需要從byte反序列化,然后再按照Spark的序列化格式重新序列化到Spark中。因此,輸入數據的反序列化花費可能是一個瓶頸。
### 任務的啟動開支
每秒鐘啟動的任務數是非常大的(50或者更多)。發送任務到slave的花費明顯,這使請求很難獲得亞秒(sub-second)級別的反應。通過下面的改變可以減小開支
- 任務序列化。運行kyro序列化任何可以減小任務的大小,從而減小任務發送到slave的時間。
- 執行模式。在Standalone模式下或者粗粒度的Mesos模式下運行Spark可以在比細粒度Mesos模式下運行Spark獲得更短的任務啟動時間。可以在[在Mesos下運行Spark](#)中獲取更多信息。
These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置