欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

【译文】spring kafka简介及使用参考(四)
(last modified Jul 22, 2023, 1:52 AM )
by
侧边栏壁纸
  • 累计撰写 176 篇文章
  • 累计创建 61 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

【译文】spring kafka简介及使用参考(四)

橙序员
2022-08-21 / 0 评论 / 0 点赞 / 1,280 阅读 / 13,303 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

4.2. 非阻塞重试这是一项实验性功能,在删除实验性名称之前,不破坏 API 更改的常规规则不适用于此功能。鼓励用户尝试该功能并通过 GitHub 问题或 GitHub 讨论提供反馈。这仅与 API 有关;该功能被认为是完整且健壮的。2.9 版更改了引导基础设施 bean 的机制;请参阅配置以了解现

4.2. 非阻塞重试

这是一项实验性功能,在删除实验性名称之前,不破坏 API 更改的常规规则不适用于此功能。鼓励用户尝试该功能并通过 GitHub 问题或 GitHub 讨论提供反馈。这仅与 API 有关;该功能被认为是完整且健壮的。

2.9 版更改了引导基础设施 bean 的机制;请参阅配置以了解现在引导该功能所需的两种机制。

在这些更改之后,我们打算删除实验性名称,可能在 3.0 版中。

使用 Kafka 实现非阻塞重试/dlt 功能通常需要设置额外的topic并创建和配置相应的侦听器。从 2.7 Spring for Apache Kafka 开始,通过注解@RetryableTopicRetryTopicConfiguration类提供支持,以简化引导。

4.2.1. 此模式是如何工作的

如果消息处理失败,则将消息转发到带有退避时间戳的重试topic。重试topic的消费者然后检查时间戳,如果它没有到期,它会暂停该topic分区的消费。到期时恢复分区消费,再次消费消息。如果消息处理再次失败,消息将被转发到下一个重试topic,并重复该模式,直到成功处理,或者尝试用尽,然后将消息发送到Dead Letter Topic(如果已配置)。

为了说明,如果您有一个“主topic”的topic,并且想要设置非阻塞重试,指数退避为 1000 毫秒,最大尝试次数为 2 次和 4 次,它将创建主topic重试 1000, main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt topic并配置各自的消费者。该框架还负责创建topic以及设置和配置侦听器。

通过使用此策略,您将失去 Kafka 对该topic的排序保证。

您可以设置AckMode为您喜欢的模式,但建议使用RECORD

目前此功能不支持类级别@KafkaListener注释

4.2.2. 回退延迟精度

概述和保证

所有消息处理和回退都由消费者线程处理,因此,延迟精度在最大努力的基础上得到保证。如果一条消息的处理时间比该消费者的下一条消息的回退时间长,则下一条消息的延迟将高于预期。此外,对于短暂的延迟(大约 1 秒或更短),线程必须做的维护工作,例如提交偏移量,可能会延迟消息处理的执行。如果重试topic的消费者处理多个分区,精度也会受到影响,因为我们依靠从轮询中唤醒消费者并拥有完整的 pollTimeouts 来进行时间调整。

话虽如此,对于处理单个分区的消费者来说,在大多数情况下,消息的处理应该大约在其确切的到期时间发生。

保证在到期时间之前永远不会处理消息。

4.2.3. 配置

从 2.9 版开始,对于默认配置,@EnableKafkaRetryTopic注解应在@Configuration注解类中使用。这使该功能能够正确引导,并允许注入一些功能组件以在运行时查找。

如果您添加此注释,则不必同时添加@EnableKafka,因为@EnableKafkaRetryTopic使用@EnableKafka 进行元注释。

此外,从该版本开始,对于功能组件和全局功能的更高级配置,RetryTopicConfigurationSupport应在类中扩展@Configuration该类,并覆盖适当的方法。有关更多详细信息,请参阅配置全局设置和功能

上述技术只能使用其中一种,并且只有一个@Configuration类可以扩展RetryTopicConfigurationSupport

使用@RetryableTopic注释

要为带@KafkaListener注释的方法配置重试topic和 dlt,您只需向其中添加@RetryableTopic注释,Spring for Apache Kafka 将使用默认配置引导所有必要的topic和使用者。

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
        // ... message processing
}

您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用注解对其进行@DltHandler注解。如果没有提供 DltHandler 方法,则会创建一个仅记录消费的默认消费者。

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}

如果您不指定 kafkaTemplate 名称,则会查找具有defaultRetryTopicKafkaTemplate名称的 bean。如果没有找到 bean,则抛出异常。

使用RetryTopicConfigurationbean

