<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                使用Spark Streaming處理帶狀態的數據。 * 需求:計算到目前為止累計詞頻的個數 * 分析:DStream轉換操作包括無狀態轉換和有狀態轉換 * 無狀態轉換:每個批次的處理不依賴于之前批次的數據 * 有狀態轉換:當前批次的處理需要使用之前批次的數據 * updateStateByKey屬于有狀態轉換,可以跟蹤狀態的變化 * 實現要點 * 定義狀態:狀態數據可以是任意類型 * 定義狀態更新函數:參數為數據流之前的狀態和新的數據流數據 (1)編寫Streaming程序 ```scala import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object StatefulWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 創建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批處理間隔,即將5秒內新收集的數據作為一個單位進行處理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 加載數據 ************/ // 設置checkpoint目錄, 用來保存狀態 ssc.checkpoint("file:///E:\\hadoop\\output") // socketTextStream(hostname, port, StorageLevel) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999, StorageLevel.MEMORY_AND_DISK_SER_2) /** ************ 3. 進行統計 **************/ // 使用了 updateStateByKey 狀態類的算子, 可以統計截止到當前位置的累加值, 需要傳入一個更新狀態的函數 val result: DStream[(String, Int)] = lines.flatMap(_.split("\\s+")) .map((_, 1)) //.updateStateByKey((x,y)=>Some(x.sum+y.getOrElse(0))) .updateStateByKey(updateFunction) result.print() // 打印輸出 /** ************ 4. 啟動Streaming程序 **************/ ssc.start() /** *********** 5. 等待應用程序終止 ****************/ ssc.awaitTermination() } /** * 定義一個更新狀態的函數 * * @param currentValues 當前批次的value值的序列 * @param preValues 前一批次的統計狀態值 * @return 更新狀態值 */ def updateFunction(currentValues: Seq[Int], preValues: Option[Int]) = { val curr: Int = currentValues.sum val pre: Int = preValues.getOrElse(0) Some(curr + pre) } } ``` (2)啟動`nc`并輸入一些單詞 ```shell [root@hadoop101 /]# nc -lk 9999 hello python hadoop hello python kafka ``` (3)運行上面的Streaming程序,控制臺輸出信息如下 ```txt ------------------------------------------- Time: 1612769255000 ms ------------------------------------------- (python,2) (hadoop,1) (hello,2) (kafka,1) ``` (4)再輸入一些單詞 ```shell [root@hadoop101 /]# nc -lk 9999 hello python hadoop hello python kafka python kafka ``` 可以看到python和kafka的單詞數變多了 ```txt ------------------------------------------- Time: 1612769285000 ms ------------------------------------------- (python,3) (hadoop,1) (hello,2) (kafka,2) ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看