欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

【译文】spring kafka 简介及使用参考(二)
(last modified Oct 30, 2023, 5:52 PM )
by
侧边栏壁纸
  • 累计撰写 178 篇文章
  • 累计创建 62 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

【译文】spring kafka 简介及使用参考(二)

橙序员
2022-08-21 / 0 评论 / 0 点赞 / 1,579 阅读 / 11,150 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

本内容是关于侦听器容器属性的描述。其中包括了ackCount、adviceChain、ackMode、ackTime、assignmentCommitOption、authExceptionRetryInterval、clientId、checkDeserExWhenKeyNull、checkDeserExWhenValueNull、commitCallback等属性的默认值和描述。对于每个属性,都给出了默认值和相应的描述。

4.1.5.侦听器容器属性

属性值 默认 描述
ackCount 1 ackModeCOUNTCOUNT_TIME时,提交未决偏移前的记录数。
adviceChain null 包装消息侦听器的Advice对象链(例如MethodInterceptor,围绕通知)按顺序调用。
ackMode BATCH 控制提交偏移量的频率 - 请参阅提交偏移量
ackTime 5000 ackModeTIMECOUNT_TIME时提交未决偏移量的时间(以毫秒为单位)。
assignmentCommitOption LATEST_ONLY _NO_TX 是否在分配时承诺初始位置;默认情况下,只有在 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest的情况下才会提交初始偏移量,并且即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 javadocs。
authExceptionRetryInterval null 当不为null时,当 Kafka 客户端抛出 AuthenticationException AuthorizationException 时,轮询之间休眠的持续时间。当为 null 时,此类异常被认为是致命的,容器将停止。
clientId (空字符串) 消费者属性client.id的前缀。覆盖消费者工厂client.id属性;在并发容器中,-n添加为每个消费者实例的后缀。
checkDeserExWhenKeyNull false 设置为true在收到null key时始终检查DeserializationException标头。当消费者代码无法确定已配置ErrorHandlingDeserializer时很有用,例如在使用委托解串器时。
checkDeserExWhenValueNull false 设置为 true 以便在收到null key时始终检查 DeserializationException 标头。当使用者代码无法确定是否已配置 ErrorHandlingDeserializer 时(例如使用委托反序列化器时),此功能很有用。
commitCallback null 如果存在并且syncCommitsfalse在提交完成后调用的回调。
offsetAndMetadataProvider null OffsetAndMetadata的提供者;默认情况下,提供者使用空元数据创建偏移量和元数据。提供者提供了一种自定义元数据的方法。
commitLogLevel debug 与提交偏移量有关的日志的日志记录级别。
consumerRebalanceListener null 一个rebalance 监听器;请参阅重新平衡侦听器
consumerStartTimout 30 s 在记录错误之前等待消费者启动的时间;例如,如果您使用线程不足的任务执行器,则可能会发生这种情况。
consumerTaskExecutor SimpleAsyncTaskExecutor 运行消费者线程的任务执行器。默认执行器创建名为<name>-C-n的线程;对于KafkaMessageListenerContainer,名称是bean名称;对于 ConcurrentMessageListenerContainer,名称是带有 -n 后缀的 bean 名称,其中 n 针对每个子容器递增。
deliveryAttemptHeader false 请参阅传递尝试标题
eosMode V2 Exactly Once 语义模式;请参阅Exactly Once 语义
fixTxOffsets false 当使用事务生产者生成的记录,并且消费者位于分区的末尾时,延迟可能会错误地报告为大于零,这是由于用于指示事务提交/回滚的伪记录以及可能的存在的回滚记录。这在功能上不会影响消费者,但一些用户表示担心“滞后”不是零。将此属性设置为true,容器将更正此类错误报告的偏移量。检查在下一次轮询之前执行,以避免显着增加提交处理的复杂性。在撰写本文时,只有在消费者配置为isolation.level=read_committedmax.poll.records大于 1时才会纠正延迟。有关更多信息,请参阅KAFKA-10683
groupId null 覆盖消费者group.id属性;由@KafkaListener idgroupId属性自动设置。
idleBeforeDataMultiplier 5.0 在收到任何记录之前应用idleEventInterval乘数。收到记录后,不再应用乘数。从 2.8 版开始可用。
idleBetweenPolls 0 用于通过在轮询之间休眠线程来减慢交付速度。处理一批记录的时间加上这个值必须小于消费者max.poll.interval.ms属性。
idleEventInterval null 设置后,启用ListenerContainerIdleEvents 的发布,请参阅应用程序事件检测空闲和非响应消费者。另请参阅idleBeforeDataMultiplier
idlePartitionEventInterval null 设置后,启用ListenerContainerIdlePartitionEvents 的发布,请参阅应用程序事件检测空闲和非响应消费者
kafkaConsumerProperties 没有任何 用于覆盖在消费者工厂上配置的任意消费者属性。
logContainerConfig false 设置为 true 以在 INFO 级别记录所有容器属性。
messageListener null 消息侦听器。
micrometerEnabled true 是否为消费者线程维护 Micrometer 计时器。
missingTopicsFatal false 当为 true 时,如果代理上不存在已配置的主题,则阻止容器启动。
monitorInterval 30s 检查 NonResponsiveConsumerEvent 的消费者线程状态的频率。请参阅 noPollThresholdpollTimeout
noPollThreshold 3.0 乘以pollTimeOut决定是否发布一个NonResponsiveConsumerEvent. 见monitorInterval
onlyLogRecordMetadata false 设置为 false 以记录完整的消费者记录(错误、调试日志等),而不仅仅是topic-partition@offset.
pauseImmediate false 当容器暂停时,在当前记录之后停止处理,而不是在处理完之前轮询的所有记录之后;剩余的记录保留在内存中,并在容器恢复时传递给侦听器。
pollTimeout 5000 超时传入Consumer.poll().
scheduler ThreadPoolTaskScheduler 在其上运行消费者监控任务的调度程序。
shutdownTimeout 10000 在所有消费者停止和发布容器停止事件之前阻塞stop()方法的最长时间(以毫秒为单位) 。
stopContainerWhenFenced false 如果抛出ProducerFencedException,则停止侦听器容器。有关详细信息,请参阅回滚后处理器
stopImmediate false 当容器停止时,在当前记录之后停止处理,而不是在处理完之前轮询的所有记录之后。
subBatchPerPartition 见描述。 使用批处理侦听器时,如果是true,则调用侦听器,并将轮询结果拆分为子批处理,每个分区一个。默认情况下为false
syncCommitTimeout null syncCommitstrue使用的超时时间。如果未设置,容器将尝试确定消费者属性default.api.timeout.ms并使用它;否则它将使用 60 秒。
syncCommits true 是否对偏移量使用同步或异步提交;见commitCallback
topics topicPattern topicPartitions 不适用 配置的主题、主题模式或明确分配的主题/分区。互斥;必须提供至少一个;由ContainerProperties构造函数强制执行。
transactionManager null 请参阅事务
属性值 默认 描述
afterRollbackProcessor DefaultAfterRollbackProcessor AfterRollbackProcessor事务回滚后调用。
applicationEventPublisher 应用上下文 事件发布者。
batchErrorHandler 见描述。 已弃用 - 请参阅commonErrorHandler.
batchInterceptor null 在调用批处理侦听器之前设置一个BatchInterceptor调用;不适用于录音监听器。另请参阅interceptBeforeTx
beanName bean名 容器的bean名称;后缀-n为子容器。
commonErrorHandler 见描述。 当使用 transactionManager时提供 DefaultAfterRollbackProcessor时,为DefaultErrorHandler或者null。请参阅容器错误处理程序
containerProperties ContainerProperties 容器属性实例。
errorHandler 见描述。 已弃用 - 请参阅commonErrorHandler.
genericErrorHandler 见描述。 已弃用 - 请参阅commonErrorHandler.
groupId 见描述。 如果存在containerProperties.groupId,否则来自消费者工厂的group.id属性。
interceptBeforeTx true 确定是在事务开始之前还是之后调用recordInterceptor
listenerId 见描述。 用户配置容器的 bean 名称或@KafkaListener s 的id属性。
listenerInfo null 要填充在KafkaHeaders.LISTENER_INFO标题中的值。使用@KafkaListener,该值是从info属性中获得的。这个头文件可以用在不同的地方,例如 RecordInterceptorRecordFilterStrategy监听器代码本身。
pauseRequested (只读) 如果已请求消费者暂停,则为真。
recordInterceptor null 在调用记录监听器之前设置一个RecordInterceptor调用;不适用于批处理侦听器。另请参阅interceptBeforeTx
topicCheckTimeout 30 s missingTopicsFatal容器属性为true时,表示操作完成等待多长时间(以秒为单位) 。describeTopics
属性值 默认 描述
assignedPartitions (只读) 当前分配给此容器的分区(显式或非显式)。
assignedPartitionsByClientId (只读) 当前分配给此容器的分区(显式或非显式)。
clientIdSuffix null 由并发容器用来给每个子容器的消费者一个唯一的client.id.
containerPaused 不适用 如果已请求暂停并且消费者实际上已暂停,则为真。
属性值 默认 描述
alwaysClientIdSuffix true 设置为 false 时,在concurrency仅为 1 时,以禁止向消费者属性client.id添加后缀。
assignedPartitions (只读) 当前分配给此容器的子KafkaMessageListenerContainers 的分区聚合(显式或非显式)。
assignedPartitionsByClientId (只读) 当前分配给此容器的子KafkaMessageListenerContainers 的分区(显式或非显式),由子容器的消费者client.id属性键控。
concurrency 1 KafkaMessageListenerContainer要管理的孩子的数量。
containerPaused 不适用 如果已请求暂停并且所有子容器的使用者实际上已暂停,则为真。
containers 不适用 对所有 childKafkaMessageListenerContainer的引用。