您还可以通过在带@Configuration注释的类中创建RetryTopicConfigurationbean 来配置非阻塞重试支持。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将使用默认配置为使用“@KafkaListener”注释的方法中的所有topic创建重试topic和 dlt,以及相应的消费者。KafkaTemplate消息转发需要实例。

为了对如何处理每个topic的非阻塞重试实现更细粒度的控制,RetryTopicConfiguration可以提供多个 bean。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(5)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}

重试topic和 dlt 的消费者将被分配到一个消费者组,其组 id 是您在@KafkaListener注释groupId参数中提供的一个与topic后缀的组合。如果您不提供任何内容,它们都将属于同一个组,并且在重试topic上重新平衡会导致对主要topic进行不必要的重新平衡。

如果消费者配置了 ErrorHandlingDeserializer,为了处理反序列化异常,那么为 KafkaTemplate 及其生产者配置一个序列化器非常重要,该序列化器可以处理普通对象以及由反序列化异常导致的原始 byte[] 值。模板的通用值类型应该是Object。一种技术是使用 DelegatingByTypeSerializer;示例如下:

@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

多个@KafkaListener注释可以用于同一个topic,有或没有手动分区分配以及非阻塞重试,但只有一个配置将用于给定topic。最好使用单个RetryTopicConfigurationbean 来配置此类topic;如果多个@RetryableTopic注解用于同一topic,则它们都应具有相同的值,否则其中一个将应用于该topic的所有侦听器,而其他注解的值将被忽略。

配置全局设置和功能

从 2.9 开始,以前用于配置组件的 bean 覆盖方法已被删除(由于上述 API 的实验性质,没有弃用)。这不会改变RetryTopicConfigurationbean 方法 - 只是基础设施组件的配置。现在RetryTopicConfigurationSupport该类应该在(单个)@Configuration类中扩展,并重写正确的方法。一个例子如下:

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}

使用此配置方法时,@EnableKafkaRetryTopic不应使用注释来防止由于重复的 bean 导致上下文无法启动。请改用简单的@EnableKafka注释。

4.2.4. 特征

大多数功能都可用于@RetryableTopic注释和RetryTopicConfigurationbean。

退避配置

BackOff 配置依赖于Spring Retry项目中的BackOffPolicy接口。

这包括:

  • 固定后退
  • 指数回退
  • 随机指数回退
  • 均匀随机退避
  • 无后退
  • 自定义后退
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(4)
            .build();
}

您还可以提供 Spring RetrySleepingBackOffPolicy接口的自定义实现:

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .build();
}

默认退避策略FixedBackOffPolicy最多尝试 3 次,间隔为 1000 毫秒。

有 30 秒的默认最大延迟ExponentialBackOffPolicy。如果您的退避策略需要大于该值的延迟,请相应地调整 maxDelay 属性。

第一次尝试计入maxAttempts,因此如果您提供maxAttempts4 的值,则原始尝试加上 3 次重试。

单个topic固定延迟重试

如果您使用固定延迟策略,例如FixedBackOffPolicy或者NoBackOffPolicy.您可以使用单个topic来完成非阻塞重试。该topic将以提供的或默认的后缀作为后缀,并且不会附加索引或延迟值。

@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .build();
}

默认行为是为每次尝试创建单独的重试topic,并附加它们的索引值:retry-0, retry-1, …

全局超时

您可以为重试过程设置全局超时。如果达到该时间,则下次消费者抛出异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则结束处理。

@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2000)
            .timeoutAfter(5000)
            .build();
}

默认是没有设置超时,也可以通过提供 -1 作为超时值来实现。

异常分类器

您可以指定要重试和不重试的异常。也可以设置为遍历原因查找嵌套异常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}

默认行为是重试所有异常,而不是遍历原因。

从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录被发送到 DLT 而无需任何重试。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过重写扩展 RetryTopicConfigurationSupport @Configuration 类中的configureNonBlockingRetries方法,向此列表添加或删除异常。有关详细信息,请参阅配置全局设置和功能。

@Override
protected void manageNonBlockingRetriesFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}

要禁用致命异常的分类,只需清除提供的列表。

包括和排除topic

您可以通过 .includeTopic(String topic)、.includeTopics(Collection topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection topics) 方法决定RetryTopicConfiguration bean 将处理和不处理哪些topic。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}

默认行为是包含所有topic。

topic自动创建

除非另有指定,否则框架将使用 KafkaAdmin bean 使用的 NewTopic bean 自动创建所需的主题。您可以指定创建主题时使用的分区数量和复制因子,并且可以关闭此功能。从版本 3.0 开始,默认复制因子为 -1,这意味着使用代理默认值。如果您的代理版本早于 2.4,您将需要设置一个显式值。

