* Requirement: implement WordCount with Spark Streaming + Spark SQL
* Approach: convert each RDD in the stream into a DataFrame, then query it with SQL

(1) Write the Streaming program

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Date 2021/2/8
 */
object NetWorkSQLWordCount {
  def main(args: Array[String]): Unit = {
    /** ********* 1. Create the StreamingContext ************/
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    // Seconds(5) is the batch interval: data received in each 5-second window is processed as one batch
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    /** ********* 2. Load the data ************/
    // socketTextStream(hostname, port, StorageLevel)
    val lines: ReceiverInputDStream[String] =
      ssc.socketTextStream("hadoop101", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    /** ************ 3. Compute the counts **************/
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    words.foreachRDD(rdd => {
      // Skip empty batches; isEmpty() avoids counting the whole RDD
      if (!rdd.isEmpty()) {
        val df: DataFrame = rdd.map(x => Word(x)).toDF()
        df.createOrReplaceTempView("tb_word")
        spark.sql("select word,count(1) from tb_word group by word").show()
      }
    })

    /** ************ 4. Start the Streaming program **************/
    ssc.start()

    /** *********** 5. Wait for the application to terminate ****************/
    ssc.awaitTermination()
  }

  // Case class used to convert the RDD into a DataFrame
  case class Word(word: String)
}
```

(2) Start `nc` and type in some words

```shell
[root@hadoop101 /]# nc -lk 9999
python python java javascript python
```

(3) Start the Streaming program above. It prints the following (counts are computed per batch, so this assumes all the words arrive within the same 5-second interval)

```txt
+----------+--------+
|      word|count(1)|
+----------+--------+
|javascript|       1|
|      java|       1|
|    python|       3|
+----------+--------+
```
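The per-batch aggregation can also be written with the DataFrame API instead of a SQL string and a temporary view. The following is a minimal sketch under stated assumptions, not part of the original program: the object `WordCountDf` and the helper `countWords` are hypothetical names, and the input is a fixed `Seq` rather than a socket stream, so the logic can be tried without `nc`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object for illustration; not part of the original program.
object WordCountDf {
  // Same shape as the Word case class used in the Streaming program.
  case class Word(word: String)

  /** Counts occurrences of each word using groupBy/count instead of a SQL query. */
  def countWords(spark: SparkSession, words: Seq[String]): DataFrame = {
    import spark.implicits._
    words.map(Word(_)).toDF()
      .groupBy("word")
      .count() // adds a Long column named "count"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WordCountDf")
      .master("local[2]")
      .getOrCreate()
    // Same words as typed into nc in step (2)
    countWords(spark, Seq("python", "python", "java", "javascript", "python")).show()
    spark.stop()
  }
}
```

Inside `foreachRDD`, the equivalent call would be `rdd.map(x => Word(x)).toDF().groupBy("word").count().show()`. This avoids re-registering the `tb_word` view on every batch, at the cost of the count column being named `count` rather than `count(1)`.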