4.1.6.应用程序事件

以下 Spring 应用程序事件由侦听器容器及其使用者发布:

  • ConsumerStartingEvent- 在消费者线程第一次启动时发布,在它开始轮询之前。
  • ConsumerStartedEvent- 在消费者即将开始轮询时发布。
  • ConsumerFailedToStartEvent- 如果没有ConsumerStartingEventconsumerStartTimeout容器属性中发布,则发布。此事件可能表示配置的任务执行器没有足够的线程来支持它所使用的容器及其并发性。发生这种情况时,还会记录一条错误消息。
  • ListenerContainerIdleEvent:在没有收到消息时发布idleInterval(如果配置)。
  • ListenerContainerNoLongerIdleEvent: 在先前发布 . 之后使用记录时发布ListenerContainerIdleEvent
  • ListenerContainerPartitionIdleEvent:当没有从该分区收到消息时发布idlePartitionEventInterval(如果已配置)。
  • ListenerContainerPartitionNoLongerIdleEvent: 当一条记录从之前发布过的分区中消费时发布ListenerContainerPartitionIdleEvent
  • NonResponsiveConsumerEvent:当消费者似乎在poll方法中被阻止时发布。
  • ConsumerPartitionPausedEvent: 由每个消费者在分区暂停时发布。
  • ConsumerPartitionResumedEvent:恢复分区时由每个消费者发布。
  • ConsumerPausedEvent:容器暂停时由每个消费者发布。
  • ConsumerResumedEvent: 由每个消费者在容器恢复时发布。
  • ConsumerStoppingEvent:由每个消费者在停止之前发布。
  • ConsumerStoppedEvent: 消费者关闭后发布。请参阅线程安全
  • ContainerStoppedEvent:当所有消费者都停止时发布。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果您将多播器更改为使用异步执行器,则当事件包含对使用者的引用时 ,您不得调用任何Consumer的方法。

