# 一個快速的例子
在我們進入如何編寫Spark Streaming程序的細節之前,讓我們快速地瀏覽一個簡單的例子。在這個例子中,程序從監聽TCP套接字的數據服務器獲取文本數據,然后計算文本中包含的單詞數。做法如下:
首先,我們導入Spark Streaming的相關類以及一些從StreamingContext獲得的隱式轉換到我們的環境中,為我們所需的其他類(如DStream)提供有用的方法。[StreamingContext](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext)是Spark所有流操作的主要入口。然后,我們創建了一個具有兩個執行線程以及1秒批間隔時間(即以秒為單位分割數據流)的本地StreamingContext。
~~~
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
~~~
利用這個上下文,我們能夠創建一個DStream,它表示從TCP源(主機位localhost,端口為9999)獲取的流式數據。
~~~
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
~~~
這個`lines`變量是一個DStream,表示即將從數據服務器獲得的流數據。這個DStream的每條記錄都代表一行文本。下一步,我們需要將DStream中的每行文本都切分為單詞。
~~~
// Split each line into words
val words = lines.flatMap(_.split(" "))
~~~
`flatMap`是一個一對多的DStream操作,它通過把源DStream的每條記錄都生成多條新記錄來創建一個新的DStream。在這個例子中,每行文本都被切分成了多個單詞,我們把切分的單詞流用`words`這個DStream表示。下一步,我們需要計算單詞的個數。
~~~
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
~~~
`words`這個DStream被mapper(一對一轉換操作)成了一個新的DStream,它由(word,1)對組成。然后,我們就可以用這個新的DStream計算每批數據的詞頻。最后,我們用`wordCounts.print()`打印每秒計算的詞頻。
需要注意的是,當以上這些代碼被執行時,Spark Streaming僅僅準備好了它要執行的計算,實際上并沒有真正開始執行。在這些轉換操作準備好之后,要真正執行計算,需要調用如下的方法
~~~
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
~~~
完整的例子可以在[NetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala)中找到。
如果你已經下載和構建了Spark環境,你就能夠用如下的方法運行這個例子。首先,你需要運行Netcat作為數據服務器
~~~
$ nc -lk 9999
~~~
然后,在不同的終端,你能夠用如下方式運行例子
~~~
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
~~~
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- 編程指南
- 引入 Spark
- 初始化 Spark
- Spark RDDs
- 并行集合
- 外部數據集
- RDD 操作
- RDD持久化
- 共享變量
- 從這里開始
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 開始
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 性能調優
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- Spark SQL數據類型
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 頂點和邊RDDs
- 圖算法
- 例子
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置