文章摘要(AI生成)
本文介绍了Spring for Apache Kafka项目,该项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。该项目提供了发送消息的高级抽象的\模板\,并支持消息驱动的POJO。文中还提到了使用该项目的先决条件,包括安装并运行Apache Kafka,并将spring-kafka JAR及其依赖项添加到类路径中。对于使用Spring Boot的情况,可以省略版本声明,Boot会自动引入与Boot版本兼容的正确版本。最快的入门方法是使用start.spring.io创建一个项目,选择\Spring for Apache Kafka\作为依赖项。本文还说明了该项目的兼容性和最低Java版本要求。
1、前言
Spring for Apache Kafka 项目将核心 Spring 概念应用于基于 Kafka 的消息传递解决方案的开发。我们提供“模板”作为发送消息的高级抽象。我们还为消息驱动的 POJO 提供支持。
(版本变化省略)
3、简介
参考文档的第一部分是 Spring for Apache Kafka 的高级概述,以及可以帮助您尽快启动和运行的底层概念和一些代码片段。
3.1. 快速浏览
先决条件:您必须安装并运行 Apache Kafka。然后,您必须将 Spring for Apache Kafka ( spring-kafka
) JAR 及其所有依赖项放在您的类路径中。最简单的方法是在构建工具中声明一个依赖项。
如果您不使用 Spring Boot,请将spring-kafka
jar 声明为项目中的依赖项。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
使用 Spring Boot 时(并且您尚未使用 start.spring.io 创建项目),省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
但是,最快的入门方法是使用start.spring.io(或 Spring Tool Suits 和 Intellij IDEA 中的向导)并创建一个项目,选择“Spring for Apache Kafka”作为依赖项。
3.1.1. 兼容性
此快速浏览适用于以下版本:
- Apache Kafka 客户端 3.2.x
- Spring 框架 5.3.x
- 最低 Java 版本:8
3.1.2. 入门
最简单的入门方法是使用start.spring.io(或 Spring Tool Suits 和 Intellij IDEA 中的向导)并创建一个项目,选择“Spring for Apache Kafka”作为依赖项。有关其自以为是的基础设施 bean 自动配置的更多信息,请参阅Spring Boot 文档。
这是一个最小的消费者应用程序。
Spring Boot 消费者应用程序
示例 1. 应用程序
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
示例 2.application.properties
spring.kafka.consumer.auto-offset-reset=earliest
NewTopic
bean 导致在代理上创建topic;如果topic已经存在,则不需要。
Spring Boot 生产者应用程序
示例 3. 应用程序
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
使用 Java 配置(无 Spring Boot)
Spring for Apache Kafka 旨在用于 Spring 应用程序上下文。例如,如果您在 Spring 上下文之外自己创建侦听器容器,除非您满足…Aware
容器实现的所有接口,否则并非所有功能都可以工作。
这是一个不使用 Spring Boot 的应用程序示例;它同时具有 Consumer
和Producer
.
示例 4. 没有Boot
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<Integer, String>(producerFactory);
}
}
如您所见,在不使用 Spring Boot 时,您必须定义几个基础设施 bean。
4. 参考
参考文档的这一部分详细介绍了组成 Spring for Apache Kafka 的各种组件。主要章节介绍了使用 Spring 开发 Kafka 应用程序的核心类。
4.1. 将 Spring 用于 Apache Kafka
本节详细解释了影响使用 Spring for Apache Kafka 的各种问题。有关快速但不太详细的介绍,请参阅快速浏览。
4.1.1. 连接到Kafka
从 2.5 版开始,每一个都扩展了KafkaResourceFactory
. 这允许在运行时更改引导服务器,setBootstrapServersSupplier(() → …)
方法是将添加Supplier<String>
到它们的配置中. 这将为所有新连接调用以获取服务器列表。消费者和生产者通常是长寿的。要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 Reset()。要关闭现有的 Consumer
,请在 KafkaListenerEndpointRegistry
上调用 stop()
(然后调用 start()
)和/或在任何其他侦听器容器 bean 上调用 stop() 和 start()。
为方便起见,该框架还提供了一个ABSwitchCluster
支持两组引导服务器;其中之一随时处于活动状态。通过调用setBootstrapServersSupplier()
配置 ABSwitchCluster
并将其添加到生产者和消费者工厂以及 KafkaAdmin
。当你想切换时,调用primary()
或 secondary()
,并在生产者工厂上调用reset()
来建立新的连接;对于消费者,stop()
和 start()
所有侦听器容器。使用 @KafkaListener
时,可以调用KafkaListenerEndpointRegistry
bean的stop()
和 start()
。
有关更多信息,请参阅 Javadocs。
工厂监听器
从 2.5 版开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以通过Listener
在创建或关闭生产者或消费者时接收通知。
生产者工厂监听器
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
消费者工厂监听器
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
都是通过将 client-id
属性(创建后从metrics()
获取)附加到工厂 beanName
属性来创建的,以.
分隔。
例如,这些侦听器可用于在创建KafkaClientMetrics
新客户端时创建和绑定 Micrometer 实例(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;见千分尺原生度量。
4.1.2. 配置topic
如果您在应用程序上下文中定义KafkaAdmin
bean,它可以自动将topic添加到代理。为此,您可以为每个topic添加一个 NewTopic @Bean
到应用程序上下文。2.3 版引入了一个新类TopicBuilder
,使创建此类 bean 更加方便。以下示例显示了如何执行此操作:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
从 2.6 版开始,您可以省略.partitions()
和/或replicas()
代理默认值将应用于这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅KIP-464。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
从 2.7 版本开始,您可以在单个KafkaAdmin.NewTopics
bean 定义中声明多个NewTopic
s:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
使用 Spring Boot 时,会自动注册一个KafkaAdmin
bean,因此您只需要NewTopic
(和/或 NewTopics
) @Bean
。
默认情况下,如果代理不可用,则会记录一条消息,但会继续加载上下文。您可以以编程方式调用管理员的initialize()
方法稍后再试。如果您希望这种情况被认为是致命的,请将管理员的fatalIfBrokerNotAvailable
属性设置为true
. 然后上下文无法初始化。
如果 broker 支持(1.0.0 或更高版本),如果发现现有topic的分区数少于NewTopic.numPartitions
.
从 2.7 版开始,KafkaAdmin
提供了在运行时创建和检查topic的方法。
createOrModifyTopics
describeTopics
对于更高级的功能,您可以直接使用AdminClient
。以下示例显示了如何执行此操作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
4.1.3. 发送消息
本节介绍如何发送消息。
使用KafkaTemplate
本节介绍如何使用KafkaTemplate
来发送消息。
概述
KafkaTemplate
包装生产者并提供方便的方法将数据发送到 Kafka topic。以下清单显示了来自的相关方法KafkaTemplate
:
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
有关更多详细信息,请参阅Javadoc。
在 3.0 版本中,之前返回 ListenableFuture
的方法已更改为返回 CompletableFuture
。为了方便迁移,2.9版本添加了usingCompletableFuture()
方法,它提供了与CompletableFuture
返回类型相同的方法;此方法不再可用。
KafkaOperations2<String, String> template = new KafkaTemplate<>().usingCompletableFuture();
CompletableFuture<SendResult<String, String>> future = template.send(topic1, 0, 0, "buz")
.whenComplete((sr, thrown) -> {
...
});
)
sendDefault
API 要求已向模板提供默认topic。
API 将 timestamp
作为参数并将此时间戳存储在记录中。用户提供的时间戳如何存储取决于 Kafka topic上配置的时间戳类型。如果topic配置为使用CREATE_TIME
,则记录用户指定的时间戳(如果未指定,则生成)。如果topic配置为使用LOG_APPEND_TIME
,则忽略用户指定的时间戳,并且代理添加本地代理时间。
metrics
和partitionsFor
方法委托给底层Producer
的相同方法。该execute
方法提供对底层Producer
的直接访问。
要使用模板,您可以配置生产者工厂并在模板的构造函数中提供它。以下示例显示了如何执行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从 2.5 版开始,您现在可以覆盖工厂的ProducerConfig
属性以创建具有来自同一工厂的不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,可以使用不同的窄泛型类型引用类型ProducerFactory<?, ?>
的 bean(例如 Spring Boot 自动配置的 bean)。
您还可以使用标准<bean/>
定义来配置模板。
然后,要使用模板,您可以调用其中一种方法。
当您使用带有Message<?>
参数的方法时,topic、分区和key信息将在包含以下项目的消息头中提供:
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION
KafkaHeaders.KEY
KafkaHeaders.TIMESTAMP
消息有效负载是数据。
或者,您可以使用ProducerListener
配置 KafkaTemplate
以获取带有发送结果(成功或失败)的异步回调,而不是等待Future
完成。以下清单显示了ProducerListener
接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置有LoggingProducerListener
,它记录错误并且在发送成功时不执行任何操作。
为方便起见,提供了默认方法实现,以防您只想实现其中一种方法。
请注意,发送方法返回一个ListenableFuture<SendResult>
. 您可以向侦听器注册回调以异步接收发送的结果。以下示例显示了如何执行此操作:
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
SendResult
有两个属性 :ProducerRecord
和RecordMetadata
. 有关这些对象的信息,请参阅 Kafka API 文档。
onFailure
中的Throwable
可以转换为KafkaProducerException
; 它的failedProducerRecord
属性包含失败的记录。
从 2.5 版开始,您可以使用 KafkaSendCallback
而不是 ListenableFutureCallback
,从而更容易提取失败的ProducerRecord
,避免需要强制转换Throwable
:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
您还可以使用一对 lambda:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
如果你想阻塞发送线程等待结果,你可以调用future的get()
方法;建议使用带超时的方法。您可能希望在等待之前调用flush()
,或者为了方便起见,模板有一个带有autoFlush
参数的构造函数,该参数会导致模板在每次发送时执行flush()
。仅当您设置了生产者的linger.ms
属性并希望立即发送部分批次时才需要刷新。
例子
本节展示了向 Kafka 发送消息的示例:
示例 5. 非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
handleFailure(data, record, ex);
}
});
}
阻塞(同步)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,ExecutionException
与KafkaProducerException
导致的原因和failedProducerRecord
属性有关。
使用RoutingKafkaTemplate
从 2.5 版开始,您可以使用RoutingKafkaTemplate
在运行时根据目标topic
名称选择生产者。
路由模板不支持事务、execute
、flush
、或metrics
操作,因为这些操作不知道topic。
该模板需要实例的java.util.regex.Pattern
映射ProducerFactory<Object, Object>
。这个映射应该是有序的(例如 LinkedHashMap
),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的topic,每个topic使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
此示例的相应@KafkaListener
s 显示在Annotation Properties中。
有关实现类似结果的另一种技术,但具有将不同类型发送到同一topic的附加功能,请参阅委托序列化器和反序列化器。
使用DefaultKafkaProducerFactory
如UsingKafkaTemplate
中所见, ProducerFactory
用于创建生产者。
默认情况下,不使用Transactions时,DefaultKafkaProducerFactory
会创建一个供所有客户端使用的单例生产者,如KafkaProducer
javadocs 中所建议的那样。但是,如果您调用模板的flush()
,这可能会导致使用同一生产者的其他线程延迟。从 2.3 版开始,DefaultKafkaProducerFactory
有一个新属性producerPerThread
。当设置为true
时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。
当producerPerThread
是true
,当不再需要生产者时,用户代码必须调用工厂的closeThreadBoundProducer()
方法。这将物理关闭生产者并将其从ThreadLocal
移除. 调用reset()
或destroy()
不会清理这些生产者。
创建 DefaultKafkaProducerFactory
时,可以通过调用仅接受属性映射的构造函数从配置中获取键和/或值 Serializer
类(请参阅使用 KafkaTemplate
中的示例),或者可以将 Serializer
实例传递给 DefaultKafkaProducerFactory
构造函数(在这种情况下,所有 Producer
共享相同的实例)。或者,您可以提供Supplier
(从版本2.3开始),它将用于为每个生产者获取单独的Serializer
实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在创建工厂后更新生产者属性。这可能很有用,例如,如果您必须在凭据更改后更新 SSL 密钥/信任存储位置。这些更改不会影响现有的生产者实例;调用reset()
关闭任何现有的生产者,以便使用新属性创建新的生产者。注意:您不能将事务性生产者工厂更改为非事务性,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从 2.8 版开始,如果您将序列化程序作为对象提供(在构造函数中或通过设置器),工厂将调用该configure()
方法以使用配置属性对其进行配置。
使用ReplyingKafkaTemplate
2.1.3 版引入了一个KafkaTemplate
的子类来提供请求/回复语义。ReplyingKafkaTemplate
类已命名并具有两个附加方法;下面显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅使用请求/回复Message
)。
ListenableFuture
结果是异步填充的结果(或异常,超时)。结果还有一个sendFuture
属性,它是调用KafkaTemplate.send()
的结果。您可以使用这个future来确定发送操作的结果。
在 3.0 版中,这些方法(及其sendFuture
属性)返回的future将是CompletableFuture
s 而不是ListenableFuture
s。要协助转换,使用此版本,您可以CompleteableFuture
通过调用asCompletable()
返回的Future
.
如果使用第一种方法,或者replyTimeout
参数是null
,则使用模板的defaultReplyTimeout
属性(默认为 5 秒)。
从版本 2.8.8 开始,模板有一个新方法waitForAssignment
。如果回复容器被配置auto.offset.reset=latest
为避免在容器初始化之前发送请求和回复,这将很有用。
当使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的pollTimeout
属性,因为直到第一次轮询完成后才会发送通知。
以下 Spring Boot 应用程序显示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。
如果使用重要的反序列化器进行回复,请考虑使用委托给您配置的反序列化器的 ErrorHandlingDeserializer
。如此配置后,RequestReplyFuture
将异常完成,您可以捕获 ExecutionException
,并在其原因属性中包含 DeserializationException
。
从 2.6.7 版本开始,除了检测DeserializationException
s 之外,模板还将调用该replyErrorChecker
函数(如果提供)。如果它返回异常,future 将异常完成。
这是一个例子:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
模板设置了一个标头(默认命名为KafkaHeaders.CORRELATION_ID
),必须由服务器端回显。
在这种情况下,以下@KafkaListener
应用程序会响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
@KafkaListener
基础设施回显相关ID 并确定回复topic。
有关发送回复的更多信息,请参阅转发侦听器结果使用@SendTo
。该模板使用默认标题KafKaHeaders.REPLY_TOPIC
来指示回复的topic。
从版本 2.2 开始,模板尝试从配置的回复容器中检测回复topic或分区。如果容器配置为侦听单个topic或单个TopicPartitionOffset
,则用于设置回复标头。如果容器以其他方式配置,则用户必须设置回复标头。在这种情况下,初始化期间会写入一条INFO
日志消息。以下示例使用KafkaHeaders.REPLY_TOPIC
:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当您配置单个回复TopicPartitionOffset
时,您可以为多个模板使用相同的回复topic,只要每个实例侦听不同的分区即可。使用单个回复topic进行配置时,每个实例必须使用不同的group.id
. 在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关 ID。这对于自动缩放可能很有用,但会产生额外网络流量的开销以及丢弃每个不需要的回复的小成本。当您使用此设置时,我们建议您将模板设置sharedReplyTopic
为true
,这会降低对 DEBUG 的意外回复的日志记录级别,而不是默认的 ERROR。
以下是配置回复容器以使用相同的共享回复topic的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例并且您没有按照上一段中的讨论配置它们,则每个实例都需要一个专用的回复topic。另一种方法是为每个实例设置KafkaHeaders.REPLY_PARTITION
并使用专用分区。Header
包含一个四字节的 int (big-endian) 。服务器必须使用此标头将回复路由到正确的分区(@KafkaListener
这样做)。但是,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在ContainerProperties
构造函数中使用 TopicPartitionOffset
)。
DefaultKafkaHeaderMapper
要求 Jackson在类路径上(对于@KafkaListener
)。如果它不可用,则消息转换器没有标头映射器,因此您必须配置 MessagingMessageConverter
和 SimpleKafkaHeaderMapper
,如前所示。
默认情况下,使用 3 个标头:
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求相关联KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器在哪里回复KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器要回复哪个分区
@KafkaListener
基础设施使用这些标头名称来路由回复。
从 2.3 版开始,您可以自定义标题名称 - 模板有 3 个属性correlationHeaderName
、replyTopicHeaderName
和replyPartitionHeaderName
. 如果您的服务器不是 Spring 应用程序(或不使用@KafkaListener
).
请求/回复Message<?>
s
2.7 版ReplyingKafkaTemplate
向发送和接收spring-messaging
的Message<?>
抽象添加了方法:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认值replyTimeout
,也有可能在方法调用中超时的重载版本。
在 3.0 版中,这些方法(及其sendFuture
属性)返回的期货将是CompletableFuture
s 而不是ListenableFuture
s。要协助转换,使用此版本,您可以CompleteableFuture
通过调用asCompletable()
返回的Future
.
如果使用者的反序列化器或模板的 MessageConverter
可以通过配置或回复消息中的类型元数据转换有效负载,而无需任何附加信息,请使用第一种方法。。
如果需要为返回类型提供类型信息,请使用第二种方法,以帮助消息转换器。这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。下面是后者的一个例子:
示例 6. 模板 Bean
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
示例 7. 使用模板
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
回复类型消息<?>
当@KafkaListener
返回Message<?>
,在2.5 之前的版本时,有必要填充回复topic和相关 id 标头。在此示例中,我们使用请求中的回复topic标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这也显示了如何在回复记录上设置密钥。
从版本 2.5 开始,框架将检测这些标头是否丢失并使用topic填充它们 - 从@SendTo
值确定的topic或传入KafkaHeaders.REPLY_TOPIC
标头(如果存在)。它还将回显传入的KafkaHeaders.CORRELATION_ID
和 KafkaHeaders.REPLY_PARTITION
(如果存在)。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
聚合多个回复
UsingReplyingKafkaTemplate
中的模板严格用于单个请求/回复场景。对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafkaTemplate
. 这是Scatter-Gather Enterprise Integration Pattern客户端的实现。
与 ReplyingKafkaTemplate
一样,AggregatingReplyingKafkaTemplate
构造函数采用生产者工厂和侦听器容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
,每次收到回复时都会查询该参数;当谓词true
返回时,使用ConsumerRecord
s的集合来完成sendAndReceive
方法返回的Future
。
returnPartialOnTimeout
还有一个附加属性(默认为 false)。当它设置为true
时,不是用 KafkaReplyTimeoutException
替代future完成,而是future的部分结果正常表示完成(只要已收到至少一个回复记录)。
从版本 2.3.5 开始,谓词也会在超时后调用(如果returnPartialOnTimeout
是true
)。第一个参数是当前记录列表;第二个是true
如果这个调用是由于超时。谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是 ConsumerRecord
,其值是ConsumerRecord
s 的集合。“外部”ConsumerRecord
不是“真实”记录,它是由模板合成的,作为收到请求的实际回复记录的持有者。当正常发布时(发布策略返回true),topic设置为aggregatedResults
;如果returnPartialOnTimeout
为真,并且发生超时(并且至少收到一条回复记录),则topic设置为partialResultsAfterTimeout
. 该模板为这些“topic”名称提供了常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
Collection
中的实数ConsumerRecord
s包含收到回复的实际topic。
回复的侦听器容器必须配置为AckMode.MANUAL
或AckMode.MANUAL_IMMEDIATE
;消费者属性enable.auto.commit
必须是false
(自 2.3 版以来的默认值)。为了避免丢失消息的任何可能性,模板仅在有零个未完成的请求时提交偏移量,即当最后一个未完成的请求被释放策略释放时。重新平衡后,可能会出现重复的回复交付;对于任何正在进行的请求,这些都将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。
如果你使用ErrorHandlingDeserializer
这个聚合模板,框架将不会自动检测DeserializationException
s。相反,记录(带有null
值)将原封不动地返回,并在标头中包含反序列化异常。建议应用程序调用实用方法ReplyingKafkaTemplate.checkDeserialization()
方法来确定是否发生反序列化异常。有关更多信息,请参阅其 javadocs。该replyErrorChecker
聚合模板也不需要;您应该对回复的每个元素进行检查。
4.1.4. 接收消息
您可以通过配置MessageListenerContainer
并提供消息侦听器或使用@KafkaListener
注解来接收消息。
消息监听器
当您使用消息侦听器容器时,您必须提供一个侦听器来接收数据。目前有八个受支持的消息侦听器接口。以下清单显示了这些接口:
//使用此接口,用于在使用自动提交或容器管理的提交方法之一时处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
//当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
//当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
//当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
//当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
//当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
//当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。提供对 Consumer 对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
//当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。提供对 Consumer 对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
该Consumer
对象不是线程安全的。您只能在调用侦听器的线程上调用其方法。
您不应该执行任何影响消费者位置和/或在您的侦听器中提交的偏移量的Consumer<?, ?>
方法;容器需要管理这些信息。
消息侦听器容器
提供了两种MessageListenerContainer
实现:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
接收来自单个线程上所有topic或分区的所有消息。ConcurrentMessageListenerContainer
委托给一个或多个实例KafkaMessageListenerContainer
以提供多线程消费。
从版本 2.2.7 开始,您可以向侦听器容器添加一个RecordInterceptor
;它将在调用允许检查或修改记录的侦听器之前调用。如果拦截器返回 null,则不调用侦听器。从版本 2.7 开始,它具有在侦听器退出后调用的附加方法(通常,或通过抛出异常)。此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为 Batch Listeners 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer 的访问。例如,这可以用于访问拦截器中的消费者指标。
您不应执行任何影响消费者位置和/或在这些拦截器中提交的偏移量的方法;容器需要管理这些信息。
如果拦截器更改了记录(通过创建新记录),则topic
、partition
和offset
必须保持不变以避免意外的副作用,例如记录丢失。
CompositeRecordInterceptor
和CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从 2.8 版本开始,当使用事务时,拦截器在事务开始之前被调用。您可以将侦听器容器的interceptBeforeTx
属性设置false
为在事务开始后调用拦截器。从 2.9 版开始,这将适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManager
s。例如,这允许拦截器参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,当并发大于一时,ConcurrentMessageListenerContainer
现在支持静态成员 。group.instance.id 带有 -n 后缀,其中 n 从 1 开始。这与增加的 session.timeout.ms
一起,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
使用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它在ContainerProperties
对象中接收ConsumerFactory
有关topic和分区以及其他配置的信息。 ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个TopicPartitionOffset
参数数组来显式地指示容器使用哪些分区(使用消费者assign()
方法)以及可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区内的当前最后一个偏移量。TopicPartitionOffset
提供了一个带有附加boolean
参数的构造函数。如果是true
,则初始偏移量(正或负)相对于该消费者的当前位置。启动容器时应用偏移量。第二个采用一组topic,Kafka 根据group.id
属性分配分区——在组中分布分区。第三个使用正则表达式Pattern
来选择topic。
要将MessageListener
分配给容器,您可以在创建容器时使用ContainerProps.setMessageListener
方法。以下示例显示了如何执行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建DefaultKafkaConsumerFactory
时,使用仅接受上述属性的构造函数意味着从配置中获取键和值的Deserializer
类。或者,Deserializer
可以将实例传递给DefaultKafkaConsumerFactory
键和/或值的构造函数,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>
s(从 2.3 版开始),用于为每个Consumer
s 获取单独Deserializer
的实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关可以设置ContainerProperties
的各种属性的更多信息,请参阅Javadoc 。
从版本 2.1.1 开始,一个名为logContainerConfig
的新属性可用。当为true
时启用INFO
日志记录,每个侦听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,topic偏移提交的日志记录在DEBUG
日志记录级别执行。从版本 2.1.2 开始,ContainerProperties
调用的属性commitLogLevel
允许您指定这些消息的日志级别。例如,要将日志级别更改为INFO
,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
.
从版本 2.2 开始,添加了一个名为missingTopicsFatal
的新容器属性(默认值:false
自 2.3.4 起)。如果代理上不存在任何已配置的topic,这将阻止容器启动。如果容器配置为侦听topic模式(正则表达式),则不适用。以前,容器线程在consumer.poll()
方法中循环等待topic出现,同时记录许多消息。除了日志,没有迹象表明存在问题。
从版本 2.8 开始,引入了新的容器属性 authExceptionRetryInterval
。这会导致容器在从KafkaConsumer
获取任何 AuthenticationException
或 AuthorizationException
后重试获取消息。例如,当配置的用户被拒绝访问某个主题或凭据不正确时,就会发生这种情况。定义authExceptionRetryInterval
允许容器在授予适当的权限时恢复。
默认情况下,没有配置时间间隔 - 身份验证和授权错误被认为是致命的,这会导致容器停止。
从 2.8 版本开始,在创建消费者工厂时,如果您将反序列化器作为对象提供(在构造函数中或通过设置器),工厂将调用该configure()
方法以使用配置属性对其进行配置。
使用ConcurrentMessageListenerContainer
单构造函数类似于KafkaListenerContainer
构造函数。以下清单显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它也有concurrency
属性。例如,container.setConcurrency(3)
创建三个KafkaMessageListenerContainer
实例。
对于第一个构造函数,Kafka 使用其组管理功能在消费者之间分配分区。
在监听多个topic时,默认的分区分布可能不是你所期望的。例如,如果您有 3 个topic,每个topic有 5 个分区,并且您使用concurrency=15
,那么您只会看到 5 个活动消费者,每个消费者从每个topic分配一个分区,其他 10 个消费者处于空闲状态。这是因为默认的 KafkaPartitionAssignor
是RangeAssignor
(参见它的 Javadoc)。对于这种情况,您可能需要考虑使用RoundRobinAssignor
。相反,它将分区分布在所有消费者之间。然后,为每个消费者分配一个topic或分区。要更改PartitionAssignor
,您可以在提供给 DefaultKafkaConsumerFactory
的属性中设置partition.assignment.strategy
使用者属性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。使用 Spring Boot 时,可以按如下方式指定设置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
当使用 TopicPartitionOffset
配置容器属性时,ConcurrentMessageListenerContainer
会在委托KafkaMessageListenerContainer
实例之间分发 TopicPartitionOffset
实例。
假设提供了 6 个 TopicPartitionOffset
实例,并发度为 3;每个容器有两个分区。对于五个 TopicPartitionOffset
实例,两个容器获得两个分区,第三个容器获得一个分区。如果并发数大于TopicPartition
的数量,则降低并发数,使每个容器获得一个分区。
client.id
属性(如果设置)附加 -n
,其中 n
是与并发对应的使用者实例。这是在启用 JMX 时为 MBean 提供唯一名称所必需的。
从版本 1.3 开始,MessageListenerContainer
提供对底层 KafkaConsumer
指标的访问。对于 ConcurrentMessageListenerContainer
,metrics()
方法返回所有目标 KafkaMessageListenerContainer
实例的指标。这些指标被分组为 Map<MetricName, ? extends Metric>
通过为底层KafkaConsumer
提供的 client-id
扩展。
从版本 2.3 开始,ContainerProperties
提供了一个idleBetweenPolls
选项,让侦听器容器中的主循环在KafkaConsumer.poll()
调用之间休眠。max.poll.interval.ms
从提供的选项和消费者配置与当前记录批处理时间之间的差异中选择实际的睡眠间隔作为最小值。
提交偏移量
为提交偏移量提供了几个选项。如果enable.auto.commit
消费者属性是true
,Kafka 会根据其配置自动提交偏移量。如果是false
,则容器支持多种AckMode
设置(在下一个列表中描述)。默认AckMode
值为BATCH
. 从版本 2.3 开始,框架设置enable.auto.commit
为false
除非在配置中明确设置。以前,如果未设置该属性,则使用Kafka 默认值 (true
)。
消费者poll()
方法返回一个或多个ConsumerRecords
。为每条记录调用MessageListener
。以下列表描述了容器对每个AckMode
(不使用事务时)采取的操作:
RECORD
:在监听器处理完记录返回时提交偏移量。BATCH
:当poll()
返回的所有记录都处理完后,提交偏移量。TIME
: 当poll()
返回的所有记录都被处理完时,只要ackTime
超过了自上次提交的时间,提交偏移量。COUNT
:当poll()
返回的所有记录都被处理后,只要ackCount
从上次提交后已经收到记录,提交偏移量。COUNT_TIME
: 与TIME
和COUNT
类似,但如果任一条件为true
,则执行提交。MANUAL
: 消息监听器调用Acknowledgment.acknowledge()
。之后,应用与 BATCH 相同的语义。。MANUAL_IMMEDIATE
:当监听器调用Acknowledgment.acknowledge()
方法时立即提交偏移量。
使用transactions时,偏移量被发送到 transaction,语义等同于RECORD
或 BATCH
,具体取决于侦听器类型(记录或批处理)。
MANUAL
,以及MANUAL_IMMEDIATE
要监听者是一个AcknowledgingMessageListener
或一个BatchAcknowledgingMessageListener
。请参阅消息侦听器。
根据syncCommits
容器属性,使用消费者上的commitSync()
或commitAsync()
方法。 默认情况下syncCommits
是true
;也见setSyncCommitTimeout
。查看setCommitCallback
获取异步提交的结果;默认回调是LoggingCommitCallback
记录错误(以及调试级别的成功)。
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为false
. 从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中特别设置或容器的消费者属性覆盖。
有以下Acknowledgment
方法:
public interface Acknowledgment {
void acknowledge();
}
此方法使侦听器可以控制何时提交偏移量。
从 2.3 版开始,该Acknowledgment
接口有两个附加方法nack(long sleep)
和nack(int index, long sleep)
. 第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发IllegalStateException
.
如果要提交部分批处理,请使用nack()
, 使用事务时,将其设置AckMode
为MANUAL
; 调用nack()
会将成功处理的记录的偏移量发送到事务。
nack()
方法只能在调用您的侦听器的消费者线程上调用。
使用记录侦听器,当nack()
被调用时,将提交任何未决的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次poll()
重新传递失败的记录和未处理的记录。通过设置sleep
参数,消费者可以在重新交付之前暂停。这与在容器配置了DefaultErrorHandler
.
使用批处理侦听器时,您可以指定批处理中发生故障的索引。调用nack()
时,将在索引之前为记录提交偏移量,并在分区上为失败和丢弃的记录执行查找,以便它们将在下一次poll()
重新传递。
有关详细信息,请参阅容器错误处理程序。
消费者在睡眠期间暂停,以便我们继续轮询代理以保持消费者存活。实际的睡眠时间及其分辨率取决于pollTimeout
默认为 5 秒的容器。最小睡眠时间等于pollTimeout
并且所有睡眠时间都是它的倍数。对于较小的睡眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout
.
侦听器容器自动启动
侦听器容器实现SmartLifecycle
,并且autoStartup
默认情况下是true
。容器在后期(Integer.MAX-VALUE - 100
)中启动。其他SmartLifecycle
的实现 以处理来自侦听器的数据的组件应在较早的阶段启动。为后续阶段留出了- 100
空间,以使组件能够在容器之后自动启动。
手动提交偏移量
通常,在使用AckMode.MANUAL
或AckMode.MANUAL_IMMEDIATE
时,必须按顺序确认确认,因为 Kafka 不维护每条记录的状态,只为每个组/分区维护一个已提交的偏移量。从 2.8 版开始,您现在可以设置 container 属性asyncAcks
,它允许以任何顺序确认对轮询返回的记录的确认。侦听器容器将推迟无序提交,直到收到丢失的确认。消费者将被暂停(不提供新记录),直到前一次轮询的所有偏移量都已提交。
虽然此功能允许应用程序异步处理记录,但应该理解它增加了失败后重复交付的可能性。
@KafkaListener
注解
@KafkaListener
注解用于指定一个 bean 方法作为侦听器容器的侦听器。bean 被包装在一个MessagingMessageListenerAdapter
配置有各种功能的组件中,例如转换器来转换数据,如果需要,匹配方法参数。
您可以使用#{…}
或属性占位符 ( ${…}
) 在带有 SpEL 的注解上配置大多数属性。有关更多信息,请参阅Javadoc。
记录监听器
@KafkaListener
注解为简单的 POJO 侦听器提供了一种机制。以下示例显示了如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此机制需要@EnableKafka
在您的一个类和一个侦听器容器工厂上添加注解@Configuration
,该工厂用于配置底层ConcurrentMessageListenerContainer
. 默认情况下,需要一个具有名称的 kafkaListenerContainerFactory
bean。下面的例子展示了如何使用ConcurrentMessageListenerContainer
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
请注意,要设置容器属性,您必须使用getContainerProperties()
工厂上的方法。它用作注入容器的实际属性的模板。
从版本 2.1.1 开始,您现在可以client.id
为注解创建的使用者设置属性。clientIdPrefix
后缀为-n
,其中是n
一个整数,表示使用并发时的容器号。
从 2.2 版开始,您现在可以通过使用注解本身的属性来覆盖容器工厂的concurrency
和autoStartup
属性。属性可以是简单值、属性占位符或 SpEL 表达式。以下示例显示了如何执行此操作:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您还可以使用显式topic和分区(以及可选的初始偏移量)配置 POJO 侦听器。以下示例显示了如何执行此操作:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在partitions
或partitionOffsets
属性中指定每个分区,但不能同时指定两者。
与大多数注解属性一样,您可以使用 SpEL 表达式;有关如何生成大量分区列表的示例,请参阅手动分配所有分区。
从版本 2.5.5 开始,您可以将初始偏移量应用于所有分配的分区:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
*
通配符代表partitions
属性中的所有分区。每个@TopicPartition
中只能有一个@PartitionOffset
带有通配符。
此外,当侦听器实现ConsumerSeekAware
时,现在调用onPartitionsAssigned
,即使使用手动分配也是如此。例如,这允许当时的任何任意查找操作。
从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围包括在内;上面的示例将分配分区0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
。
指定初始偏移量时可以使用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用于所有 6 个分区。
手动确认
使用手动AckMode
时,还可以为监听器提供Acknowledgment
. 以下示例还展示了如何使用不同的容器工厂。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,有关记录的元数据可从消息头中获得。您可以使用以下标头名称来检索消息的标头:
KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
从 2.5 版开始,如果传入记录有键,RECEIVED_KEY
则不存在;null
以前,标题填充了一个null
值。此更改是为了使框架与不存在带null
值标头的spring-messaging
约定保持一致。
以下示例显示了如何使用标头:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
从版本 2.5 开始,您可以在ConsumerRecordMetadata
参数中接收记录元数据,而不是使用离散的标头。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含ConsumerRecord
除了键和值之外的所有数据。
批处理监听器
从 1.1 版开始,您可以配置@KafkaListener
方法来接收从消费者轮询收到的整批消费者记录。要配置侦听器容器工厂以创建批处理侦听器,您可以设置该batchListener
属性。以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
从 2.8 版开始,您可以batchListener
使用注解batch
上的属性覆盖工厂的属性。这与对容器错误处理程序@KafkaListener
的更改一起允许将同一工厂用于记录和批处理侦听器。
以下示例显示了如何接收有效负载列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
topic、分区、偏移量等在与有效负载并行的标头中可用。以下示例显示了如何使用标头:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以在每条消息中接收带有每个偏移量和其他详细信息List
的Message<?>
对象,但它必须是方法上定义的唯一参数(除了可选的 Acknowledgment
,在使用手动提交时和/或Consumer<?, ?>
参数)。以下示例显示了如何执行此操作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不对有效负载执行任何转换。
如果BatchMessagingMessageConverter
配置了RecordMessageConverter
,您还可以将泛型类型添加到Message
参数并转换有效负载。有关详细信息,请参阅使用批处理侦听器进行有效负载转换。
您还可以接收ConsumerRecord<?, ?>
对象列表,但它必须是方法上定义的唯一参数(除了可选的 Acknowledgment
,使用手动提交和Consumer<?, ?>
参数时)。以下示例显示了如何执行此操作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从 2.2 版本开始,监听器可以接收方法ConsumerRecords<?, ?>
返回的完整对象poll()
,让监听器访问其他方法,例如partitions()
(返回TopicPartition
列表中的实例)和records(TopicPartition)
(获取选择性记录)。同样,这必须是方法上的唯一参数(除了可选的Acknowledgment
,当使用手动提交或Consumer<?, ?>
参数时)。以下示例显示了如何执行此操作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂已RecordFilterStrategy
配置,则ConsumerRecords<?, ?>
侦听器将忽略它,并发出WARN
日志消息。如果使用监听器的<List<?>>
形式,则只能使用批处理监听器过滤记录。默认情况下,一次过滤一条记录;从 2.8 版开始,您可以重写filterBatch
以在一次调用中过滤整个批次。
注解属性
从 2.0 版开始,该id
属性(如果存在)用作 Kafka 消费者group.id
属性,覆盖消费者工厂中配置的属性(如果存在)。您还可以groupId
显式设置或设置idIsGroup
为 false 以恢复之前使用消费者工厂的group.id
属性。
您可以在大多数注解属性中使用属性占位符或 SpEL 表达式,如以下示例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从版本 2.1.2 开始,SpEL 表达式支持特殊标记:__listener
. 它是一个伪 bean 名称,表示存在此注解的当前 bean 实例。
考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
鉴于前面示例中的 bean,我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果万一您有一个名为__listener
的实际 bean ,您可以使用beanRef
属性更改表达式标记。以下示例显示了如何执行此操作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以直接在注解上指定 Kafka 消费者属性,这些将覆盖消费者工厂中配置的任何同名属性。您不能以这种方式指定group.id
和client.id
属性;它们将被忽略;使用这些groupId
和clientIdPrefix
注解属性。
这些属性被指定为具有正常 JavaProperties
文件格式的单个字符串:foo:bar
、foo=bar
或foo bar
.
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是UsingRoutingKafkaTemplate
中的示例对应的监听器示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
获取消费者group.id
在多个容器中运行相同的侦听器代码时,能够确定group.id
记录来自哪个容器(由其消费者属性标识)可能很有用。
您可以调用KafkaUtils.getConsumerGroupId()
侦听器线程来执行此操作。或者,您可以在方法参数中访问组 ID。
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
这在接收List<?>
的记录侦听器和批处理侦听器中可用。它在接收ConsumerRecords<?, ?>
参数的批处理侦听器中**不可用。**在这种情况下使用该KafkaUtils
机制。
容器线程命名
监听器容器目前使用两个任务执行器,一个用于调用消费者,另一个用于在 kafka 消费者属性enable.auto.commit
为false
时调用监听器。您可以通过设置容器的consumerExecutor
和listenerExecutor
属性来提供自定义执行器ContainerProperties
。使用池执行器时,请确保有足够的线程可用于处理使用它们的所有容器的并发性。使用ConcurrentMessageListenerContainer
时,每个使用者 ( concurrency
) 使用每个线程。
如果您不提供消费者执行程序,则使用SimpleAsyncTaskExecutor
a。此执行程序创建名称类似于<beanName>-C-1
(consumer thread) 的线程。对于ConcurrentMessageListenerContainer
,<beanName>
线程名称的部分变为<beanName>-m
,其中m
代表消费者实例。 n
每次启动容器时递增。所以,bean 名称为container
,容器中的线程将在第一次启动容器后命名为container-0-C-1
,container-1-C-1
等等;在停止和随后的启动之后,命名为container-0-C-2
、container-1-C-2
等。
@KafkaListener
作为元注解
从 2.2 版开始,您现在可以@KafkaListener
用作元注解。以下示例显示了如何执行此操作:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
您必须为topics
, topicPattern
, 或topicPartitions
中的至少一个起别名(并且,通常是id
或者groupId
除非您在消费者工厂配置中指定了 group.id
)。以下示例显示了如何执行此操作:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
@KafkaListener
在一个类上
在类级别使用@KafkaListener
时,必须在方法级别指定@KafkaHandler
。传递消息时,转换后的消息负载类型用于确定调用哪个方法。以下示例显示了如何执行此操作:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从版本 2.1.3 开始,您可以指定一个@KafkaHandler
方法作为默认方法,如果其他方法没有匹配,则调用该方法。最多只能指定一种方法。使用@KafkaHandler
方法时,有效负载必须已经转换为域对象(因此可以执行匹配)。使用自定义解串器,JsonDeserializer
或JsonMessageConverter
设置TypePrecedence
为TYPE_ID
。有关详细信息,请参阅序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler
不能接收离散的标头;它必须使用Consumer Record MetadataConsumerRecordMetadata
中讨论的。
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是String
,这将不起作用 ;该topic
参数还将获得对object
的引用.
如果您需要默认方法中有关记录的元数据,请使用以下命令:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
@KafkaListener
属性修改
从版本 2.7.2 开始,您现在可以在创建容器之前以编程方式修改注解属性。为此,请将一个或多个添加KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer
到应用程序上下文中。 AnnotationEnhancer
是一个BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>
并且必须返回一个属性映射。属性值可以包含 SpEL 和/或属性占位符;在执行任何解析之前调用增强器。如果存在多个增强器,并且它们实现了Ordered
,它们将按顺序被调用。
必须声明AnnotationEnhancer
bean 定义为static
,因为它们在应用程序上下文的生命周期的早期就需要。
一个例子如下:
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
@KafkaListener
生命周期管理
为@KafkaListener
注解创建的侦听器容器不是应用程序上下文中的 bean。相反,它们使用KafkaListenerEndpointRegistry
类型的基础设施 bean 进行注册。该 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何已autoStartup
设置为true
. 所有容器工厂创建的所有容器必须在同一个phase
. 有关详细信息,请参阅侦听器容器自动启动。您可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,您可以通过使用其id
属性来获取对单个容器的引用。你可以在注解上设置autoStartup
,它会覆盖容器工厂中配置的默认设置。您可以从应用程序上下文中获取对 bean 的引用,例如自动装配,以管理其注册的容器。以下示例显示了如何执行此操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册中心只维护它管理的容器的生命周期;声明为 bean 的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的getListenerContainers()
方法来获取托管容器的集合。2.2.5 版添加了一个便捷方法getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为 bean 的容器。返回的集合将包括任何已初始化的原型 bean,但它不会初始化任何惰性 bean 声明。
刷新应用程序上下文后注册的端点将立即启动,无论其autoStartup
属性如何,以遵守SmartLifecycle
约定,其中autoStartup
仅在应用程序上下文初始化期间考虑。延迟注册的一个示例是具有@KafkaListener
原型范围的 bean,其中在上下文初始化后创建实例。从版本 2.8.7 开始,您可以将注册表的alwaysStartAfterRefresh
属性设置为false
,然后容器的autoStartup
属性将定义容器是否启动。
@KafkaListener
@Payload
验证
从 2.2 版开始,现在可以更轻松地添加 Validator
来验证@KafkaListener
@Payload
参数。以前,您必须配置自定义DefaultMessageHandlerMethodFactory
并将其添加到注册器。现在,您可以将验证器添加到注册商本身。以下代码显示了如何执行此操作:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当您将 Spring Boot 与验证启动器一起使用时,会自动配置 LocalValidatorFactoryBean
,如以下示例所示:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
以下示例显示了如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
从版本 2.5.11 开始,验证现在适用于@KafkaHandler
类级侦听器中方法的有效负载。见@KafkaListener
上一课。
reblance 侦听器
ContainerProperties
有一个名为consumerRebalanceListener
的属性,它采用 Kafka 客户端ConsumerRebalanceListener
接口的实现。如果未提供此属性,则容器会配置一个日志侦听器,用于在INFO
级别记录重新平衡事件。该框架还添加了一个子接口ConsumerAwareRebalanceListener
。以下清单显示了ConsumerAwareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,撤销分区时有两个回调。第一个被立即调用。第二个在提交任何待处理的偏移量后调用。如果您希望在某些外部存储库中维护偏移量,这将很有用,如以下示例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从2.4版本开始,添加了一个新方法onPartitionsLost()(类似于ConsumerRebalanceLister中的同名方法)。 ConsumerRebalanceLister 的默认实现仅调用 onPartionsRevoked。 ConsumerAwareRebalanceListener 的默认实现不执行任何操作。当为侦听器容器提供自定义侦听器(任一类型)时,您的实现不要从 onPartitionsLost 调用 onPartitionsRevoked ,这一点很重要。如果您实现 ConsumerRebalanceListener 您应该覆盖默认方法。这是因为在调用您的实现上的方法后,侦听器容器将从其 onPartitionsLost 实现中调用自己的 onPartitionsRevoked。如果您实现委托为默认行为,则每次消费者在容器的侦听器上调用该方法时,onPartitionsRevoked 都会被调用两次。
使用@SendTo
转发侦听器结果
从 2.0 版本开始,如果您还使用@SendTo
注解对@KafkaListener
进行注解,并且方法调用返回结果,则将结果转发到@SendTo
指定的topic.
该@SendTo
值可以有多种形式:
@SendTo("someTopic")
到指定topic的路由@SendTo("#{someExpression}")
通过在应用程序上下文初始化期间评估一次表达式确定的topic的路由。@SendTo("!{someExpression}")
通过在运行时评估表达式确定的topic的路由。评估#root
对象具有三个属性:request
:入站ConsumerRecord
(或ConsumerRecords
批处理侦听器的对象))source
:org.springframework.messaging.Message<?>
从request
.result
: 方法返回结果。
@SendTo
(无属性):这被视为!{source.headers['kafka_replyTopic']}
(从版本 2.1.3 开始)。
从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo
值内解析。
表达式求值的结果必须是表示topic名称的 String
类型。以下示例显示了各种使用方法@SendTo
:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持@SendTo
,监听器容器工厂必须提供一个KafkaTemplate
(在其replyTemplate
属性中),用于发送回复。这应该是 KafkaTemplate
而不是 ReplyingKafkaTemplate
在客户端用于请求/回复处理。使用 Spring Boot 时,boot 会自动将模板配置到工厂中;在配置自己的工厂时,必须按照以下示例进行设置。
从版本 2.2 开始,您可以向侦听器容器工厂添加一个ReplyHeadersConfigurer
。参考此信息以确定您要在回复消息中设置哪些标头。以下示例显示了如何添加ReplyHeadersConfigurer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果您愿意,还可以添加更多标题。以下示例显示了如何执行此操作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用@SendTo时,您必须在其replyTemplate
属性中使用KafkaTemplate
配置ConcurrentKafkaListenerContainerFactory
才能执行发送。 Spring Boot 将自动连接其自动配置的模板(或者如果存在单个实例则任何模板)。
除非您使用请求/回复语义,否则仅使用简单send(topic, value)
方法,因此您可能希望创建一个子类来生成分区或键。以下示例显示了如何执行此操作:
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果侦听器方法返回 Message 或 Collection>,则侦听器方法负责设置回复的消息标头。例如,当处理来自 ReplyingKafkaTemplate 的请求时,您可以执行以下操作:
@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
}
即使没有返回结果,您也可以使用 @SendTo 注解 @KafkaListener 方法。这是为了允许配置 errorHandler,该 errorHandler 可以将有关失败消息传递的信息转发到某个主题。以下示例展示了如何执行此操作:
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
throw new RuntimeException("fail");
}
@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
return (m, e) -> {
return ... // some information about the failure and input data
};
}
有关详细信息,请参阅处理异常。
如果侦听器方法返回一个Iterable
,则默认情况下,每个元素都会在发送值时记录一条记录。从版本 2.3.5 开始,将@KafkaListener
的splitIterables
属性设置为false
,整个结果将作为单个ProducerRecord
. 这需要在回复模板的生产者配置中使用合适的序列化程序。但是,如果回复是Iterable<Message<?>>
,则忽略该属性并单独发送每条消息。
过滤消息
在某些情况下,例如重新平衡,可能会重新传递已经处理的消息。框架无法知道这样的消息是否已被处理。那是一个应用程序级的功能。这被称为Idempotent Receiver模式,Spring Integration 提供了它的实现。
Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter
类提供了一些帮助,它可以包装你的MessageListener
. 此类采用一个实现RecordFilterStrategy
,您可以在其中实现该filter
方法来表示消息是重复的并且应该被丢弃。这有一个称为 的附加属性ackDiscarded
,它指示适配器是否应确认丢弃的记录。false
默认情况下。
使用 时@KafkaListener
,在容器工厂上设置RecordFilterStrategy
(和可选ackDiscarded
的),以便将侦听器包装在适当的过滤适配器中。
此外,FilteringBatchMessageListenerAdapter
还提供了 a ,供您使用批处理消息侦听器时使用。
FilteringBatchMessageListenerAdapter
如果您@KafkaListener
收到 a 而ConsumerRecords<?, ?>
不是 ,则忽略List<ConsumerRecord<?, ?>>
,因为ConsumerRecords
它是不可变的。
从版本 2.8.4 开始,您可以RecordFilterStrategy
通过使用filter
侦听器注解上的属性来覆盖侦听器容器工厂的默认值。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
重试消费
请参阅处理异常DefaultErrorHandler
中的。
@KafkaListener
按顺序开始
一个常见的用例是在另一个侦听器消耗了topic中的所有记录后启动侦听器。例如,您可能希望在处理来自其他topic的记录之前将一个或多个压缩topic的内容加载到内存中。从版本 2.7.3 开始,ContainerGroupSequencer
引入了一个新组件。当当前组中的所有容器都空闲时,它使用该@KafkaListener
containerGroup
属性将容器组合在一起并启动下一组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有两组 4 个监听器,g1
和g2
.
在应用程序上下文初始化期间,定序器将所提供组中所有容器的autoStartup
属性设置为 false。它还将任何容器(尚未设置)的idleEventInterval
设置为提供的值(在本例中为5000 毫秒)。然后,当应用程序上下文启动定序器时,将启动第一组中的容器。当接收到 ListenerContainerIdleEvent
时,每个容器中的每个单独的子容器都会停止。当 ConcurrentMessageListenerContainer
中的所有子容器停止时,父容器也停止。当一组中的所有容器都停止后,下一组中的容器将启动。组或组中容器的数量没有限制。
默认情况下,最后组中的容器(上面的 g2)在空闲时不会停止。要修改该行为,请在定序器上将 stopLastGroupWhenIdle
设置为 true
。
作为旁白:以前,每个组中的容器都被添加到 Collection
类型的 bean 中,bean 名称为 containerGroup
。这些集合现在已被弃用,取而代之的是 ContainerGroup
类型的 bean,其 bean 名称是组名称,后缀为 .group
;在上面的示例中,将有 2 个 bean g1.group
和 g2.group
。 Collection bean 将在未来版本中删除。
用于KafkaTemplate
接收
本节介绍如何使用KafkaTemplate
来接收消息。
从 2.8 版本开始,模板有四种receive()
方法:
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
如您所见,您需要知道需要检索的记录的分区和偏移量;为每个操作创建(并关闭)一个新的Consumer
。
使用最后两种方法,分别检索每条记录并将结果组合到一个ConsumerRecords
对象中。在为请求创建TopicPartitionOffset
s 时,仅支持正的绝对偏移量。
评论区