ListenerContainerIdleEvent具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。
  • id:侦听器 ID(或容器 bean 名称)。
  • idleTime:发布事件时容器空闲的时间。
  • topicPartitions: 容器在事件生成时分配的主题和分区。
  • consumer: 对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,它可以在收到事件时调用resume()
  • paused: 容器当前是否处于暂停状态。有关详细信息,请参阅暂停和恢复侦听器容器

ListenerContainerNoLongerIdleEvent具有相同的属性,除了idleTimepaused

ListenerContainerPartitionIdleEvent具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。
  • id:侦听器 ID(或容器 bean 名称)。
  • idleTime: 发布事件时,时间分区消耗一直处于空闲状态。
  • topicPartition:触发事件的主题和分区。
  • consumer: 对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,它可以在收到事件时调用resume()
  • paused:当前是否为该消费者暂停该分区消费。有关详细信息,请参阅暂停和恢复侦听器容器

ListenerContainerPartitionNoLongerIdleEvent具有相同的属性,除了idleTimepaused

NonResponsiveConsumerEvent具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。
  • id:侦听器 ID(或容器 bean 名称)。
  • timeSinceLastPoll: 容器最后一次调用poll()之前的时间。
  • topicPartitions: 容器在事件生成时分配的主题和分区。
  • consumer: 对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,它可以在收到事件时调用resume()
  • paused: 容器当前是否处于暂停状态。有关详细信息,请参阅暂停和恢复侦听器容器

