## updateStateByKey
除了能夠支持 RDD 的算子外,DStream 還有部分獨有的*transformation*算子,這當中比較常用的是 `updateStateByKey`。文章開頭的詞頻統計程序,只能統計每一次輸入文本中單詞出現的數量,想要統計所有歷史輸入中單詞出現的數量,可以使用 `updateStateByKey` 算子。代碼如下:
~~~scala
object NetworkWordCountV2 {
def main(args: Array[String]) {
/*
* 本地測試時最好指定 hadoop 用戶名,否則會默認使用本地電腦的用戶名,
* 此時在 HDFS 上創建目錄時可能會拋出權限不足的異常
*/
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/*必須要設置檢查點*/
ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
val lines = ssc.socketTextStream("hadoop001", 9999)
lines.flatMap(_.split(" ")).map(x => (x, 1))
.updateStateByKey[Int](updateFunction _) //updateStateByKey 算子
.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 累計求和
*
* @param currentValues 當前的數據
* @param preValues 之前的數據
* @return 相加后的數據
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}
~~~
使用 `updateStateByKey` 算子,你必須使用 `ssc.checkpoint()` 設置檢查點,這樣當使用 `updateStateByKey` 算子時,它會去檢查點中取出上一次保存的信息,并使用自定義的 `updateFunction` 函數將上一次的數據和本次數據進行相加,然后返回。
## 處理文件系統的數據
~~~
object FileWordCount{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FileWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5));
var lines = ssc.textFileStream("/Users/bizzbee/Desktop/work/projects/sparktrain/ss")
val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_);
result.print()
ssc.start()
ssc.awaitTermination()
}
}
~~~
* 這里是**監控ss目錄下新增的文件**。要從別的地方移過來,不能直接在里面寫。
* 結果打印:

* 文件必須相同的格式。
## foreachRDD 將每次的RDD內容放進數據庫
~~~
object ForeachRDDApp{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 獲取MySQL的連接
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://localhost:3306/stark", "root", "934158")
}
}
~~~

* 存在問題:

## 窗口的DStream

* *窗口長度* - The duration of the window (3 in the figure).
* *窗口間隔* - The interval at which the window operation is performed (2 in the figure).
~~~scala
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
~~~
##黑名單處理


~~~
object TranformApp{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
/**
* 創建StreamingContext需要兩個參數:SparkConf和batch interval
*/
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
* 構建黑名單
*/
val blacks = List("wade", "james")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
val lines = ssc.socketTextStream("localhost", 6789)
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x=> x._2._2.getOrElse(false) != true)
.map(x=>x._2._1)
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}
~~~