请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。

@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}

默认情况下,topic是使用一个分区和一个复制因子自动创建的。

失败标头管理

在考虑如何管理失败标头(原始标头和异常标头)时,框架委托给DeadLetterPublishingRecover决定是追加还是替换标头。

默认情况下,它显式将appendOriginalHeaders 设置为false,并将stripPreviousExceptionHeaders 保留为DeadLetterPublishingRecover 使用的默认值。

这意味着只有第一个“原始”和最后一个异常标头保留在默认配置中。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。

有关详细信息,请参阅管理死信记录标头

要重新配置框架以对这些属性使用不同的设置,请通过重写扩展RetryTopicConfigurationSupport的 @Configuration 类中的 configureCustomizers 方法来配置 DeadLetterPublishingRecoverer 定制器。有关更多详细信息,请参阅配置全局设置和功能。

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

从 2.8.4 版本开始,如果你想添加自定义 headers(除了工厂添加的重试信息 headers,你可以添加一个headersFunction到工厂 -factory.setHeadersFunction((rec, ex) → { … })

4.2.5. 结合阻塞和非阻塞重试

从 2.8.4 开始,您可以将框架配置为结合使用阻塞和非阻塞重试。例如,您可以有一组异常,这些异常也可能会在下一条记录上触发错误,例如DatabaseAccessException,因此您可以在将同一记录发送到重试topic或直接发送到 DLT 之前重试几次。

要配置阻止重试,请重写扩展 RetryTopicConfigurationSupport 的 @Configuration 类中的 configureBlockingRetries 方法,并添加要重试的异常以及要使用的 BackOff。默认 BackOff 是固定的 BackOff,无延迟且尝试 9 次。有关详细信息,请参阅配置全局设置和功能。

@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
    blockingRetries
            .retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
            .backOff(new FixedBackOff(3000, 5));
}

结合全局可重试topic的致命异常分类,您可以为您想要的任何行为配置框架,例如让某些异常同时触发阻塞和非阻塞重试,仅触发一种或另一种,或者直接访问DLT 无需任何形式的重试。

这是两个配置一起工作的示例:

@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
    blockingRetries
            .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
            .backOff(new FixedBackOff(50, 3));
}

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
}

在这个例子中:

  • ShouldRetryOnlyBlockingException.class只会通过阻塞重试,如果所有重试都失败,将直接进入 DLT。
  • ShouldRetryViaBothException.class将通过阻塞重试,如果所有阻塞重试失败将被转发到下一个重试topic以进行另一组尝试。
  • ShouldSkipBothRetriesException.class永远不会以任何方式重试,如果第一次处理尝试失败,将直接进入 DLT。

请注意,阻止重试行为是允许列表 - 您添加您确实希望以这种方式重试的异常;虽然非阻塞重试分类针对 FATAL 异常,因此是拒绝列表 - 您添加了您不想进行非阻塞重试的异常,而是直接发送到 DLT。

非阻塞异常分类行为还取决于特定topic的配置。

4.2.6. topic命名

重试topic和 DLT 通过使用提供的或默认值作为主topic的后缀来命名,并附加该topic的延迟或索引。

例子:

“my-topic” → “my-topic-retry-0”, “my-topic-retry-1”, …, “my-topic-dlt”

“my-other-topic” → “my-topic-myRetrySuffix-1000”, “my-topic-myRetrySuffix-2000”, …, “my-topic-myDltSuffix”.

重试topic和 Dlt 后缀

您可以指定 retry 和 dlt topic将使用的后缀。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}

默认后缀是“-retry”和“-dlt”,分别用于重试topic和dlt。

附加topic的索引或延迟

您可以在后缀之后附加topic的索引或延迟值。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }

默认行为是以延迟值作为后缀,但具有多个topic的固定延迟配置除外,在这种情况下,topic以topic索引为后缀。

自定义命名策略

更复杂的命名策略可以通过注册一个实现RetryTopicNamesProviderFactory. 默认实现是SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现:

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

例如,以下实现除了标准后缀外,还为 retry/dl topic名称添加前缀:

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if(properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}

4.2.7. DLT 策略

该框架提供了一些使用 DLT 的策略。您可以提供一种 DLT 处理方法,使用默认的日志记录方法,或者根本没有 DLT。您还可以选择如果 DLT 处理失败会发生什么。

Dlt处理方法

您可以指定用于处理该topic的 Dlt 的方法,以及该处理失败时的行为。

为此,您可以在带有@DltHandler注释的类的方法中使用@RetryableTopic注释。请注意,该类中的所有带@RetryableTopic注释的方法都将使用相同的方法。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}