ConsumerResumedEventConsumerPausedEventConsumerStopping事件具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。
  • partitions:TopicPartition涉及的实例。

ConsumerPartitionResumedEvent,ConsumerPartitionPausedEvent事件具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。
  • partition:TopicPartition涉及的实例。

ConsumerStoppedEventConsumerFailedToStartEventConsumerStartingEventConsumerStartingEventContainerStoppedEvent事件具有以下属性:

  • source:发布事件的侦听器容器实例。
  • container:侦听器容器或父侦听器容器,如果源容器是子容器。

所有容器(无论是子容器还是父容器)都发布ContainerStoppedEvent. 对于父容器,源和容器属性是相同的。

此外,ConsumerStoppedEvent具有以下附加属性:

  • reason
    • NORMAL- 消费者正常停止(容器已停止)。
    • ERROR- 一个java.lang.Error被抛出。
    • FENCED- 事务生产者被围起来,stopContainerWhenFenced容器属性值是true.
    • AUTH- AuthenticationExceptionAuthorizationException被抛出并且authExceptionRetryInterval未配置。
    • NO_OFFSET- 分区没有偏移量,auto.offset.reset策略是none.

您可以在出现这种情况后使用此事件重新启动容器:

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者

虽然效率很高,但异步消费者的一个问题是检测它们何时空闲。如果在一段时间内没有消息到达,您可能需要采取一些措施。

您可以将侦听器容器配置为在一段时间没有消息传递时发布 ListenerContainerIdleEvent。当容器空闲时,每idleEventInterval毫秒发布一个事件。

要配置此功能,请在容器上设置idleEventInterval。以下示例显示了如何执行此操作:

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
    return container;
}

以下示例显示了如何在@KafkaListener设置idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器空闲时,每分钟发布一次事件。

如果由于某种原因,consumer的poll()方法没有退出,则不会收到任何消息并且无法生成空闲事件(这kafka-clients是无法访问代理的早期版本的问题)。在这种情况下,如果民意调查未在pollTimeout属性内返回3x,则容器会发布 aNonResponsiveConsumerEvent。默认情况下,此检查在每个容器中每 30 秒执行一次。您可以通过在配置侦听器容器时设置ContainerPropertiesmonitorInterval(默认 30 秒)和noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold应该大于1.0以避免由于竞争条件而出现虚假事件。接收到这样的事件可以让您停止容器,从而唤醒消费者以便它可以停止。

从 2.6.2 版本开始,如果容器发布了 ListenerContainerIdleEvent,它将在随后收到记录时发布 ListenerContainerNoLongerIdleEvent

事件消费

您可以通过实现ApplicationListener来捕获这些事件 ——一个普通的侦听器或一个缩小为仅接收此特定事件的侦听器。您也可以使用Spring Framework 4.2 中引入的@EventListener

下一个示例将@KafkaListener@EventListener组合成一个类。您应该了解应用程序侦听器获取所有容器的事件,因此如果您想根据哪个容器空闲采取特定操作,您可能需要检查侦听器 ID。您也可以将@EventListener condition用于此目的。

有关事件属性的信息,请参阅应用程序事件。

事件通常在消费者线程上发布,因此与Consumer对象交互是安全的。

以下示例同时使用@KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}

事件侦听器查看所有容器的事件。因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件的范围。由于为 @KafkaListener 创建的容器支持并发,因此实际容器被命名为 id-n,其中 n 是每个实例支持并发的唯一值。这就是我们在条件下使用startsWith的原因。

如果您希望使用空闲事件来停止侦听器容器,则不应调用container.stop()调用侦听器的线程。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,然后该线程可以停止容器。此外,如果容器实例是子容器,则不应stop()容器实例。您应该改为停止并发容器。

空闲时的当前位置

请注意,您可以通过在侦听器中实现ConsumerSeekAware检测到空闲时获取当前位置。请参阅寻求特定偏移量中的onIdleContainer()

4.1.7. 主题/分区初始偏移量

有几种方法可以设置分区的初始偏移量。

手动分配分区时,您可以在配置的TopicPartitionOffset参数中设置初始偏移量(如果需要)(请参阅消息侦听器容器)。您也可以随时寻找特定的偏移量。

