<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                因為 Kafka 項目在 0.8 和 0.10 版本之間引入了新的消費者 api,因此有兩個單獨的相應 Spark Streaming 包: ? spark-streaming-kafka-0-8 ? spark-streaming-kafka-0-10 注意選擇正確的包,spark-streaming-kafka-0-8 與 Kafka Brokers 0.9 和 0.10 +兼容,但spark-streaming-kafka-0-10與Kafka Brokers 0.10之前版本不兼容。總之,向后兼容。具體區別如下圖所示。 :-: ![](https://img.kancloud.cn/e4/e4/e4e495c80c535b2082c2bbfe10bd887d_957x358.png) spark-streaming-kafka-0-8 與 spark-streaming-kafka-0-10 區別 本次示例以0.10版本為例: ```xml <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.4</version> </dependency> ``` Spark Streaming 也提供了兩種方式對接 Kafka 數據源:Receiver 和 Direct。 [TOC] # 1. Receiver 方式(了解) Receiver 是最早的方式。Receiver 方式通過 Receiver 來獲取數據,是使用Kafka 的 High Level Consumer API 來實現的。Receiver 將 Kafka 數據源中獲取的數據存儲在 Spark Executor 的內存中,然后 Spark Streaming 啟動的 job 會去處理那些數據。但是,在默認的配置下,這種方式可能會因為底層的故障而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就<ins>必須啟用 Spark Streaming 的預寫日志機制(Write Ahead Log,WAL)</ins>。該機制會同步地將接收到的 Kafka 數據寫入分布式文件系統(比如 HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。<br/> Receiver 方 式 只 有 在 streaming-kafka-0-8 才有支持 , 在較新的streaming-kafka-0-10 已經不再支持。 <br/> # 2. Direct 方式(無 Receiver)(掌握) (1)編寫Spark Streaming程序 ```scala import org.apache.kafka.clients.consumer.{ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object KafkaWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 創建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批處理間隔,即將5秒內新收集的數據作為一個單位進行處理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 準備連接kafka時的一些參數 ************/ val topices = "test1,test2" val topicsSet: Set[String] = topices.split(",").toSet val kafkaParams = Map[String, String]( "bootstrap.servers" -> "hadoop101:9092", "group.id" -> "testGroup1", "enable.auto.commit" -> "true", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) /** ********* 3. 使用Kafka直連方式 ************/ val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, // 分配策略 ConsumerStrategies.Subscribe(topicsSet, kafkaParams) // 訂閱方式, 是按照topic訂閱還是按照分區訂閱 ) /** ********* 4. 加載數據 ************/ // 將從kafka中獲取到的數據取出value值 val lines: DStream[String] = messages.map(_.value()) /** ************ 5. 進行統計 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() // 打印輸出 /** ********* 6. 啟動Streaming程序 ************/ ssc.start() /** *********** 7. 等待應用程序終止 ****************/ ssc.awaitTermination() } } ``` (2)啟動Kafka ```shell -- 啟動Kafka [root@hadoop101 kafka]# bin/kafka-server-start.sh -daemon config/server.properties [root@hadoop101 kafka]# jps 3555 Kafka 3622 Jps ``` (3)創建主題并生產數據 ```shell -- 創建主題 [root@hadoop101 kafka]# bin/kafka-topics.sh --create --topic test1 --zookeeper hadoop101:2181 --replication-factor 1 --partitions 3 -- 啟動kafka生產者并生產數據 [root@hadoop101 kafka]# bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic test1 >hello python hello ``` (4)然后Spark Streaming打印出如下信息 ```txt ------------------------------------------- Time: 1612781510000 ms ------------------------------------------- (python,1) (hello,2) ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看