# 輸入DStreams和receivers
輸入DStreams表示從數據源獲取輸入數據流的DStreams。在[快速例子](#)中,`lines`表示輸入DStream,它代表從netcat服務器獲取的數據流。每一個輸入流DStream和一個`Receiver`對象相關聯,這個`Receiver`從源中獲取數據,并將數據存入內存中用于處理。
輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源
- 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字連接、Akka的actor等。
- 高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。它們需要通過額外的類來使用。我們在[關聯](#)那一節討論了類依賴。
需要注意的是,如果你想在一個流應用中并行地創建多個輸入DStream來接收多個數據流,你能夠創建多個輸入流(這將在[性能調優](#)那一節介紹)。它將創建多個Receiver同時接收多個數據流。但是,`receiver`作為一個長期運行的任務運行在Spark worker或executor中。因此,它占有一個核,這個核是分配給Spark Streaming應用程序的所有核中的一個(it occupies one of the cores allocated to the Spark Streaming application)。所以,為Spark Streaming應用程序分配足夠的核(如果是本地運行,那么是線程)用以處理接收的數據并且運行`receiver`是非常重要的。
幾點需要注意的地方:
- 如果分配給應用程序的核的數量少于或者等于輸入DStreams或者receivers的數量,系統只能夠接收數據而不能處理它們。
- 當運行在本地,如果你的master URL被設置成了“local”,這樣就只有一個核運行任務。這對程序來說是不足的,因為作為`receiver`的輸入DStream將會占用這個核,這樣就沒有剩余的核來處理數據了。
### 基本源
我們已經在[快速例子](#)中看到,`ssc.socketTextStream(...)`方法用來把從TCP套接字獲取的文本數據創建成DStream。除了套接字,StreamingContext API也支持把文件以及Akka actors作為輸入源創建DStream。
- 文件流(File Streams):從任何與HDFS API兼容的文件系統中讀取數據,一個DStream可以通過如下方式創建
~~~
streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)
~~~
Spark Streaming將會監控`dataDirectory`目錄,并且處理目錄下生成的任何文件(嵌套目錄不被支持)。需要注意一下三點:
~~~
1 所有文件必須具有相同的數據格式
2 所有文件必須在`dataDirectory`目錄下創建,文件是自動的移動和重命名到數據目錄下
3 一旦移動,文件必須被修改。所以如果文件被持續的附加數據,新的數據不會被讀取。
~~~
對于簡單的文本文件,有一個更簡單的方法`streamingContext.textFileStream(dataDirectory)`可以被調用。文件流不需要運行一個receiver,所以不需要分配核。
在Spark1.2中,`fileStream`在Python API中不可用,只有`textFileStream`可用。
- 基于自定義actor的流:DStream可以調用`streamingContext.actorStream(actorProps, actor-name)`方法從Akka actors獲取的數據流來創建。具體的信息見[自定義receiver指南](https://spark.apache.org/docs/latest/streaming-custom-receivers.html#implementing-and-using-a-custom-actor-based-receiver)`actorStream`在Python API中不可用。
- RDD隊列作為數據流:為了用測試數據測試Spark Streaming應用程序,人們也可以調用`streamingContext.queueStream(queueOfRDDs)`方法基于RDD隊列創建DStreams。每個push到隊列的RDD都被當做DStream的批數據,像流一樣處理。
關于從套接字、文件和actor中獲取流的更多細節,請看[StreamingContext](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext)和[JavaStreamingContext](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html)
### 高級源
這類源需要非Spark庫接口,并且它們中的部分還需要復雜的依賴(例如kafka和flume)。為了減少依賴的版本沖突問題,從這些源創建DStream的功能已經被移到了獨立的庫中,你能在[關聯](#)查看細節。例如,如果你想用來自推特的流數據創建DStream,你需要按照如下步驟操作:
- 關聯:添加`spark-streaming-twitter_2.10`到SBT或maven項目的依賴中
- 編寫:導入`TwitterUtils`類,用`TwitterUtils.createStream`方法創建DStream,如下所示
~~~
import org.apache.spark.streaming.twitter._
TwitterUtils.createStream(ssc)
~~~
- 部署:將編寫的程序以及其所有的依賴(包括spark-streaming-twitter_2.10的依賴以及它的傳遞依賴)打為jar包,然后部署。這在[部署章節](#)將會作更進一步的介紹。
需要注意的是,這些高級的源在`spark-shell`中不能被使用,因此基于這些源的應用程序無法在shell中測試。
下面將介紹部分的高級源:
- Twitter:Spark Streaming利用`Twitter4j 3.0.3`獲取公共的推文流,這些推文通過[推特流API](https://dev.twitter.com/docs/streaming-apis)獲得。認證信息可以通過Twitter4J庫支持的任何[方法](http://twitter4j.org/en/configuration.html)提供。你既能夠得到公共流,也能夠得到基于關鍵字過濾后的流。你可以查看API文檔([scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$)和[java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html))和例子([TwitterPopularTags](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)和[TwitterAlgebirdCMS](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala))
- Flume:Spark Streaming 1.2能夠從flume 1.4.0中獲取數據,可以查看[flume集成指南](#)了解詳細信息
- Kafka:Spark Streaming 1.2能夠從kafka 0.8.0中獲取數據,可以查看[kafka集成指南](#)了解詳細信息
- Kinesis:查看[Kinesis集成指南](#)了解詳細信息
### 自定義源
在Spark 1.2中,這些源不被Python API支持。輸入DStream也可以通過自定義源創建,你需要做的是實現用戶自定義的`receiver`,這個`receiver`可以從自定義源接收數據以及將數據推到Spark中。通過[自定義receiver指南](#)了解詳細信息
### Receiver可靠性
基于可靠性有兩類數據源。源(如kafka、flume)允許。如果從這些可靠的源獲取數據的系統能夠正確的應答所接收的數據,它就能夠確保在任何情況下不丟失數據。這樣,就有兩種類型的receiver:
- Reliable Receiver:一個可靠的receiver正確的應答一個可靠的源,數據已經收到并且被正確地復制到了Spark中。
- Unreliable Receiver :這些receivers不支持應答。即使對于一個可靠的源,開發者可能實現一個非可靠的receiver,這個receiver不會正確應答。
怎樣編寫可靠的Receiver的細節在[自定義receiver](#)中有詳細介紹。
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置