当您使用代理分配分区的组管理时:

  • 对于一个新的group.id,初始偏移量由消费者属性auto.offset.resetearliestlatest)确定。
  • 对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。但是,您可以在初始化期间(或之后的任何时间)寻找特定的偏移量。

4.1.8.寻求特定的偏移量

为了寻找,你的监听器必须实现ConsumerSeekAware,它有以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

当容器启动和分配分区时调用registerSeekCallback。在初始化后的任意时间进行搜索时,您应该使用此回调。您应该保存对回调的引用。如果您在多个容器(或 ConcurrentMessageListenerContainer)中使用相同的侦听器,则应将回调存储在 ThreadLocal或其他由 listener Thread键入的结构中。

使用组管理时,在分配分区时调用onPartitionsAssigned。您可以使用此方法,例如,通过调用回调设置分区的初始偏移量。您还可以使用此方法将此线程的回调与分配的分区相关联(请参见下面的示例)。您必须使用回调参数,而不是传递给registerSeekCallback. 从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。

当容器停止或 Kafka 撤销分配时调用onPartitionsRevoked。您应该丢弃此线程的回调并删除与已撤销分区的任何关联。

回调有以下方法:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection=<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection=<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

seekRelative在 2.3 版中添加,用于执行相对搜索。

  • offset为负数和toCurrent false- 相对于分区末尾的查找。
  • offset为正和toCurrent false- 相对于分区开头的查找。
  • offset为负和toCurrent true- 相对于当前位置搜索(倒带)。
  • offset为正和toCurrent true- 相对于当前位置搜索(快进)。

这些seekToTimestamp方法也在 2.3 版中添加。

onIdleContaineronPartitionsAssigned方法中为多个分区寻找相同的时间戳时,首选第二种方法,因为在对消费者offsetsForTimes方法的一次调用中找到时间戳的偏移量更有效。当从其他位置调用时,容器将收集所有时间戳查找请求并调用offsetsForTimes.

您还可以在检测到空闲容器时执行onIdleContainer()查找操作。有关如何启用空闲容器检测的信息,请参阅检测空闲和非响应消费者。

接受集合的seekToBeginning方法很有用,例如,在处理压缩主题并且您希望每次启动应用程序时都从头开始:

public class MyListener implements ConsumerSeekAware {

...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

要在运行时任意查找,请使用来自适当线程的registerSeekCallback回调引用。

这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中点击<Enter>会导致所有分区搜索到开头。

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

为了让事情变得更简单,2.3 版添加了AbstractConsumerSeekAware类,它跟踪要用于主题/分区的回调。以下示例显示了如何在每次容器空闲时查找每个分区中处理的最后一条记录。它还具有允许任意外部调用通过一条记录来回退分区的方法。

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

2.6 版为抽象类添加了便利方法:

  • seekToBeginning()- 寻找所有分配的分区到开头
  • seekToEnd()- 寻找所有分配的分区到最后
  • seekToTimestamp(long time)- 将所有分配的分区查找到该时间戳表示的偏移量。

例子:

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
    }

}

4.1.9.并发监听器工厂

@KafkaListenerAnnotation中所讨论的, ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器。

从 2.2 版开始,您可以使用相同的工厂来创建任何ConcurrentMessageListenerContainer. 如果您想创建多个具有相似属性的容器,或者您希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能会很有用。创建容器后,您可以进一步修改其属性,其中许多属性是使用container.getContainerProperties(). 以下示例配置了一个ConcurrentMessageListenerContainer

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}

以这种方式创建的容器不会添加到端点注册表中。它们应该作为@Bean定义创建,以便在应用程序上下文中注册。

从版本 2.3.4 开始,您可以ContainerCustomizer在工厂中添加一个以在创建和配置每个容器后进一步配置它。

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

4.1.10.线程安全

使用并发消息侦听器容器时,会在所有消费者线程上调用单个侦听器实例。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使您的侦听器线程安全或添加同步会显着降低添加并发性的好处,您可以使用以下几种技术之一:

  • 使用 concurrency=1n 个容器和原型范围的 MessageListener bean,以便每个容器都有自己的实例(使用@KafkaListener时这是不可能的)。
  • ThreadLocal<?>实例中保留状态。
  • 将单例侦听器委托给在SimpleThreadScope(或类似范围)中声明的 bean。

