### **3.1.2 一個非常簡單的示例**
*****
如下所示,您可以使用純 Java 來發送和接收消息:
~~~
@Test
public void testAutoCommit() throws Exception {
logger.info("Start auto");
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic(topic1);
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
logger.info("Stop auto");
}
~~~
~~~
private KafkaMessageListenerContainer<Integer, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(props);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
private KafkaTemplate<Integer, String> createTemplate() {
Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
return template;
}
~~~
~~~
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
~~~
以上代碼用于配置消費者客戶端參數。
* ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:指定連接 Kafka 集群所需的 broker 地址清單。
* GROUP_ID_CONFIG:此消費者所隸屬的消費組的唯一標識,即消費組的名稱。
* ENABLE_AUTO_COMMIT_CONFIG:配置是否開啟自動提交消費位移的功能,默認開啟。
* AUTO_COMMIT_INTERVAL_MS_CONFIG:當?enable.auto.commit 參數設置為 true 時才生效,表示開啟自動提交消費位移功能時自動提交消費位移的時間間隔
* SESSION_TIMEOUT_MS_CONFIG:消費組管理協議中用來檢測消費者是否失效的超時時間。
* KEY_DESERIALIZER_CLASS_CONFIG:消息中 key 所對應的反序列化類,需要實現 org.apache.kafka.common.serialization.Deserializer 接口。
* VALUE_DESERIALIZER_CLASS_CONFIG:消息中 value 所對應的反序列化類,需要實現?org.apache.kafka.common.serialization.Deserializer 接口。
~~~
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
~~~
<br >