### **4.1.3 接收消息**
*****
可以通過配置 MessageListenerContainer 并提供消息偵聽器,或使用 @KafkaListener 注解來接收消息。
<br >
#### **消息監聽器(Message Listeners)**
*****
使用消息偵聽器容器時,必須提供一個偵聽器以接收數據。 當前有八種支持的消息偵聽器接口:
~~~
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
~~~
1. 使用自動提交或容器管理的提交方式時,可使用 MessageListener 來處理 poll() 操作接收的單個 ConsumerRecord。
2. 使用手動提交方式時,可使用 AcknowledgingMessageListener 來處理 poll() 操作接收到的各個 ConsumerRecord。
3. 使用自動提交或容器管理的提交方式時,可使用 ConsumerAwareMessageListener 來處理 poll() 操作接收的單個 ConsumerRecord。提供了對 Consumer 對象的訪問。
4. 使用手動提交方式時,可使用 AcknowledgingConsumerAwareMessageListener 來處理 poll() 操作接收到的各個 ConsumerRecord。提供了對Consumer對象的訪問。
5. 使用自動提交或容器管理的提交方式時,可使用 BatchMessageListener 來處理 poll() 操作接收的所有 ConsumerRecord。使用此接口時,不支持 AckMode.RECORD,因為已為偵聽器提供了完整的批處理。
6. 使用手動提交方式時,可使用 BatchAcknowledgingMessageListener 來處理 poll() 操作接收到的所有ConsumerRecord。
7. 使用自動提交或容器管理的提交方式時,可使用 BatchConsumerAwareMessageListener 來處理 poll() 操作接收的所有 ConsumerRecord。使用此接口時,不支持AckMode.RECORD,因為已為偵聽器提供了完整的批處理。提供了對Consumer對象的訪問。
8. 使用手動提交方式時,可使用 BatchAcknowledgingConsumerAwareMessageListener 來處理 poll() 操作接收到的所有 ConsumerRecord。提供了對 Consumer 對象的訪問。
> 注意:Consumer 對象是非線程安全的,你只能在調用監聽器的線程上使用它。
<br >
#### **消息偵聽器容器(Message Listener Containers)**
*****
提供了兩個 MessageListenerContainer 實現:
* KafkaMessageListenerContainer
* ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 在單個線程上接收來自所有主題/分區的所有消息。 ConcurrentMessageListenerContainer 委托給1個或多個 KafkaMessageListenerContainer 來提供多線程使用。
從版本 2.2.7 開始,您可以將 RecordInterceptor 添加到偵聽器容器中。 它將在調用偵聽器之前被調用,以允許檢查或修改記錄。 如果攔截器返回 null,則不調用偵聽器。 當偵聽器為批處理偵聽器時,攔截器不會被調用。
從 2.3 版開始,CompositeRecordInterceptor 可用于調用多個攔截器。
默認情況下,使用事務時,在事務啟動后將調用攔截器。 從版本 2.3.4 開始,您可以將偵聽器容器的 interceptBeforeTx 屬性設置為在事務開始之前調用攔截器。
沒有為批處理偵聽器提供攔截器,因為 Kafka 已經提供了 ConsumerInterceptor。
<br >
##### **使用 KafkaMessageListenerContainer**
可用構造函數如下:
~~~
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties,
TopicPartitionInitialOffset... topicPartitions)
~~~
ConcurrentMessageListenerContainer(稍后介紹)使用第二個構造函數在消費者實例之間分配 TopicPartitionOffset。ContainerProperties 包含主題和分區相關信息,構造函數如下:
~~~
public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
~~~
* 第一個構造函數包含一個 TopicPartitionInitialOffset 數組參數,以明確指示容器使用哪個分區(使用 consumer 的 assign() 方法),并帶有可選的初始偏移量:默認為正值; 默認情況下,負值相對于分區中的當前最后一個偏移量。
* 第二個構造函數包含一個字符串數據參數,Kafka 根據 group.id 屬性分配分區(在整個組中分配分區)。
* 第三個構造函數使用正則表達式模式選擇主題。
要將 MessageListener 分配給容器,請在創建 Container 時使用 ContainerProps.setMessageListener 方法:
~~~
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
~~~
注意,在創建 DefaultKafkaConsumerFactory 時,使用僅接受上述屬性的構造函數意味著從配置中獲取鍵和值的 Deserializer 類。 或者,可以將 Deserializer 實例傳遞給 DefaultKafkaConsumerFactory 構造函數以獲取鍵或值,在這種情況下,所有消費者均共享相同的實例。 另一個選擇是提供 Supplier (從版本2.3開始),該類將用于為每個消費者獲取單獨的 Deserializer 實例:
~~~
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
~~~
有關可設置的各種屬性的更多信息,請參考 [ContainerProperties 的 Javadoc](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.html) 。
從 2.1.1 版開始,提供了一個名為 logContainerConfig 的新屬性。 設為 true 并啟用 INFO 日志記錄后,每個偵聽器容器(Listener Container)都會寫入一條日志消息,以概述其配置屬性。
默認情況下,主題偏移量提交的日志記錄是使用 DEBUG 日志記錄級別進行的。 從版本 2.1.2 開始,ContainerProperties 中有一個名為 commitLogLevel 的新屬性,該屬性可讓您指定這些消息的日志級別。 例如,要將日志級別更改為 INFO,請使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO)。
從 2.2 版開始,添加了一個名為 missingTopicsFatal 的新容器屬性(默認值:true)。 如果代理中沒有任何已配置的主題,這將阻止容器啟動。 如果偵聽器配置為通過正則匹配,則該方法不適用。 以前,容器線程在 consumer.poll() 方法內循環,等待主題出現,同時記錄許多消息。 除了日志,沒有跡象表明存在問題。 若要還原以前的行為,可以將屬性設置為false。
<br >
##### **使用ConcurrentMessageListenerContainer**
構造函數如下:
~~~
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
~~~
它具有 concurrency 屬性,例如 container.setConcurrency(3) 將創建3個 KafkaMessageListenerContainer。 Kafka 將使用其消費組管理功能在消費者之間分配分區。
> 注意:在監聽多個主題時,默認的分區分布可能不是您期望的。 例如,如果您有3個主題,每個主題有5個分區,而您想使用 concurrency = 15,則只會看到5個活動使用者,每個消費者都為每個主題分配了一個分區,而其他10個消費者處于空閑狀態。 這是因為默認的 Kafka PartitionAssignor 是 RangeAssignor(請參閱其 javadocs)。 對于這種情況,您可能需要考慮使用 RoundRobinAssignor,它將在所有使用者之間分配分區。 然后,將為每個消費者分配一個主題/分區。 要更改 PartitionAssignor,請在提供給 DefaultKafkaConsumerFactory 的屬性中設置partition.assignment.strategy 使用者屬性(ConsumerConfigs.PARTITION\_ASSIGNMENT\_STRATEGY\_CONFIG)。
使用 Spring Boot 時:
```
spring.kafka.consumer.properties.partition.assignment.strategy =
org.apache.kafka.clients.consumer.RoundRobinAssignor
```
對于第二個構造函數,ConcurrentMessageListenerContainer 在委托 KafkaMessageListenerContainer 中分配TopicPartition。
例如,如果提供了6個 TopicPartition,并且并發為3,則為0。 每個容器將獲得2個分區。 對于5個 TopicPartition,兩個容器將獲得2個分區,第三個容器將得到1。如果并發大于 TopicPartitions 的數量,則并發性將向下調整,以便每個容器將獲得一個分區。
注意:client.id屬性(如果已設置)將附加-n,其中n是根據并發性使用的消費者實例。 啟用JMX時,必須為MBean提供唯一的名稱。
從1.3版開始,MessageListenerContainer提供了對基礎KafkaConsumer指標的訪問。 對于ConcurrentMessageListenerContainer而言,metrics()方法將返回所有目標KafkaMessageListenerContainer實例的度量。 度量標準分為Map 。
<br >
#### **@KafkaListener 注解**
*****
@KafkaListener 注解為簡單的 POJO 偵聽器提供了一種機制:
~~~
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
~~~
此機制需要在 @Configuration 類之一上使用 @EnableKafka 批注,以及用于配置基礎 ConcurrentMessageListenerContainer 的偵聽器容器工廠:默認情況下,應使用名稱為 kafkaListenerContainerFactory 的 bean。
~~~
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
~~~
請注意,要設置 container 屬性,必須在工廠上使用 getContainerProperties() 方法。
從 2.1.1 版本開始,現在可以為創建注解的消費者設置 client.id 屬性。 clientIdPrefix 帶有 -n 后綴,其中 n 是表示使用并發時的容器號的整數。
您還可以為 POJO 偵聽器配置明確的主題和分區(以及可選的初始偏移量):
~~~
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
~~~
可以在 partitions 或 partitionOffsets 屬性中指定分區,但不能在兩個屬性中同時指定分區。
當使用手動 AckMode 時,還可以向偵聽器提供該 Acknowledgment。 此示例還顯示了如何使用其他容器工廠。
~~~
@KafkaListener(id = "baz", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
~~~
最后,有關消息的元數據可從消息頭獲得,以下頭名稱可用于檢索消息的頭:
* KafkaHeaders.RECEIVED\_MESSAGE\_KEY
* KafkaHeaders.RECEIVED\_TOPIC
* KafkaHeaders.RECEIVED\_PARTITION\_ID
* KafkaHeaders.RECEIVED\_TIMESTAMP
* KafkaHeaders.TIMESTAMP\_TYPE
~~~
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
~~~
從版本1.1開始,可以將 @KafkaListener 方法配置為處理整批消費者記錄。 要將偵聽器容器工廠配置為創建批處理偵聽器,請設置 batchListener 屬性:
~~~
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
~~~
簡單的批處理方式:
~~~
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
~~~
通過配置主題,分區,偏移量等控制批處理的方式:
~~~
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
~~~
通過 Message 對象控制批處理的方式:
~~~
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
~~~
在這種情況下,不會對有效負載執行任何轉換。
如果為 BatchMessagingMessageConverter 配置了 RecordMessageConverter,則還可以將通用類型添加到 Message 參數中,然后將轉換有效負載。
您還可以收到 ConsumerRecord 對象的列表,但是它必須是在方法上定義的唯一參數(使用手動提交時,除了可選的Acknowledgment)。
~~~
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
~~~
從 2.0 版開始,id 屬性(如果存在)將用作 Kafka group.id 屬性,并覆蓋 Consumer Factory 中的已配置屬性(如果存在)。 您還可以顯式設置 groupId 或將 idIsGroup 設置為 false,以恢復使用使用者工廠 group.id 的先前行為。
您可以在注解屬性中使用屬性占位符或 SpEL 表達式,例如...
~~~
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
~~~
從版本2.1.2開始,SpEL 表達式支持特殊的令牌 \_\_listener,這是一個偽 bean 名稱,表示此注解所在的當前 bean 實例。
例如,給定:
~~~
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
~~~
我們就可以使用:
~~~
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
~~~
如果在不太可能的情況下有一個名為 \_\_listener 的實際 bean,則可以使用 beanRef 屬性更改表達式令牌...
~~~
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
~~~
<br >
#### **Rebalance Listeners**
*****
ContainerProperty 具有一個 ConsumerRebalanceListener 屬性,該屬性采用 Kafka 客戶端的ConsumerRebalanceListener 接口的實現。 如果未提供此屬性,則容器將配置一個簡單的日志偵聽器,該日志偵聽器在 INFO 級別下記錄重新平衡事件。 該框架還添加了一個子接口 ConsumerAwareRebalanceListener:
~~~
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
~~~
請注意,撤銷分區時有兩個回調:第一個立即調用;第二個在提交任何未決的偏移量后調用。 如果您希望在某些外部存儲庫中保持偏移量,則這很有用。 例如:
~~~
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
~~~
<br >
#### **過濾消息**
*****
在某些情況下,例如重平衡,可能會重新傳遞已處理的消息。框架無法知道是否已處理此類消息,即應用程序級功能。這被稱為冪等接收器模式,Spring Integration 提供了其實現。
Spring Kafka 還通過 FilteringMessageListenerAdapter 類提供了一些幫助,該類可以包裝您的 MessageListener。此類采用 RecordFilterStrategy 的實現,在該實現中,您將實現 filter 方法以發出消息重復消息并應將其丟棄的信號。它具有一個附加屬性 ackDiscarded,該屬性指示適配器是否應確認丟棄的記錄;否則,它不可用。默認情況下為 false。
使用 @KafkaListener 時,請在容器工廠上設置 RecordFilterStrategy(以及可選的 ackDiscarded),并且偵聽器將包裝在適當的過濾適配器中。
此外,為使用批處理消息偵聽器提供了 FilteringBatchMessageListenerAdapter。
<br >
#### **有狀態重試**
*****
重要的是要了解,上面討論的重試會掛起使用者線程(如果使用 BackOffPolicy);重試期間沒有調用Consumer.poll()。Kafka 有兩個屬性來判斷消費者是否存活。 session.timeout.ms 用于確定使用者是否處于活動狀態。由于 0.10.1.0 版本后的心跳測試是在后臺線程上發送的,因此緩慢的消費者不再會對此產生影響。 max.poll.interval.ms(默認為5分鐘)用于確定使用者是否似乎被掛起(花費太長時間來處理上次輪詢中的記錄)。如果 poll() 之間的時間超過此配置,則代理將撤銷分配的分區并執行重平衡。對于冗長的重試序列,這很容易發生。
從版本 2.1.3 開始,可以通過將有狀態重試與 SeekToCurrentErrorHandler 結合使用來避免此問題。在這種情況下,每次傳遞嘗試都將異常拋出回容器,并且錯誤處理程序將重新尋找未處理的偏移量,并且下一個poll() 將傳遞相同的消息。這樣可以避免超出 max.poll.interval.ms 屬性的問題(只要兩次嘗試之間的單個延遲不超過該時間)。因此,在使用 ExponentialBackOffPolicy 時,請務必確保 maxInterval 小于 max.poll.interval.ms 屬性。要啟用有狀態重試,請使用 RetryingMessageListenerAdapter 構造函數,該構造函數接受有狀態布爾參數(將其設置為true)。使用偵聽器容器工廠進行配置(對于@KafkaListener)時,請將工廠的 statefulRetry 屬性設置為 true。
<br >