为了便于清理线程状态(针对前面列表中的第二项和第三项),从 2.2 版本开始,监听器容器会在每个线程退出时发布一个ConsumerStoppedEvent。您可以使用ApplicationListener@EventListener方法使用这些事件,以从范围中删除ThreadLocal<?>实例或remove()线程范围的 bean。请注意,SimpleThreadScope不会销毁具有销毁接口(例如DisposableBean)的 bean,因此您应该通过destroy()销毁自己创建实例。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果您将多播器更改为使用异步执行器,则线程清理无效。

4.1.11.监控

监控监听器性能

从版本 2.3 开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则侦听器容器将自动为侦听器创建和更新 Micrometer Timer。可以通过将 ContainerProperty micrometerEnabled 设置为 false 来禁用计时器。

维护了两个计时器 - 一个用于成功调用侦听器,一个用于失败。

计时器被命名spring.kafka.listener并具有以下标签:

  • name:(容器 bean 名称)
  • resultsuccessfailure
  • exceptionnoneListenerExecutionFailedException

您可以使用ContainerProperties micrometerTags属性添加其他标签。

对于并发容器,为每个线程创建计时器,并且名称标签带有后缀 -n,其中 n 范围是0concurrency-1

监控 KafkaTemplate 性能

从版本 2.3 开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则侦听器容器将自动为侦听器创建和更新 Micrometer Timer。可以通过将 ContainerProperty micrometerEnabled 设置为 false 来禁用计时器。

维护了两个计时器 - 一个用于成功调用侦听器,一个用于失败。

计时器被命名spring.kafka.template并具有以下标签:

  • name: (模板 bean 名称)
  • resultsuccessfailure
  • exception:none或失败的异常类名

您可以使用模板的micrometerTags属性添加其他标签。

Micrometer Native Metrics

从 2.5 版开始,该框架提供Factory Listeners来在创建和关闭生产者和消费者时管理 Micrometer KafkaClientMetrics实例。

要启用此功能,只需将侦听器添加到您的生产者和消费者工厂:

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

传递给侦听器的消费者/生产者id被添加到带有spring.id标签名称的计量器标签中。

获取 Kafka 指标之一的示例

double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count()

StreamsBuilderFactoryBean提供了一个类似的侦听器。- 参见KafkaStreams Micrometer Support

4.1.12.事务

本节介绍 Spring for Apache Kafka 如何支持事务。

概述

0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式添加支持:

  • KafkaTransactionManager:与正常的 Spring 事务支持(@TransactionalTransactionTemplate)一起使用。
  • 事务性的KafkaMessageListenerContainer
  • KafkaTemplate的本地事务
  • 与其他事务管理器的事务同步

通过transactionIdPrefix提供DefaultKafkaProducerFactory 在这种情况下,工厂不会管理单个 shared Producer ,而是维护事务生产者的缓存。当用户调用close()生产者时,它会返回到缓存中以供重用,而不是实际关闭。每个生产者的transactional.id属性是transactionIdPrefix+ n,其中以每个新生产者n开始为0并递增,除非事务由具有基于记录的侦听器的侦听器容器启动。在 Spring for Apache Kafka 的早期版本中,对于由具有基于记录的侦听器的侦听器容器启动的事务,transactional.id的生成方式有所不同,以支持隔离僵尸,这不再是必要的,EOSMode.V2 是从 3.0 开始的唯一选项。对于运行多个实例的应用程序,每个实例的 transactionIdPrefix 必须是唯一的。

另请参阅transactionIdPrefix

使用 Spring Boot,只需要设置spring.kafka.producer.transaction-id-prefix属性 - Boot 会自动配置一个KafkaTransactionManagerbean 并将其连接到侦听器容器中。

从版本 2.5.8 开始,您现在可以在生产者工厂上配置maxAge属性。当使用可能为代理的 transactional.id.expiration.ms 闲置的事务生产者时,这非常有用。使用当前的kafka-clients,这可能会因为没有重新平衡导致 ProducerFencedException。通过将 maxAge设置为小于transactional.id.expiration.ms,如果生产者超过了它的最大年龄,工厂将刷新它。

使用KafkaTransactionManager

KafkaTransactionManager是 Spring Framework 的PlatformTransactionManager. 它在其构造函数中提供了对生产者工厂的引用。如果您提供自定义生产者工厂,它必须支持事务。见ProducerFactory.transactionCapable()

