(1) Write the producer code

*`KafkaProducer.scala`*

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
 * @Date 2021/1/19 8:59
 *
 * Message producer
 */
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    /** *************** 1. Create the configuration ******************/
    val props = new Properties()
    // Set the Kafka cluster.
    // With multiple nodes, use:
    // props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
    props.put("bootstrap.servers", "hadoop101:9092")
    // Set acks
    // 0:  no acknowledgement needed from the leader partition; sending the message to the leader partition is enough
    // 1:  wait for the leader partition to acknowledge receipt
    // -1 (all): wait for the leader partition and every follower in the ISR list to acknowledge; slowest, but safest
    props.put("acks", "all")
    // Set the key and value serializers
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    /** ************ 2. Create the producer ***************/
    val producer = new KafkaProducer[String, String](props)

    /** ************ 3. Send a message ***************/
    // new ProducerRecord[String, String](topic, key, value)
    producer.send(new ProducerRecord[String, String]("topic1", "1001", "zhangsan"))

    /** ************ 4. Release resources *************/
    producer.close()
  }
}
```

(2) Check whether topic1 already exists

```shell
# Check whether topic1 exists
[root@hadoop101 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop101:2181
__consumer_offsets
test
topic1

# If it does not exist, create topic1
[root@hadoop101 kafka]# bin/kafka-topics.sh --create --topic topic1 --zookeeper hadoop101:2181 --replication-factor 1 --partitions 3
Created topic "topic1".

# Start the console consumer
[root@hadoop101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic1 --from-beginning
```

(3) Run the program, and the message you sent shows up in the consumer terminal

```
zhangsan
```
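As a side note on the `acks` setting explained in step (1): `producer.send` returns a `Future[RecordMetadata]`, so you can either block on it or pass a callback to see when the broker (with `acks=all`, the leader partition plus the ISR followers) has actually acknowledged the record. The sketch below is not part of the original example; the object name `KafkaProducerWithAck` and the second record (`"1002"`, `"lisi"`) are made up for illustration.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

/**
 * Sketch only: shows two ways to observe the broker's acknowledgement
 * for the same topic1 used in the walkthrough above.
 */
object KafkaProducerWithAck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop101:9092")
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // Variant 1: block on the returned Future until the broker acknowledges
    // (with acks=all: the leader partition plus every follower in the ISR)
    val metadata: RecordMetadata =
      producer.send(new ProducerRecord[String, String]("topic1", "1001", "zhangsan")).get()
    println(s"acked: partition=${metadata.partition()}, offset=${metadata.offset()}")

    // Variant 2: asynchronous callback, invoked once the record is acknowledged or fails
    // (the key "1002" and value "lisi" are illustrative, not from the original)
    producer.send(
      new ProducerRecord[String, String]("topic1", "1002", "lisi"),
      new Callback {
        override def onCompletion(md: RecordMetadata, exception: Exception): Unit =
          if (exception != null) exception.printStackTrace()
          else println(s"acked: partition=${md.partition()}, offset=${md.offset()}")
      }
    )

    producer.close()
  }
}
```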
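For completeness, the console consumer from step (2) can also be replaced by a small Scala consumer. This is only a minimal sketch under the same single-broker setup; the object name `SimpleConsumer` and the group id `test-group` are assumptions, not part of the original walkthrough.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

/**
 * Minimal consumer sketch for topic1 (not part of the original walkthrough).
 */
object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop101:9092")
    // The group id is an assumption for this sketch; any non-empty name works
    props.put("group.id", "test-group")
    // Read from the start of the topic, like --from-beginning on the console consumer
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("topic1"))

    // Poll a handful of times, print whatever arrives, then release resources
    for (_ <- 1 to 10) {
      val records = consumer.poll(Duration.ofMillis(1000))
      for (record <- records.asScala) {
        println(s"partition=${record.partition()}, offset=${record.offset()}, " +
          s"key=${record.key()}, value=${record.value()}")
      }
    }
    consumer.close()
  }
}
```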