* First, to be clear: the Kafka I am using is version 2.11-2.11, and the Kafka Java API has changed across versions.
So the first thing to do is read the documentation.
~~~java
package com.bizzbee.spark.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster addresses; separate multiple addresses with ","
        // The producer writes to all three brokers, so any of 93/94/95 works here.
        props.put("bootstrap.servers", "spark:9095");
        // Set the consumer's group id
        props.put("group.id", "group1");
        // If true, the offsets of consumed messages are committed automatically.
        // If this consumer dies, a new consumer in the group resumes from the committed offset.
        props.put("enable.auto.commit", "true");
        // How often the consumer commits offsets
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics (comma-separated list);
        // here we subscribe to "bizzbee_topic" and "bizzbee-replicated-topic".
        consumer.subscribe(Arrays.asList("bizzbee_topic", "bizzbee-replicated-topic"));
        // Keep listening
        while (true) {
            // poll timeout
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // Print the actual source topic of each record, not a hard-coded label.
                System.out.println(consumerRecord.topic() + ": " + consumerRecord.value());
            }
        }
    }
}
~~~
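With `enable.auto.commit=true` above, offsets advance on a timer even if processing fails. If you want at-least-once delivery, you can turn auto commit off and commit manually with `consumer.commitSync()` after each processed batch. A minimal sketch of the config change (broker address, group id, and class name reuse the values above; only the `Properties` part runs here, the commit call is noted in a comment):

```java
import java.util.Properties;

public class ManualCommitConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "spark:9095"); // same broker as above
        props.put("group.id", "group1");
        // Disable auto commit so offsets only move after processing succeeds.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With this config, call consumer.commitSync() after handling each poll() batch.
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
    }
}
```

The trade-off: manual commit means a record may be reprocessed after a crash, but it will not be silently skipped.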
* Start the producer from the previous section, then run this consumer.