您可以将KafkaTransactionManager与正常的 Spring 事务支持一起使用(@TransactionalTransactionTemplate和其他)。如果事务处于活动状态,则在事务范围内执行的任何KafkaTemplate操作都使用事务的Producer. 管理器根据成功或失败提交或回滚事务。您必须将其配置KafkaTemplate为使用与ProducerFactory事务管理器相同的内容。

事务同步

本节涉及仅生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用消费者发起的事务。

如果你想将记录发送到 kafka 并执行一些数据库更新,你可以使用普通的 Spring 事务管理,比如DataSourceTransactionManager.

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional注释的拦截器启动事务,KafkaTemplate并将事务与该事务管理器同步;每个发送都将参与该事务。当方法退出时,数据库事务将提交,然后是 Kafka 事务。如果您希望以相反的顺序执行提交(首先是 Kafka),请使用嵌套@Transactional方法,外部方法配置为使用DataSourceTransactionManager.,内部方法配置为使用KafkaTransactionManager.

有关在 Kafka-first 或 DB-first 配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅Kafka Transactions with Other Transaction Managers示例。

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务的提交失败(在主事务提交之后),将向调用者抛出异常。以前,这被默默地忽略(在调试时记录)。如有必要,应用程序应采取补救措施,以补偿已提交的主要事务。

使用消费者发起的事务

从2.7 版开始,ChainedKafkaTransactionManager现在已弃用;有关更多ChainedTransactionManager信息,请参阅其超类的 javadocs 。相反,在容器中使用KafkaTransactionManager来启动 Kafka 事务,并用@Transactional注释侦听器方法来启动另一个事务。

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例。

KafkaTemplate本地事务

您可以使用KafkaTemplate来在本地事务中执行一系列操作。以下示例显示了如何执行此操作:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身 ( this)。如果回调正常退出,则事务被提交。如果抛出异常,事务将回滚。

如果正在处理(或同步)KafkaTransactionManager事务,则不使用它。相反,使用了一个新的“嵌套”事务。

transactionIdPrefix

使用唯一支持的模式 EOSMode.V2(又名 BETA),即使对于消费者发起的交易,也不再需要使用相同的 transactional.id;事实上,它在每个实例上都必须是唯一的,就像生产者发起的事务一样。该属性在每个应用程序实例上必须具有不同的值。

KafkaTemplate事务性和非事务性发布

通常,当KafkaTemplate是事务性的(配置了支持事务的生产者工厂)时,就需要事务。当使用 KafkaTransactionManager 配置时,事务可以通过 TransactionTemplate@Transactional 方法、调用executeInTransaction 或侦听器容器来启动。任何在事务范围之外使用模板的尝试都会导致模板抛出 IllegalStateException。从版本2.4.3开始,您可以将模板的allowNonTransactional属性设置为true。在这种情况下,模板将通过调用 ProducerFactoryreateNonTransactionalProducer() 方法,允许操作在没有事务的情况下运行;生产者将被缓存或线程绑定,以正常重用。请参阅使用 DefaultKafkaProducerFactory

与批处理侦听器的事务

当正在使用事务时侦听器失败时,会在回滚发生后调用 AfterRollbackProcessor以采取一些行动。当使用带有记录侦听器的默认 AfterRollbackProcessor 时,会执行查找,以便重新传送失败的记录。但是,使用批处理侦听器,将重新传递整个批处理,因为框架不知道批处理中的哪条记录失败。有关详细信息,请参阅回滚后处理器

当使用批处理侦听器时,2.4.2 版本引入了一种替代机制来处理批处理时的故障; BatchToRecordAdapter。当使用 BatchToRecordAdapter 配置将 batchListener 设置为 true 的容器工厂时,一次会调用一条记录的侦听器。这使得能够在批次内进行错误处理,同时仍然可以根据异常类型停止处理整个批次。提供了默认的 BatchToRecordAdapter,可以使用标准 ConsumerRecordRecoverer(例如 DeadLetterPublishingRecoverer)进行配置。以下测试用例配置片段说明了如何使用此功能:

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}

4.1.13.Exactly Once Semantics (EOS)

您可以为侦听器容器提供 KafkaAwareTransactionManager 实例。如此配置后,容器会在调用侦听器之前启动事务。侦听器执行的任何 KafkaTemplate 操作都会参与事务。如果侦听器成功处理该记录(或多个记录,当使用 BatchMessageListener 时),则在事务管理器提交事务之前,容器会使用 Producer.sendOffsetsToTransaction()) 将偏移量发送到事务。如果侦听器抛出异常,则事务将回滚,并且使用者将重新定位,以便可以在下一次轮询时检索回滚的记录。有关详细信息以及处理重复失败的记录的信息,请参阅回滚后处理器。

