文章摘要(AI生成)
本内容是关于侦听器容器属性的描述。其中包括了ackCount、adviceChain、ackMode、ackTime、assignmentCommitOption、authExceptionRetryInterval、clientId、checkDeserExWhenKeyNull、checkDeserExWhenValueNull、commitCallback等属性的默认值和描述。对于每个属性,都给出了默认值和相应的描述。
4.1.5.侦听器容器属性
属性值 | 默认 | 描述 |
---|---|---|
ackCount |
1 | 当ackMode 是COUNT 或COUNT_TIME 时,提交未决偏移前的记录数。 |
adviceChain |
null |
包装消息侦听器的Advice 对象链(例如MethodInterceptor ,围绕通知)按顺序调用。 |
ackMode |
BATCH | 控制提交偏移量的频率 - 请参阅提交偏移量。 |
ackTime |
5000 | 当ackMode 是TIME 或 COUNT_TIME 时提交未决偏移量的时间(以毫秒为单位)。 |
assignmentCommitOption |
LATEST_ONLY _NO_TX | 是否在分配时承诺初始位置;默认情况下,只有在 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 是latest 的情况下才会提交初始偏移量,并且即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 javadocs。 |
authExceptionRetryInterval |
null |
当不为null 时,当 Kafka 客户端抛出 AuthenticationException 或 AuthorizationException 时,轮询之间休眠的持续时间。当为 null 时,此类异常被认为是致命的,容器将停止。 |
clientId |
(空字符串) | 消费者属性client.id 的前缀。覆盖消费者工厂client.id 属性;在并发容器中,-n 添加为每个消费者实例的后缀。 |
checkDeserExWhenKeyNull |
false | 设置为true 在收到null key 时始终检查DeserializationException 标头。当消费者代码无法确定已配置ErrorHandlingDeserializer 时很有用,例如在使用委托解串器时。 |
checkDeserExWhenValueNull |
false | 设置为 true 以便在收到null key 时始终检查 DeserializationException 标头。当使用者代码无法确定是否已配置 ErrorHandlingDeserializer 时(例如使用委托反序列化器时),此功能很有用。 |
commitCallback |
null |
如果存在并且syncCommits 是false 在提交完成后调用的回调。 |
offsetAndMetadataProvider |
null |
OffsetAndMetadata 的提供者;默认情况下,提供者使用空元数据创建偏移量和元数据。提供者提供了一种自定义元数据的方法。 |
commitLogLevel |
debug | 与提交偏移量有关的日志的日志记录级别。 |
consumerRebalanceListener |
null |
一个rebalance 监听器;请参阅重新平衡侦听器。 |
consumerStartTimout |
30 s | 在记录错误之前等待消费者启动的时间;例如,如果您使用线程不足的任务执行器,则可能会发生这种情况。 |
consumerTaskExecutor |
SimpleAsyncTaskExecutor |
运行消费者线程的任务执行器。默认执行器创建名为<name>-C-n 的线程;对于KafkaMessageListenerContainer ,名称是bean名称;对于 ConcurrentMessageListenerContainer ,名称是带有 -n 后缀的 bean 名称,其中 n 针对每个子容器递增。 |
deliveryAttemptHeader |
false |
请参阅传递尝试标题。 |
eosMode |
V2 |
Exactly Once 语义模式;请参阅Exactly Once 语义。 |
fixTxOffsets |
false |
当使用事务生产者生成的记录,并且消费者位于分区的末尾时,延迟可能会错误地报告为大于零,这是由于用于指示事务提交/回滚的伪记录以及可能的存在的回滚记录。这在功能上不会影响消费者,但一些用户表示担心“滞后”不是零。将此属性设置为true ,容器将更正此类错误报告的偏移量。检查在下一次轮询之前执行,以避免显着增加提交处理的复杂性。在撰写本文时,只有在消费者配置为isolation.level=read_committed 且max.poll.records 大于 1时才会纠正延迟。有关更多信息,请参阅KAFKA-10683。 |
groupId |
null |
覆盖消费者group.id 属性;由@KafkaListener id 或groupId 属性自动设置。 |
idleBeforeDataMultiplier |
5.0 | 在收到任何记录之前应用idleEventInterval 乘数。收到记录后,不再应用乘数。从 2.8 版开始可用。 |
idleBetweenPolls |
0 | 用于通过在轮询之间休眠线程来减慢交付速度。处理一批记录的时间加上这个值必须小于消费者max.poll.interval.ms 属性。 |
idleEventInterval |
null |
设置后,启用ListenerContainerIdleEvent s 的发布,请参阅应用程序事件和检测空闲和非响应消费者。另请参阅idleBeforeDataMultiplier 。 |
idlePartitionEventInterval |
null |
设置后,启用ListenerContainerIdlePartitionEvent s 的发布,请参阅应用程序事件和检测空闲和非响应消费者。 |
kafkaConsumerProperties |
没有任何 | 用于覆盖在消费者工厂上配置的任意消费者属性。 |
logContainerConfig |
false |
设置为 true 以在 INFO 级别记录所有容器属性。 |
messageListener |
null |
消息侦听器。 |
micrometerEnabled |
true |
是否为消费者线程维护 Micrometer 计时器。 |
missingTopicsFatal |
false |
当为 true 时,如果代理上不存在已配置的主题,则阻止容器启动。 |
monitorInterval |
30s | 检查 NonResponsiveConsumerEvent 的消费者线程状态的频率。请参阅 noPollThreshold 和 pollTimeout 。 |
noPollThreshold |
3.0 | 乘以pollTimeOut 决定是否发布一个NonResponsiveConsumerEvent . 见monitorInterval 。 |
onlyLogRecordMetadata |
false |
设置为 false 以记录完整的消费者记录(错误、调试日志等),而不仅仅是topic-partition@offset . |
pauseImmediate |
false |
当容器暂停时,在当前记录之后停止处理,而不是在处理完之前轮询的所有记录之后;剩余的记录保留在内存中,并在容器恢复时传递给侦听器。 |
pollTimeout |
5000 | 超时传入Consumer.poll() . |
scheduler |
ThreadPoolTaskScheduler |
在其上运行消费者监控任务的调度程序。 |
shutdownTimeout |
10000 | 在所有消费者停止和发布容器停止事件之前阻塞stop() 方法的最长时间(以毫秒为单位) 。 |
stopContainerWhenFenced |
false |
如果抛出ProducerFencedException ,则停止侦听器容器。有关详细信息,请参阅回滚后处理器。 |
stopImmediate |
false |
当容器停止时,在当前记录之后停止处理,而不是在处理完之前轮询的所有记录之后。 |
subBatchPerPartition |
见描述。 | 使用批处理侦听器时,如果是true ,则调用侦听器,并将轮询结果拆分为子批处理,每个分区一个。默认情况下为false 。 |
syncCommitTimeout |
null |
当syncCommits 是true 使用的超时时间。如果未设置,容器将尝试确定消费者属性default.api.timeout.ms 并使用它;否则它将使用 60 秒。 |
syncCommits |
true |
是否对偏移量使用同步或异步提交;见commitCallback 。 |
topics topicPattern topicPartitions |
不适用 | 配置的主题、主题模式或明确分配的主题/分区。互斥;必须提供至少一个;由ContainerProperties 构造函数强制执行。 |
transactionManager |
null |
请参阅事务。 |
属性值 | 默认 | 描述 |
---|---|---|
afterRollbackProcessor |
DefaultAfterRollbackProcessor |
在AfterRollbackProcessor 事务回滚后调用。 |
applicationEventPublisher |
应用上下文 | 事件发布者。 |
batchErrorHandler |
见描述。 | 已弃用 - 请参阅commonErrorHandler . |
batchInterceptor |
null |
在调用批处理侦听器之前设置一个BatchInterceptor 调用;不适用于录音监听器。另请参阅interceptBeforeTx 。 |
beanName |
bean名 | 容器的bean名称;后缀-n 为子容器。 |
commonErrorHandler |
见描述。 | 当使用 transactionManager 时提供 DefaultAfterRollbackProcessor 时,为DefaultErrorHandler 或者null 。请参阅容器错误处理程序。 |
containerProperties |
ContainerProperties |
容器属性实例。 |
errorHandler |
见描述。 | 已弃用 - 请参阅commonErrorHandler . |
genericErrorHandler |
见描述。 | 已弃用 - 请参阅commonErrorHandler . |
groupId |
见描述。 | 如果存在containerProperties.groupId ,否则来自消费者工厂的group.id 属性。 |
interceptBeforeTx |
true |
确定是在事务开始之前还是之后调用recordInterceptor 。 |
listenerId |
见描述。 | 用户配置容器的 bean 名称或@KafkaListener s 的id 属性。 |
listenerInfo |
null | 要填充在KafkaHeaders.LISTENER_INFO 标题中的值。使用@KafkaListener ,该值是从info 属性中获得的。这个头文件可以用在不同的地方,例如 RecordInterceptor 和RecordFilterStrategy 监听器代码本身。 |
pauseRequested |
(只读) | 如果已请求消费者暂停,则为真。 |
recordInterceptor |
null |
在调用记录监听器之前设置一个RecordInterceptor 调用;不适用于批处理侦听器。另请参阅interceptBeforeTx 。 |
topicCheckTimeout |
30 s | 当missingTopicsFatal 容器属性为true 时,表示操作完成等待多长时间(以秒为单位) 。describeTopics |
属性值 | 默认 | 描述 |
---|---|---|
assignedPartitions |
(只读) | 当前分配给此容器的分区(显式或非显式)。 |
assignedPartitionsByClientId |
(只读) | 当前分配给此容器的分区(显式或非显式)。 |
clientIdSuffix |
null |
由并发容器用来给每个子容器的消费者一个唯一的client.id . |
containerPaused |
不适用 | 如果已请求暂停并且消费者实际上已暂停,则为真。 |
属性值 | 默认 | 描述 |
---|---|---|
alwaysClientIdSuffix |
true |
设置为 false 时,在concurrency 仅为 1 时,以禁止向消费者属性client.id 添加后缀。 |
assignedPartitions |
(只读) | 当前分配给此容器的子KafkaMessageListenerContainer s 的分区聚合(显式或非显式)。 |
assignedPartitionsByClientId |
(只读) | 当前分配给此容器的子KafkaMessageListenerContainer s 的分区(显式或非显式),由子容器的消费者client.id 属性键控。 |
concurrency |
1 | KafkaMessageListenerContainer 要管理的孩子的数量。 |
containerPaused |
不适用 | 如果已请求暂停并且所有子容器的使用者实际上已暂停,则为真。 |
containers |
不适用 | 对所有 childKafkaMessageListenerContainer 的引用。 |
4.1.6.应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
ConsumerStartingEvent
- 在消费者线程第一次启动时发布,在它开始轮询之前。ConsumerStartedEvent
- 在消费者即将开始轮询时发布。ConsumerFailedToStartEvent
- 如果没有ConsumerStartingEvent
在consumerStartTimeout
容器属性中发布,则发布。此事件可能表示配置的任务执行器没有足够的线程来支持它所使用的容器及其并发性。发生这种情况时,还会记录一条错误消息。ListenerContainerIdleEvent
:在没有收到消息时发布idleInterval
(如果配置)。ListenerContainerNoLongerIdleEvent
: 在先前发布 . 之后使用记录时发布ListenerContainerIdleEvent
。ListenerContainerPartitionIdleEvent
:当没有从该分区收到消息时发布idlePartitionEventInterval
(如果已配置)。ListenerContainerPartitionNoLongerIdleEvent
: 当一条记录从之前发布过的分区中消费时发布ListenerContainerPartitionIdleEvent
。NonResponsiveConsumerEvent
:当消费者似乎在poll
方法中被阻止时发布。ConsumerPartitionPausedEvent
: 由每个消费者在分区暂停时发布。ConsumerPartitionResumedEvent
:恢复分区时由每个消费者发布。ConsumerPausedEvent
:容器暂停时由每个消费者发布。ConsumerResumedEvent
: 由每个消费者在容器恢复时发布。ConsumerStoppingEvent
:由每个消费者在停止之前发布。ConsumerStoppedEvent
: 消费者关闭后发布。请参阅线程安全。ContainerStoppedEvent
:当所有消费者都停止时发布。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果您将多播器更改为使用异步执行器,则当事件包含对使用者的引用时 ,您不得调用任何Consumer
的方法。
ListenerContainerIdleEvent
具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。id
:侦听器 ID(或容器 bean 名称)。idleTime
:发布事件时容器空闲的时间。topicPartitions
: 容器在事件生成时分配的主题和分区。consumer
: 对 KafkaConsumer
对象的引用。例如,如果之前调用了消费者的pause()
方法,它可以在收到事件时调用resume()
。paused
: 容器当前是否处于暂停状态。有关详细信息,请参阅暂停和恢复侦听器容器。
ListenerContainerNoLongerIdleEvent
具有相同的属性,除了idleTime
和paused
。
ListenerContainerPartitionIdleEvent
具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。id
:侦听器 ID(或容器 bean 名称)。idleTime
: 发布事件时,时间分区消耗一直处于空闲状态。topicPartition
:触发事件的主题和分区。consumer
: 对 KafkaConsumer
对象的引用。例如,如果之前调用了消费者的pause()
方法,它可以在收到事件时调用resume()
。paused
:当前是否为该消费者暂停该分区消费。有关详细信息,请参阅暂停和恢复侦听器容器。
ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,除了idleTime
和paused
。
NonResponsiveConsumerEvent
具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。id
:侦听器 ID(或容器 bean 名称)。timeSinceLastPoll
: 容器最后一次调用poll()
之前的时间。topicPartitions
: 容器在事件生成时分配的主题和分区。consumer
: 对 KafkaConsumer
对象的引用。例如,如果之前调用了消费者的pause()
方法,它可以在收到事件时调用resume()
。paused
: 容器当前是否处于暂停状态。有关详细信息,请参阅暂停和恢复侦听器容器。
ConsumerResumedEvent
、ConsumerPausedEvent
和ConsumerStopping
事件具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。partitions
:TopicPartition
涉及的实例。
ConsumerPartitionResumedEvent
,ConsumerPartitionPausedEvent
事件具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。partition
:TopicPartition
涉及的实例。
ConsumerStoppedEvent
、ConsumerFailedToStartEvent
、ConsumerStartingEvent
、ConsumerStartingEvent
和ContainerStoppedEvent
事件具有以下属性:
source
:发布事件的侦听器容器实例。container
:侦听器容器或父侦听器容器,如果源容器是子容器。
所有容器(无论是子容器还是父容器)都发布ContainerStoppedEvent
. 对于父容器,源和容器属性是相同的。
此外,ConsumerStoppedEvent
具有以下附加属性:
reason
NORMAL
- 消费者正常停止(容器已停止)。ERROR
- 一个java.lang.Error
被抛出。FENCED
- 事务生产者被围起来,stopContainerWhenFenced
容器属性值是true
.AUTH
-AuthenticationException
或AuthorizationException
被抛出并且authExceptionRetryInterval
未配置。NO_OFFSET
- 分区没有偏移量,auto.offset.reset
策略是none
.
您可以在出现这种情况后使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者
虽然效率很高,但异步消费者的一个问题是检测它们何时空闲。如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以将侦听器容器配置为在一段时间没有消息传递时发布 ListenerContainerIdleEvent
。当容器空闲时,每idleEventInterval
毫秒发布一个事件。
要配置此功能,请在容器上设置idleEventInterval
。以下示例显示了如何执行此操作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
return container;
}
以下示例显示了如何在@KafkaListener
设置idleEventInterval
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每种情况下,当容器空闲时,每分钟发布一次事件。
如果由于某种原因,consumer的poll()
方法没有退出,则不会收到任何消息并且无法生成空闲事件(这kafka-clients
是无法访问代理的早期版本的问题)。在这种情况下,如果民意调查未在pollTimeout
属性内返回3x
,则容器会发布 aNonResponsiveConsumerEvent
。默认情况下,此检查在每个容器中每 30 秒执行一次。您可以通过在配置侦听器容器时设置ContainerProperties
的monitorInterval
(默认 30 秒)和noPollThreshold
(默认 3.0)属性来修改此行为。noPollThreshold
应该大于1.0
以避免由于竞争条件而出现虚假事件。接收到这样的事件可以让您停止容器,从而唤醒消费者以便它可以停止。
从 2.6.2 版本开始,如果容器发布了 ListenerContainerIdleEvent
,它将在随后收到记录时发布 ListenerContainerNoLongerIdleEvent
。
事件消费
您可以通过实现ApplicationListener
来捕获这些事件 ——一个普通的侦听器或一个缩小为仅接收此特定事件的侦听器。您也可以使用Spring Framework 4.2 中引入的@EventListener
。
下一个示例将@KafkaListener
和@EventListener
组合成一个类。您应该了解应用程序侦听器获取所有容器的事件,因此如果您想根据哪个容器空闲采取特定操作,您可能需要检查侦听器 ID。您也可以将@EventListener
condition
用于此目的。
有关事件属性的信息,请参阅应用程序事件。
事件通常在消费者线程上发布,因此与Consumer
对象交互是安全的。
以下示例同时使用@KafkaListener
和@EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器查看所有容器的事件。因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件的范围。由于为 @KafkaListener
创建的容器支持并发,因此实际容器被命名为 id-n
,其中 n
是每个实例支持并发的唯一值。这就是我们在条件下使用startsWith
的原因。
如果您希望使用空闲事件来停止侦听器容器,则不应调用container.stop()
调用侦听器的线程。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,然后该线程可以停止容器。此外,如果容器实例是子容器,则不应stop()
容器实例。您应该改为停止并发容器。
空闲时的当前位置
请注意,您可以通过在侦听器中实现ConsumerSeekAware
检测到空闲时获取当前位置。请参阅寻求特定偏移量中的onIdleContainer()
。
4.1.7. 主题/分区初始偏移量
有几种方法可以设置分区的初始偏移量。
手动分配分区时,您可以在配置的TopicPartitionOffset
参数中设置初始偏移量(如果需要)(请参阅消息侦听器容器)。您也可以随时寻找特定的偏移量。
当您使用代理分配分区的组管理时:
- 对于一个新的
group.id
,初始偏移量由消费者属性auto.offset.reset
(earliest
或latest
)确定。 - 对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。但是,您可以在初始化期间(或之后的任何时间)寻找特定的偏移量。
4.1.8.寻求特定的偏移量
为了寻找,你的监听器必须实现ConsumerSeekAware
,它有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
当容器启动和分配分区时调用registerSeekCallback
。在初始化后的任意时间进行搜索时,您应该使用此回调。您应该保存对回调的引用。如果您在多个容器(或 ConcurrentMessageListenerContainer
)中使用相同的侦听器,则应将回调存储在 ThreadLocal
或其他由 listener Thread
键入的结构中。
使用组管理时,在分配分区时调用onPartitionsAssigned
。您可以使用此方法,例如,通过调用回调设置分区的初始偏移量。您还可以使用此方法将此线程的回调与分配的分区相关联(请参见下面的示例)。您必须使用回调参数,而不是传递给registerSeekCallback
. 从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。
当容器停止或 Kafka 撤销分配时调用onPartitionsRevoked
。您应该丢弃此线程的回调并删除与已撤销分区的任何关联。
回调有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection=<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection=<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative
在 2.3 版中添加,用于执行相对搜索。
offset
为负数和toCurrent
false
- 相对于分区末尾的查找。offset
为正和toCurrent
false
- 相对于分区开头的查找。offset
为负和toCurrent
true
- 相对于当前位置搜索(倒带)。offset
为正和toCurrent
true
- 相对于当前位置搜索(快进)。
这些seekToTimestamp
方法也在 2.3 版中添加。
在onIdleContainer
或onPartitionsAssigned
方法中为多个分区寻找相同的时间戳时,首选第二种方法,因为在对消费者offsetsForTimes
方法的一次调用中找到时间戳的偏移量更有效。当从其他位置调用时,容器将收集所有时间戳查找请求并调用offsetsForTimes
.
您还可以在检测到空闲容器时执行onIdleContainer()
查找操作。有关如何启用空闲容器检测的信息,请参阅检测空闲和非响应消费者。
接受集合的seekToBeginning
方法很有用,例如,在处理压缩主题并且您希望每次启动应用程序时都从头开始:
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用来自适当线程的registerSeekCallback
回调引用。
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中点击<Enter>
会导致所有分区搜索到开头。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了让事情变得更简单,2.3 版添加了AbstractConsumerSeekAware
类,它跟踪要用于主题/分区的回调。以下示例显示了如何在每次容器空闲时查找每个分区中处理的最后一条记录。它还具有允许任意外部调用通过一条记录来回退分区的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
2.6 版为抽象类添加了便利方法:
seekToBeginning()
- 寻找所有分配的分区到开头seekToEnd()
- 寻找所有分配的分区到最后seekToTimestamp(long time)
- 将所有分配的分区查找到该时间戳表示的偏移量。
例子:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
}
}
4.1.9.并发监听器工厂
如@KafkaListener
Annotation中所讨论的, ConcurrentKafkaListenerContainerFactory
用于为带注释的方法创建容器。
从 2.2 版开始,您可以使用相同的工厂来创建任何ConcurrentMessageListenerContainer
. 如果您想创建多个具有相似属性的容器,或者您希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能会很有用。创建容器后,您可以进一步修改其属性,其中许多属性是使用container.getContainerProperties()
. 以下示例配置了一个ConcurrentMessageListenerContainer
:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会添加到端点注册表中。它们应该作为@Bean
定义创建,以便在应用程序上下文中注册。
从版本 2.3.4 开始,您可以ContainerCustomizer
在工厂中添加一个以在创建和配置每个容器后进一步配置它。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
4.1.10.线程安全
使用并发消息侦听器容器时,会在所有消费者线程上调用单个侦听器实例。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使您的侦听器线程安全或添加同步会显着降低添加并发性的好处,您可以使用以下几种技术之一:
- 使用
concurrency=1
的n
个容器和原型范围的MessageListener
bean,以便每个容器都有自己的实例(使用@KafkaListener
时这是不可能的)。 - 在
ThreadLocal<?>
实例中保留状态。 - 将单例侦听器委托给在
SimpleThreadScope
(或类似范围)中声明的 bean。
为了便于清理线程状态(针对前面列表中的第二项和第三项),从 2.2 版本开始,监听器容器会在每个线程退出时发布一个ConsumerStoppedEvent
。您可以使用ApplicationListener
或@EventListener
方法使用这些事件,以从范围中删除ThreadLocal<?>
实例或remove()
线程范围的 bean。请注意,SimpleThreadScope
不会销毁具有销毁接口(例如DisposableBean
)的 bean,因此您应该通过destroy()
销毁自己创建实例。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果您将多播器更改为使用异步执行器,则线程清理无效。
4.1.11.监控
监控监听器性能
从版本 2.3 开始,如果在类路径上检测到 Micrometer
,并且应用程序上下文中存在单个 MeterRegistry
,则侦听器容器将自动为侦听器创建和更新 Micrometer Timer
。可以通过将 ContainerProperty
micrometerEnabled
设置为 false
来禁用计时器。
维护了两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名spring.kafka.listener
并具有以下标签:
name
:(容器 bean 名称)result
:success
或failure
exception
:none
或ListenerExecutionFailedException
您可以使用ContainerProperties
micrometerTags
属性添加其他标签。
对于并发容器,为每个线程创建计时器,并且名称标签带有后缀 -n
,其中 n
范围是0
到 concurrency-1
。
监控 KafkaTemplate 性能
从版本 2.3 开始,如果在类路径上检测到 Micrometer
,并且应用程序上下文中存在单个 MeterRegistry
,则侦听器容器将自动为侦听器创建和更新 Micrometer Timer
。可以通过将 ContainerProperty
micrometerEnabled
设置为 false
来禁用计时器。
维护了两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名spring.kafka.template
并具有以下标签:
name
: (模板 bean 名称)result
:success
或failure
exception
:none
或失败的异常类名
您可以使用模板的micrometerTags
属性添加其他标签。
Micrometer Native Metrics
从 2.5 版开始,该框架提供Factory Listeners来在创建和关闭生产者和消费者时管理 Micrometer KafkaClientMetrics
实例。
要启用此功能,只需将侦听器添加到您的生产者和消费者工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给侦听器的消费者/生产者id
被添加到带有spring.id
标签名称的计量器标签中。
获取 Kafka 指标之一的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
StreamsBuilderFactoryBean
提供了一个类似的侦听器。- 参见KafkaStreams Micrometer Support
4.1.12.事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式添加支持:
KafkaTransactionManager
:与正常的 Spring 事务支持(@Transactional
等TransactionTemplate
)一起使用。- 事务性的
KafkaMessageListenerContainer
KafkaTemplate
的本地事务- 与其他事务管理器的事务同步
通过transactionIdPrefix
提供DefaultKafkaProducerFactory
在这种情况下,工厂不会管理单个 shared Producer
,而是维护事务生产者的缓存。当用户调用close()
生产者时,它会返回到缓存中以供重用,而不是实际关闭。每个生产者的transactional.id
属性是transactionIdPrefix
+ n
,其中以每个新生产者n
开始为0
并递增,除非事务由具有基于记录的侦听器的侦听器容器启动。在 Spring for Apache Kafka
的早期版本中,对于由具有基于记录的侦听器的侦听器容器启动的事务,transactional.id
的生成方式有所不同,以支持隔离僵尸,这不再是必要的,EOSMode.V2
是从 3.0 开始的唯一选项。对于运行多个实例的应用程序,每个实例的 transactionIdPrefix
必须是唯一的。
另请参阅transactionIdPrefix
。
使用 Spring Boot,只需要设置spring.kafka.producer.transaction-id-prefix
属性 - Boot 会自动配置一个KafkaTransactionManager
bean 并将其连接到侦听器容器中。
从版本 2.5.8 开始,您现在可以在生产者工厂上配置maxAge
属性。当使用可能为代理的 transactional.id.expiration.ms
闲置的事务生产者时,这非常有用。使用当前的kafka-clients
,这可能会因为没有重新平衡导致 ProducerFencedException
。通过将 maxAge
设置为小于transactional.id.expiration.ms
,如果生产者超过了它的最大年龄,工厂将刷新它。
使用KafkaTransactionManager
KafkaTransactionManager
是 Spring Framework 的PlatformTransactionManager
. 它在其构造函数中提供了对生产者工厂的引用。如果您提供自定义生产者工厂,它必须支持事务。见ProducerFactory.transactionCapable()
。
您可以将KafkaTransactionManager
与正常的 Spring 事务支持一起使用(@Transactional
、TransactionTemplate
和其他)。如果事务处于活动状态,则在事务范围内执行的任何KafkaTemplate
操作都使用事务的Producer
. 管理器根据成功或失败提交或回滚事务。您必须将其配置KafkaTemplate
为使用与ProducerFactory
事务管理器相同的内容。
事务同步
本节涉及仅生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用消费者发起的事务。
如果你想将记录发送到 kafka 并执行一些数据库更新,你可以使用普通的 Spring 事务管理,比如DataSourceTransactionManager
.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
@Transactional
注释的拦截器启动事务,KafkaTemplate
并将事务与该事务管理器同步;每个发送都将参与该事务。当方法退出时,数据库事务将提交,然后是 Kafka 事务。如果您希望以相反的顺序执行提交(首先是 Kafka),请使用嵌套@Transactional
方法,外部方法配置为使用DataSourceTransactionManager
.,内部方法配置为使用KafkaTransactionManager
.
有关在 Kafka-first 或 DB-first 配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅Kafka Transactions with Other Transaction Managers示例。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务的提交失败(在主事务提交之后),将向调用者抛出异常。以前,这被默默地忽略(在调试时记录)。如有必要,应用程序应采取补救措施,以补偿已提交的主要事务。
使用消费者发起的事务
从2.7 版开始,ChainedKafkaTransactionManager
现在已弃用;有关更多ChainedTransactionManager
信息,请参阅其超类的 javadocs 。相反,在容器中使用KafkaTransactionManager
来启动 Kafka 事务,并用@Transactional
注释侦听器方法来启动另一个事务。
有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例。
KafkaTemplate
本地事务
您可以使用KafkaTemplate
来在本地事务中执行一系列操作。以下示例显示了如何执行此操作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身 ( this
)。如果回调正常退出,则事务被提交。如果抛出异常,事务将回滚。
如果正在处理(或同步)KafkaTransactionManager
事务,则不使用它。相反,使用了一个新的“嵌套”事务。
transactionIdPrefix
使用唯一支持的模式 EOSMode.V2
(又名 BETA
),即使对于消费者发起的交易,也不再需要使用相同的 transactional.id
;事实上,它在每个实例上都必须是唯一的,就像生产者发起的事务一样。该属性在每个应用程序实例上必须具有不同的值。
KafkaTemplate
事务性和非事务性发布
通常,当KafkaTemplate
是事务性的(配置了支持事务的生产者工厂)时,就需要事务。当使用 KafkaTransactionManager
配置时,事务可以通过 TransactionTemplate
、@Transactional
方法、调用executeInTransaction
或侦听器容器来启动。任何在事务范围之外使用模板的尝试都会导致模板抛出 IllegalStateException
。从版本2.4.3开始,您可以将模板的allowNonTransactional
属性设置为true
。在这种情况下,模板将通过调用 ProducerFactory
的 reateNonTransactionalProducer() 方
法,允许操作在没有事务的情况下运行;生产者将被缓存或线程绑定,以正常重用。请参阅使用 DefaultKafkaProducerFactory
。
与批处理侦听器的事务
当正在使用事务时侦听器失败时,会在回滚发生后调用 AfterRollbackProcessor
以采取一些行动。当使用带有记录侦听器的默认 AfterRollbackProcessor
时,会执行查找,以便重新传送失败的记录。但是,使用批处理侦听器,将重新传递整个批处理,因为框架不知道批处理中的哪条记录失败。有关详细信息,请参阅回滚后处理器。
当使用批处理侦听器时,2.4.2 版本引入了一种替代机制来处理批处理时的故障; BatchToRecordAdapter
。当使用 BatchToRecordAdapter
配置将 batchListener
设置为 true
的容器工厂时,一次会调用一条记录的侦听器。这使得能够在批次内进行错误处理,同时仍然可以根据异常类型停止处理整个批次。提供了默认的 BatchToRecordAdapter
,可以使用标准 ConsumerRecordRecoverer
(例如 DeadLetterPublishingRecoverer
)进行配置。以下测试用例配置片段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}
4.1.13.Exactly Once Semantics (EOS)
您可以为侦听器容器提供 KafkaAwareTransactionManager
实例。如此配置后,容器会在调用侦听器之前启动事务。侦听器执行的任何 KafkaTemplate
操作都会参与事务。如果侦听器成功处理该记录(或多个记录,当使用 BatchMessageListener
时),则在事务管理器提交事务之前,容器会使用 Producer.sendOffsetsToTransaction()
) 将偏移量发送到事务。如果侦听器抛出异常,则事务将回滚,并且使用者将重新定位,以便可以在下一次轮询时检索回滚的记录。有关详细信息以及处理重复失败的记录的信息,请参阅回滚后处理器。
使用事务启用 Exactly Once Semantics (EOS)。
这意味着,对于一个read→process-write
序列,可以保证该序列恰好完成一次。(读取和处理至少有一次语义)。
Spring for Apache Kafka 3.0 及更高版本仅支持 EOSMode.V2:
V2
- 又名 fetch-offset-request fencing (从 2.5 版开始)
使用 mode V2
,不必为每个生产者指定生产者的group.id/topic/partition
,因为消费者元数据与事务的偏移量一起发送,并且代理可以确定生产者是否使用该信息被隔离。
有关详细信息,请参阅KIP-447。
V2
以前是ALPHA
和BETA
; 它们已被更改以使框架与KIP-732保持一致。
4.1.14.将 Spring Bean 连接到生产者/消费者拦截器中
Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 管理,而不是由 Spring 管理,因此普通的 Spring 依赖注入不适用于依赖 Spring Bean 中的布线。但是,您可以使用拦截器的config()
方法手动连接这些依赖项。下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来做到这一点。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
4.1.15.暂停和恢复侦听器容器
版本 2.1.3 添加了侦听器容器的方法pause()
和resume()
。以前,您可以在ConsumerAwareMessageListener
中暂停消费者并通过侦听ListenerContainerIdleEvent
来恢复它,这提供了对Consumer
对象的访问。虽然您可以使用事件侦听器在空闲容器中暂停消费者,但在某些情况下,这不是线程安全的,因为无法保证在消费者线程上调用事件侦听器。为了安全地暂停和恢复消费者,您应该在侦听器容器上使用pause
和resume
方法。pause()
在下一个poll()
之前生效;resume()
在当前poll()
返回后立即生效。当一个容器暂停时,它会继续poll()
消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。有关更多信息,请参阅 Kafka 文档。
从 2.1.5 版本开始,可以调用isPauseRequested()
查看是否pause()
已调用。但是,消费者可能还没有真正暂停。 如果所有Consumer
实例实际上都已暂停,isConsumerPaused()
则返回 true 。
此外(也是从 2.1.5 开始),ConsumerPausedEvent
并且ConsumerResumedEvent
实例以容器作为source
属性发布,并且TopicPartition
实例包含在partitions
属性中。
从 2.9 版本开始,新的容器属性pauseImmediate
设置为 true 时,会在处理当前记录后使暂停生效。默认情况下,当上一次轮询中的所有记录都已处理完毕时,暂停生效。请参阅[pauseImmediate]。
以下简单的 Spring Boot 应用程序演示了使用容器注册表获取对@KafkaListener
方法容器的引用并暂停或恢复其使用者以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下清单显示了前面示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
4.1.16.暂停和恢复侦听器容器上的分区
从 2.7 版开始,您可以使用侦听器容器中的pausePartition(TopicPartition topicPartition)
和resumePartition(TopicPartition topicPartition)
方法暂停和恢复分配给该使用者的特定分区的使用。暂停和恢复分别发生在与pause()
和resume()
方法类似的poll()
前后。如果已请求该分区的暂停,则isPartitionPauseRequested()
方法返回 true。如果该分区已有效暂停,则isPartitionPaused()
方法返回 true。
此外,由于 2.7 版ConsumerPartitionPausedEvent
和ConsumerPartitionResumedEvent
实例是使用容器作为source
属性和TopicPartition
实例发布的。
评论区