<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                Consumer分為自動提交和手動提交。 自動提交:自動提交的優點是方便,但是可能會重復處理消息。 手動提交:又分為同步提交commitSync和異步提交commitAsync。在實際開發中常用手動提交。 1. *`KafkaConsumer.scala`*(自動提交) ```scala import java.util import java.util.Properties import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} /** * @Date 2021/1/19 9:34 * * 消息的消費者 */ object KafkaConsumer { def main(args: Array[String]): Unit = { /** ************* 1. 創建配置 **************/ val props = new Properties() // 配置Kafka集群的ip和端口號 // 如果有多個節點, // 則props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092") props.put("bootstrap.servers", "hadoop101:9092") // 設置消費者組的id, 如果有多個相同id的消費者程序, 那么他們將在一個組當中 // 這個可以亂填 props.put("group.id", "testGroup1") // 開啟自動提交[默認就是true開啟] props.put("enable.auto.commit", "true") // 每隔5000ms提交一次,默認值就是5000ms props.put("auto.commit.interval.ms", "1000") // key和value的反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") /** ****************** 2. 創建Consumer客戶端 ***************/ val consumer = new KafkaConsumer[String, String](props) // 添加要消費的topic到列表中 val topics = new util.ArrayList[String]() topics.add("topic1") // 訂閱模式消費,還有一種consumer.assign()指定分區模式消費 consumer.subscribe(topics) // 為了能夠一直從Kafka中消費數據, 使用 while true 死循環 while (true) { // 拉取數據,設置超時時間, 單位是毫秒, 返回一條數據 val records: ConsumerRecords[String, String] = consumer.poll(1000) // 對這些數據進行遍歷輸出 val iter: util.Iterator[ConsumerRecord[String, String]] = records.iterator() while (iter.hasNext) { val next: ConsumerRecord[String, String] = iter.next() println(s"partition = ${next.partition()}, offset=${next.offset()}\nkey = ${next.key()}, value = ${next.value()}") } } } } ``` 2. *`KafkaConsumerManualOffset.scala`*(手動提交) ```scala import java.util import java.util.Properties import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} /** * @Date 2021/1/19 10:16 * * 消息的消費者 */ object KafkaConsumerManualOffset { def main(args: Array[String]): Unit = { /** ************* 1. 創建配置 **************/ val props = new Properties() // 配置Kafka集群的ip和端口號 props.put("bootstrap.servers", "hadoop101:9092") // 設置消費者組的id, 如果有多個相同id的消費者程序, 那么他們將在一個組當中 props.put("group.id", "testGroup1") // 關閉自動提交[默認就是開啟] props.put("enable.auto.commit", "false") // key和value的反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") /** ****************** 2. 創建Consumer客戶端 ***************/ val consumer = new KafkaConsumer[String, String](props) // consumer消費 val topics = new util.ArrayList[String]() topics.add("topic1") // 訂閱topic consumer.subscribe(topics) // 創建一個集合, 用來存放消息的個數 val buffer = new util.ArrayList[ConsumerRecord[String, String]]() // 為了能夠一直從Kafka中消費數據, 使用 while true 死循環 while (true) { // 設置超時時間, 單位是毫秒, 返回一些條數據 val records: ConsumerRecords[String, String] = consumer.poll(1000) // 對這些數據進行遍歷輸出 val iter: util.Iterator[ConsumerRecord[String, String]] = records.iterator() while (iter.hasNext) { val next: ConsumerRecord[String, String] = iter.next() println(s"partition = ${next.partition()}, offset=${next.offset()}\nkey = ${next.key()}, value = ${next.value()}") buffer.add(next) } if (buffer.size() > 5) { // 手動提交offset有兩種, 一種是同步阻塞方式, 一種是異步非阻塞方式 consumer.commitAsync() // consumer.commitSync() buffer.clear() } } } } ``` 運行上面兩個Consumer程序中的一個,然后再運行【Producer API】提供的程序生產消息,上面的兩個消費者程序的輸出如下: ```scala partition = 0, offset=7 key = 1001, value = zhangsan ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看