DLT 处理程序方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 bean 名称和方法名称作为参数传递。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltProcessor("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
       // ... message processing, persistence, etc
    }
}

如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。

从版本 2.8 开始,如果您根本不想在此应用程序中从 DLT 消费,包括通过默认处理程序(或者您希望推迟消费),您可以控制 DLT 容器是否启动,独立于集装箱厂的autoStartup财产。

使用@RetryableTopic注解时,将autoStartDltHandler属性设置为false; 使用配置生成器时,请使用.autoStartDltHandler(false).

您可以稍后通过KafkaListenerEndpointRegistry.

DLT 失败行为

如果 DLT 处理失败,有两种可能的行为可用:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR.

在前者中,记录被转发回 DLT topic,因此它不会阻止其他 DLT 记录的处理。在后者中,消费者结束执行而不转发消息。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}

默认行为是ALWAYS_RETRY_ON_ERROR

从版本 2.8.3 开始,如果记录导致抛出致命异常(例如DeserializationException ,因为通常总是会抛出此类异常) ,ALWAYS_RETRY_ON_ERROR则不会将记录路由回 DLT 。

被认为是致命的例外是:

  • DeserializationException
  • MessageConversionException
  • ConversionException
  • MethodArgumentResolutionException
  • NoSuchMethodException
  • ClassCastException

您可以使用DestinationTopicResolver bean上的方法向该列表中添加和删除异常。

有关详细信息,请参阅异常分类器。

配置无 DLT

该框架还提供了不为topic配置 DLT 的可能性。在这种情况下,在重试用尽之后,处理简单地结束。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}

4.2.8. 指定 ListenerContainerFactory

默认情况下,RetryTopic 配置将使用@KafkaListener注解中提供的工厂,但您可以指定不同的工厂来创建重试topic和 dlt 侦听器容器。

对于@RetryableTopic注解,您可以提供工厂的 bean 名称,使用RetryTopicConfigurationbean 您可以提供 bean 名称或实例本身。

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
        // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
        ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {

    return RetryTopicConfigurationBuilder
            .newInstance()
            .listenerFactory(factory)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .listenerFactory("my-retry-topic-factory")
            .create(template);
}

从 2.8.3 开始,您可以对可重试和不可重试topic使用相同的工厂。

如果您需要将工厂配置行为恢复到 2.8.3 之前的版本,您可以重写 @Configuration 类的 configureRetryTopicConfigurer 方法,该方法扩展了 RetryTopicConfigurationSupport,如配置全局设置和功能中所述,并将 useLegacyFactoryConfigurer 设置为 true,例如:

@Override
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
    return rtc -> rtc.useLegacyFactoryConfigurer(true);
}

4.2.9. 在运行时访问topic信息

DestinationTopicContainer从 2.9 开始,您可以通过注入提供的bean在运行时访问有关topic链的信息。该接口提供了在链中查找下一个topic的方法,或者如果配置了某个topic的 DLT,还提供了有用的属性,例如topic的名称、延迟和类型。

作为一个真实的用例示例,您可以使用此类信息,以便控制台应用程序可以在处理失败的原因(例如错误/不一致状态)后,将记录从 DLT 重新发送到链中的第一个重试topic解决。

DestinationTopic方法提供的 DestinationTopicContainer#getNextDestinationTopicFor()对应于输入topic链中注册的下一个topic。由于异常分类、尝试次数或单topic固定延迟策略等不同因素,消息将被转发到的实际topic可能会有所不同。如果您需要权衡这些因素, 请使用DestinationTopicResolver接口。

4.2.10. 更改 KafkaBackOffException 日志记录级别

当重试主题中的消息未到期时,会抛出 KafkaBackOffException。默认情况下,此类异常会在 DEBUG 级别记录,但您可以通过在@Configuration 类的 ListenerContainerFactoryConfigurer 中设置错误处理程序自定义程序来更改此行为。

例如,要将日志记录级别更改为 WARN,您可以添加:

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeErrorHandler(defaultErrorHandler ->
            defaultErrorHandler.setLogLevel(KafkaException.Level.WARN))
}

4.3. Apache Kafka 流支持

从 1.1.4 版开始,Spring for Apache Kafka 为Kafka Streams提供一流的支持。要从 Spring 应用程序中使用它,kafka-streamsjar 必须存在于类路径中。它是 Spring for Apache Kafka 项目的可选依赖项,不会传递下载。

4.3.1. 基本

参考 Apache Kafka Streams 文档建议使用以下 API 方式:

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件:

  • StreamsBuilder:使用 API 来构建KStream(或KTable)实例。
  • KafkaStreams:管理这些实例的生命周期。

