## updateStateByKey

Besides supporting the RDD operators, DStreams also have a few transformations of their own; one of the most commonly used is `updateStateByKey`. The word-count program at the beginning of this article only counts word occurrences within each batch of input. To count occurrences across all input seen so far, use the `updateStateByKey` operator. The code is as follows:

~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountV2 {

  def main(args: Array[String]) {
    /*
     * When testing locally it is best to set the Hadoop user name explicitly;
     * otherwise the local machine's user name is used, which may cause a
     * permission-denied exception when creating directories on HDFS.
     */
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    /* A checkpoint directory is mandatory */
    ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")

    val lines = ssc.socketTextStream("hadoop001", 9999)
    lines.flatMap(_.split(" ")).map(x => (x, 1))
      .updateStateByKey[Int](updateFunction _)   // the updateStateByKey operator
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Running sum.
   *
   * @param currentValues the values from the current batch
   * @param preValues     the previously accumulated value
   * @return the new accumulated value
   */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}
~~~

When using `updateStateByKey` you must set a checkpoint directory with `ssc.checkpoint()`. The operator retrieves the state saved in the checkpoint, uses the user-defined `updateFunction` to add the previous value and the values from the current batch, and returns the new state.

## Processing data from a file system

~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.textFileStream("/Users/bizzbee/Desktop/work/projects/sparktrain/ss")

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

* This **monitors new files added to the ss directory**. Files have to be moved in from somewhere else; you cannot write into the directory directly.
* Printed result:

![](https://img.kancloud.cn/f9/74/f974618e512ef6d787d1d758dc2d0bf2_171x240.png)

* All files must share the same format.

## foreachRDD: writing each RDD to a database

~~~scala
import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Obtain a MySQL connection.
   */
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/stark", "root", "934158")
  }
}
~~~

![](https://img.kancloud.cn/b2/9c/b29caee14401623e7dbbdfa1d7557b2c_227x204.png)

* Remaining problems:

![](https://img.kancloud.cn/1a/34/1a346093196377b556bfe4c74ff7b8a9_1015x222.png)
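The remaining problems are shown only as a screenshot above. One common refinement, sketched here as an assumption rather than as this article's own solution, keeps the one-connection-per-partition structure but replaces the concatenated SQL string with a batched `PreparedStatement` (it reuses `result` and `createConnection()` from the example above and assumes the same `wordcount(word, wordcount)` table):

~~~scala
result.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createConnection()
    // Prepare the statement once per partition and batch the inserts
    val statement = connection.prepareStatement(
      "insert into wordcount(word, wordcount) values (?, ?)")
    partitionOfRecords.foreach { case (word, count) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.addBatch()
    }
    statement.executeBatch()
    statement.close()
    connection.close()
  })
})
~~~

A connection pool would go one step further and avoid opening a new connection for every partition of every batch.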
## Windowed DStreams

![](https://img.kancloud.cn/0c/7a/0c7a2234057597fb0944c5067d8093c3_994x388.png)

* *Window length* - the duration of the window (3 batches in the figure).
* *Sliding interval* - the interval at which the window operation is performed (2 batches in the figure).

With `pairs` being a (word, 1) DStream like the ones built above, a 30-second window that slides every 10 seconds is applied like this:

~~~scala
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))
~~~

## Blacklist handling

![](https://img.kancloud.cn/f9/fa/f9fa5af79846265e411885c3c9ae5c55_803x275.png)

![](https://img.kancloud.cn/55/50/5550ed4c49c21dde18fc717494baf9d1_629x205.png)

~~~scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TranformApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
     * Creating a StreamingContext takes two parameters: a SparkConf and the batch interval.
     */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
     * Build the blacklist.
     */
    val blacks = List("wade", "james")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("localhost", 6789)
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })

    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

![](https://img.kancloud.cn/d3/f8/d3f8565497301395f9abf9722cab31fc_154x165.png)
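To check the join-and-filter step in isolation, here is a minimal sketch that runs the same logic on plain RDDs; the `timestamp,user` log format and the sample values are assumptions made for illustration:

~~~scala
import org.apache.spark.{SparkConf, SparkContext}

object BlacklistFilterCheck {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("BlacklistFilterCheck"))

    // Hypothetical click logs in "timestamp,user" format
    val logs = sc.parallelize(Seq("20200101,wade", "20200101,kobe", "20200102,james", "20200102,curry"))
    val blacksRDD = sc.parallelize(Seq("wade", "james")).map(x => (x, true))

    // Same join-and-filter logic as in the transform above
    val kept = logs.map(x => (x.split(",")(1), x))
      .leftOuterJoin(blacksRDD)
      .filter(x => x._2._2.getOrElse(false) != true)
      .map(x => x._2._1)

    kept.collect().foreach(println) // expected output: the kobe and curry lines only

    sc.stop()
  }
}
~~~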