使用事务启用 Exactly Once Semantics (EOS)。

这意味着,对于一个read→process-write序列,可以保证该序列恰好完成一次。(读取和处理至少有一次语义)。

Spring for Apache Kafka 3.0 及更高版本仅支持 EOSMode.V2:

  • V2- 又名 fetch-offset-request fencing (从 2.5 版开始)

使用 mode V2,不必为每个生产者指定生产者的group.id/topic/partition,因为消费者元数据与事务的偏移量一起发送,并且代理可以确定生产者是否使用该信息被隔离。

有关详细信息,请参阅KIP-447

V2以前是ALPHABETA; 它们已被更改以使框架与KIP-732保持一致。

4.1.14.将 Spring Bean 连接到生产者/消费者拦截器中

Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 管理,而不是由 Spring 管理,因此普通的 Spring 依赖注入不适用于依赖 Spring Bean 中的布线。但是,您可以使用拦截器的config()方法手动连接这些依赖项。下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来做到这一点。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
        Map<String, Object> consumerProperties = new HashMap<>();
        // consumerProperties.put(..., ...)
        // ...
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        consumerProperties.put("some.bean", someBean);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
        Map<String, Object> producerProperties = new HashMap<>();
        // producerProperties.put(..., ...)
        // ...
        Map<String, Object> producerProperties = properties.buildProducerProperties();
        producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
        producerProperties.put("some.bean", someBean);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
        return factory;
    }

    @Bean
    public SomeBean someBean() {
        return new SomeBean();
    }

    @KafkaListener(id = "kgk897", topics = "kgh897")
    public void listen(String in) {
        System.out.println("Received " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("kgh897", "test");
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kgh897")
            .partitions(1)
            .replicas(1)
            .build();
    }

}
public class SomeBean {

    public void someMethod(String what) {
        System.out.println(what + " in my foo bean");
    }

}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.bean.someMethod("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

}

结果:

producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test

4.1.15.暂停和恢复侦听器容器

版本 2.1.3 添加了侦听器容器的方法pause()resume()。以前,您可以在ConsumerAwareMessageListener中暂停消费者并通过侦听ListenerContainerIdleEvent 来恢复它,这提供了对Consumer对象的访问。虽然您可以使用事件侦听器在空闲容器中暂停消费者,但在某些情况下,这不是线程安全的,因为无法保证在消费者线程上调用事件侦听器。为了安全地暂停和恢复消费者,您应该在侦听器容器上使用pauseresume方法。pause()在下一个poll()之前生效;resume()在当前poll()返回后立即生效。当一个容器暂停时,它会继续poll()消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。有关更多信息,请参阅 Kafka 文档。

从 2.1.5 版本开始,可以调用isPauseRequested()查看是否pause()已调用。但是,消费者可能还没有真正暂停。 如果所有Consumer实例实际上都已暂停,isConsumerPaused()则返回 true 。

此外(也是从 2.1.5 开始),ConsumerPausedEvent并且ConsumerResumedEvent实例以容器作为source属性发布,并且TopicPartition实例包含在partitions属性中。

从 2.9 版本开始,新的容器属性pauseImmediate设置为 true 时,会在处理当前记录后使暂停生效。默认情况下,当上一次轮询中的所有记录都已处理完毕时,暂停生效。请参阅[pauseImmediate]

以下简单的 Spring Boot 应用程序演示了使用容器注册表获取对@KafkaListener方法容器的引用并暂停或恢复其使用者以及接收相应的事件:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下清单显示了前面示例的结果:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2

4.1.16.暂停和恢复侦听器容器上的分区

从 2.7 版开始,您可以使用侦听器容器中的pausePartition(TopicPartition topicPartition)resumePartition(TopicPartition topicPartition)方法暂停和恢复分配给该使用者的特定分区的使用。暂停和恢复分别发生在与pause()resume()方法类似的poll()前后。如果已请求该分区的暂停,则isPartitionPauseRequested()方法返回 true。如果该分区已有效暂停,则isPartitionPaused()方法返回 true。

此外,由于 2.7 版ConsumerPartitionPausedEventConsumerPartitionResumedEvent实例是使用容器作为source属性和TopicPartition实例发布的。

0

评论区