由单个StreamsBuilder暴露给 KafkaStreams 实例的所有KStream实例都会同时启动和停止,即使它们具有不同的逻辑。换句话说,StreamsBuilder 定义的所有流都与单个生命周期控件绑定。一旦KafkaStreams实例被streams.close()`关闭,它就无法重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。

4.3.2. spring的管理

为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean. 这是一个AbstractFactoryBeanStreamsBuilder单例实例公开为 bean 的实现。以下示例创建了这样一个 bean:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

从 2.2 版开始,流配置现在以KafkaStreamsConfiguration对象而不是StreamsConfig.

StreamsBuilderFactoryBean 还实现了 SmartLifecycle 来管理内部 KafkaStreams 实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams 之前定义 KStream 实例。这也适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean 上使用默认 autoStartup = true 时,您必须在刷新应用程序上下文之前在 StreamsBuilder 上声明 KStream 实例。例如,KStream 可以是常规 bean 定义,而使用 Kafka Streams API 不会产生任何影响。以下示例展示了如何执行此操作:

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手动控制生命周期(例如,根据某些条件停止和启动),可以使用工厂 bean (&) 前缀直接引用 StreamsBuilderFactoryBean bean。由于 StreamsBuilderFactoryBean 使用其内部 KafkaStreams 实例,因此可以安全地停止并再次重新启动它。每次 start() 时都会创建一个新的 KafkaStreams。如果您想单独控制 KStream 实例的生命周期,您也可以考虑使用不同的 StreamsBuilderFactoryBean 实例。

您还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListener、Thread.UncaughtExceptionHandler 和 StateRestoreListener 选项,这些选项将委托给内部 KafkaStreams 实例。此外,除了在 StreamsBuilderFactoryBean 上间接设置这些选项之外,从版本 2.1.5 开始,您还可以使用 KafkaStreamsCustomizer 回调接口来配置内部 KafkaStreams 实例。请注意,KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。如果您需要直接执行一些 KafkaStreams 操作,您可以使用 StreamsBuilderFactoryBean.getKafkaStreams() 访问内部 KafkaStreams 实例。您可以按类型自动装配 StreamsBuilderFactoryBean bean,但应确保在 bean 定义中使用完整类型,如以下示例所示:

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果您使用接口 bean 定义,则可以按名称添加@Qualifier注入。以下示例显示了如何执行此操作:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从版本 2.4.1 开始,工厂 bean 有一个新的属性基础设施定制器,类型为 KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义 StreamsBuilder(例如添加状态存储)和/或拓扑。

public interface KafkaStreamsInfrastructureCustomizer {

  void configureBuilder(StreamsBuilder builder);

  void configureTopology(Topology topology);

}

提供了默认的无操作实现以避免在不需要时必须实现这两种方法。

提供了一个CompositeKafkaStreamsInfrastructureCustomizer,用于当您需要应用多个定制器时。

4.3.3. KafkaStreams 千分尺支持

在 2.5.3 版本中引入,您可以配置 KafkaStreamsMicrometerListener 来自动为工厂 bean 管理的 KafkaStreams 对象注册 micrometer 计量表:

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

4.3.4. 流中的 JSON 序列化和反序列化

为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了使用 JSON 的 JsonSerde 实现,委托给序列化、反序列化和消息转换中描述的 JsonSerializer 和 JsonDeserializer。 JsonSerde 实现通过其构造函数(目标类型或 ObjectMapper)提供相同的配置选项。在以下示例中,我们使用 JsonSerde 序列化和反序列化 Kafka 流的 Cat 有效负载(只要需要实例,就可以以类似的方式使用 JsonSerde):

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

在以编程方式构建序列化器/反序列化器以供生产者/消费者工厂使用时,从 2.3 版本开始,您可以使用 fluent API,它简化了配置。

stream.through(new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

4.3.5. 使用KafkaStreamBrancher

KafkaStreamBrancher 类引入了一种在 KStream 之上构建条件分支的更方便的方法。

考虑以下不使用的示例KafkaStreamBrancher

KStream<String, String>[] branches = builder.stream("source").branch(
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
     );
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   //default branch should not necessarily be defined in the end of the chain!
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
   //onTopOf method returns the provided stream so we can continue with method chaining

4.3.6. 配置

要配置 Kafka Streams 环境,StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。有关所有可能的选项,请参阅 Apache Kafka文档

从 2.2 版开始,流配置现在作为KafkaStreamsConfiguration对象提供,而不是作为StreamsConfig.

为了避免大多数情况下的样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注释,您应该将其放置在 @Configuration 类上。您所需要做的就是声明一个名为 defaultKafkaStreamsConfig 的 KafkaStreamsConfiguration bean。名为 defaultKafkaStreamsBuilder 的 StreamsBuilderFactoryBean bean 会在应用程序上下文中自动声明。您还可以声明和使用任何其他 StreamsBuilderFactoryBean bean。您可以通过提供实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来执行该 bean 的其他自定义。如果有多个这样的 bean,将根据其 Ordered.order 属性应用它们。

默认情况下,当工厂 bean 停止时,会调用 KafkaStreams.cleanUp() 方法。从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用 CleanupConfig 对象,该对象具有可让您控制是否在 start() 或 stop() 期间调用 cleanUp() 方法或两者都不调用的属性。从版本 2.7 开始,默认设置是从不清理本地状态。

4.3.7. HeaderEnricher

2.3 版添加HeaderEnricherTransformer. 这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式求值的根对象有 3 个属性:

  • context- ProcessorContext,允许访问当前记录元数据
  • key- 当前记录的键
  • value- 当前记录的值

表达式必须返回 byte[]String(将转换为UTF-8类型的byte[] )。

要在流中使用扩充器:

.transform(() -> enricher)

变压器不改变keyvalue;它只是添加标题。

如果您的流是多线程的,则每条记录都需要一个新实例。

.transform(() -> new HeaderEnricher<..., ...>(expressionMap))

这是一个简单的示例,添加一个文字标题和一个变量:

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .transform(() -> enricher)
        .to(OUTPUT);

4.3.8. MessagingTransformer

版本 2.3 添加了MessagingTransformer这允许 Kafka Streams 拓扑与 Spring 消息传递组件交互,例如 Spring 集成流。变压器需要实现MessagingFunction.

@FunctionalInterface
public interface MessagingFunction {

  Message<?> exchange(Message<?> message);

}

Spring Integration 使用它的GatewayProxyFactoryBean. 它还需要MessagingMessageConverter将键、值和元数据(包括标头)与 Spring Messaging 相互转换Message<?>。有关更多信息,请参阅从 a 调用 Spring 集成流KStream

4.3.9.从反序列化异常中恢复

版本 2.3 引入了RecoveringDeserializationExceptionHandler当发生反序列化异常时可以采取一些措施。请参阅 Kafka 文档DeserializationExceptionHandler,其中RecoveringDeserializationExceptionHandler是一个实现。配置RecoveringDeserializationExceptionHandler了一个ConsumerRecordRecoverer实现。该框架提供了DeadLetterPublishingRecoverer将失败记录发送到死信topic的方法。有关此恢复器的更多信息,请参阅发布死信记录

要配置恢复器,请将以下属性添加到您的流配置中:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer()bean 可以是你自己的实现ConsumerRecordRecoverer

4.3.10. Kafka 流示例

以下示例结合了我们在本章中介绍的所有topic:

@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
                .reduce((String value1, String value2) -> value1 + value2,
                    Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}

4.4. 测试应用程序

spring-kafka-testjar 包含一些有用的实用程序来帮助测试您的应用程序。

4.4.1。KafkaTestUtils

o.s.kafka.test.utils.KafkaTestUtils提供了许多静态辅助方法来使用记录、检索各种记录偏移量等。有关完整的详细信息,请参阅其Javadocs

4.4.2. JUnit

o.s.kafka.test.utils.KafkaTestUtils还提供了一些静态方法来设置生产者和消费者属性。以下清单显示了这些方法签名:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从 2.5 版开始,该consumerProps方法ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest. 这是因为,在大多数情况下,您希望消费者使用在测试用例中发送的任何消息。ConsumerConfig默认值latest意味着在消费者开始之前已经由测试发送的消息将不会收到这些记录。要恢复到以前的行为,请latest在调用方法后将属性设置为。使用嵌入式代理时,通常最好的做法是为每个测试使用不同的topic,以防止串扰。如果由于某种原因无法做到这一点,请注意该consumeFromEmbeddedTopics方法的默认行为是在分配后将分配的分区搜索到开头。由于它无法访问消费者属性,因此您必须使用带有seekToEnd布尔参数的重载方法来寻找结尾而不是开头。

提供了一个 JUnit 4@Rule包装器EmbeddedKafkaBroker来创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。(有关使用JUnit 5的信息,请参阅@EmbeddedKafka 注释)。@EmbeddedKafka以下清单显示了这些方法的签名:

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

该类EmbeddedKafkaBroker有一个实用方法,可让您使用它创建的所有topic。以下示例显示了如何使用它:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
        consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils一些实用方法可以从消费者那里获取结果。以下清单显示了这些方法签名:

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

下面的例子展示了如何使用KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由 启动时EmbeddedKafkaBroker,名为的系统属性spring.embedded.kafka.brokers设置为 Kafka 代理的地址,并且名为的系统属性spring.embedded.zookeeper.connect设置为 Zookeeper 的地址。为此属性提供了方便的常量 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和)。EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT

使用EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供额外的属性。有关可能的代理属性的更多信息,请参阅Kafka 配置

4.4.3. 配置topic

以下示例配置创建名为catand的topic,hat其中包含 5 个分区,一个名为thing110 个分区的topic,以及一个名为thing215 个分区的topic:

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
      }

}

默认情况下,addTopics当出现问题时会抛出异常(例如添加一个已经存在的topic)。2.6 版添加了该方法的新版本,该方法返回Map<String, Exception>; 键是topic名称,值是null成功或Exception失败。

4.4.4. 对多个测试类使用相同的代理

这样做没有内置支持,但您可以将相同的代理用于多个测试类,类似于以下内容:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假设一个 Spring Boot 环境,并且嵌入式代理替换了引导服务器属性。

然后,在每个测试类中,您可以使用类似于以下内容的内容:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您不使用 Spring Boot,则可以使用broker.getBrokersAsString().

前面的示例没有提供在所有测试完成时关闭代理的机制。如果您在 Gradle 守护程序中运行测试,这可能是一个问题。在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用某些东西来destroy()调用EmbeddedKafkaBroker

4.4.5. @EmbeddedKafka 注解

我们通常建议您使用该规则@ClassRule来避免在测试之间启动和停止代理(并为每个测试使用不同的topic)。从 2.0 版本开始,如果使用 Spring 的测试应用程序上下文缓存,还可以声明一个EmbeddedKafkaBrokerbean,因此可以跨多个测试类使用单个代理。为方便起见,我们提供了一个测试类级别的注解,调用@EmbeddedKafka它来注册EmbeddedKafkaBrokerbean。以下示例显示了如何使用它:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从版本 2.2.4 开始,您还可以使用@EmbeddedKafka注解来指定 Kafka 端口属性。

以下示例设置支持属性占位符解析的topicsbrokerPropertiesbrokerPropertiesLocation属性:@EmbeddedKafka

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,属性占位符${kafka.topics.another-topic}、、${kafka.broker.logs-dir}${kafka.broker.port}是从 Spring 解析的Environment。此外,代理属性是broker.propertiesbrokerPropertiesLocation. brokerPropertiesLocation为URL 和资源中找到的任何属性占位符解析属性占位符。brokerProperties由在 中找到的覆盖属性定义的属性brokerPropertiesLocation

您可以将@EmbeddedKafka注释与 JUnit 4 或 JUnit 5 一起使用。

4.4.6. 使用 JUnit5 进行 @EmbeddedKafka 注释

从 2.3 版开始,有两种方法可以将@EmbeddedKafka注解与 JUnit5 一起使用。与@SpringJunitConfig注释一起使用时,嵌入式代理被添加到测试应用程序上下文中。您可以在类或方法级别将代理自动连接到您的测试中,以获取代理地址列表。

使用 spring 测试上下文时,EmbdeddedKafkaCondition创建一个代理;该条件包括一个参数解析器,因此您可以在您的测试方法中访问代理…

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

@EmbeddedBroker如果用 注释的类也没有用 注释(或元注释),则将创建一个独立的(不是 Spring 测试上下文)代理ExtendedWith(SpringExtension.class)@SpringJunitConfig并且@SpringBootTest被元注释,并且当这些注释中的任何一个也存在时,将使用基于上下文的代理。

当有可用的 Spring 测试应用程序上下文时,topic和代理属性可以包含属性占位符,只要在某处定义了属性,就会解析这些占位符。如果没有可用的 Spring 上下文,则不会解析这些占位符。

4.4.7. @SpringBootTest注释中的嵌入式代理

Spring Initializr现在自动将spring-kafka-test测试范围内的依赖添加到项目配置中。

如果您的应用程序使用 Kafka binderspring-cloud-stream并且如果您想使用嵌入式代理进行测试,则必须删除spring-cloud-stream-test-support依赖项,因为它用测试用例的测试 binder 替换了真正的 binder。如果您希望某些测试使用测试绑定器,而某些测试使用嵌入式代理,则使用真实绑定器的测试需要通过在测试类中排除绑定器自动配置来禁用测试绑定器。以下示例显示了如何执行此操作:@RunWith(SpringRunner.class) @SpringBootTest(properties = "spring.autoconfigure.exclude=" + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration") public class MyApplicationTests { ... }

在 Spring Boot 应用程序测试中使用嵌入式代理有多种方法。

他们包括:

JUnit4 类规则

以下示例显示了如何使用 JUnit4 类规则来创建嵌入式代理:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
        false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,我们覆盖了代理列表属性来设置 Boot 的属性。

@EmbeddedKafka注释或EmbeddedKafkaBrokerBean

以下示例展示了如何使用@EmbeddedKafkaAnnotation 创建嵌入式代理:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

4.4.8. Hamcrest 匹配器

提供以下o.s.kafka.test.hamcrest.KafkaMatchers匹配器:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

4.4.9. 断言条件

您可以使用以下 AssertJ 条件:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

4.4.10. 例子

以下示例汇集了本章涵盖的大部分topic:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<Integer, String>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的示例使用了 Hamcrest 匹配器。使用AssertJ,最后一部分看起来像下面的代码:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

5. 提示、技巧和示例

5.1. 手动分配所有分区

假设您希望始终从所有分区中读取所有记录(例如在使用压缩topic加载分布式缓存时),手动分配分区而不使用 Kafka 的组管理会很有用。当有很多分区时这样做可能会很笨拙,因为您必须列出分区。如果分区数量随时间变化,这也是一个问题,因为每次分区计数发生变化时,您都必须重新编译应用程序。

以下是如何在应用程序启动时使用 SpEL 表达式的强大功能来动态创建分区列表的示例:

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}"),
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    ...
}

@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public String[] partitions(String topic) {
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                .map(pi -> "" + pi.partition())
                .toArray(String[]::new);
        }
    }

}

与此结合使用ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest将在每次启动应用程序时加载所有记录。您还应该将容器设置AckModeMANUAL以防止容器为null消费者组提交偏移量。但是,从 2.5.5 版本开始,如上所示,您可以将初始偏移量应用于所有分区;有关详细信息,请参阅显式分区分配

5.2. Kafka 事务与其他事务管理器的示例

以下 Spring Boot 应用程序是链接数据库和 Kafka 事务的示例。listener 容器启动 Kafka 事务,@Transactional注解启动 DB 事务。DB事务首先提交;如果 Kafka 事务提交失败,记录将被重新传递,因此数据库更新应该是幂等的。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
    }

    @Bean
    public DataSourceTransactionManager dstm(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Component
    public static class Listener {

        private final JdbcTemplate jdbcTemplate;

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(id = "group1", topics = "topic1")
        @Transactional("dstm")
        public void listen1(String in) {
            this.kafkaTemplate.send("topic2", in.toUpperCase());
            this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
        }

        @KafkaListener(id = "group2", topics = "topic2")
        public void listen2(String in) {
            System.out.println(in);
        }

    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").build();
    }

}
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed

spring.kafka.producer.transaction-id-prefix=tx-

#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug
create table mytable (data varchar(20));

对于仅生产者事务,事务同步有效:

@Transactional("dstm")
public void someMethod(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
    this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
}

KafkaTemplate其事务与数据库事务同步,提交/回滚发生在数据库之后。

如果您希望先提交 Kafka 事务,并且只有在 Kafka 事务成功时才提交 DB 事务,请使用嵌套@Transactional方法:

@Transactional("dstm")
public void someMethod(String in) {
    this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
    sendToKafka(in);
}

@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
}

5.3. 自定义 JsonSerializer 和 JsonDeserializer

序列化器和反序列化器支持许多使用属性的自定义,有关更多信息,请参阅JSON。代码而kafka-clients不是 Spring 实例化这些对象,除非您将它们直接注入消费者和生产者工厂。如果您希望使用属性配置(反)序列化器,但希望使用 custom ObjectMapper,只需创建一个子类并将自定义映射器传递给super构造函数。例如:

public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedObjectMapper());
    }

    private static ObjectMapper customizedObjectMapper() {
        ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

}

6. 其他资源

除了本参考文档之外,我们还推荐了一些其他资源,它们可以帮助您了解 Spring 和 Apache Kafka。

附录 A:覆盖 Spring Boot 依赖项

在 Spring Boot 应用程序中使用 Spring for Apache Kafka 时,Apache Kafka 依赖项版本由 Spring Boot 的依赖项管理确定。如果您希望使用不同版本的kafka-clientsorkafka-streams并使用嵌入式 kafka 代理进行测试,则需要覆盖 Spring Boot 依赖管理使用的版本并test为 Apache Kafka 添加两个工件。

马文

摇篮

<properties>
    <kafka.version>3.2.0</kafka.version>
</properties>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- optional - only needed when using kafka-streams -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

仅当您在测试中使用嵌入式 Kafka 代理时才需要测试范围依赖项。

0

评论区