文章摘要(AI生成)
总结:Apache Kafka提供了序列化、反序列化和消息转换的高级API,通过配置属性可以指定记录的键和值的序列化器和反序列化器类。KafkaConsumer和KafkaProducer提供了重载构造函数来接受自定义的序列化器和反序列化器实例。此外,Spring for Apache Kafka还提供了ToStringSerializer和ParseStringDeserializer类,用于将实体表示为字符串以及解析字符串。针对JSON对象映射,Spring for Apache Kafka提供了JsonSerializer和JsonDeserializer,允许将Java对象编写为JSON字节数组,并反序列化为正确的目标对象。用户可以自定义JsonSerializer和JsonDeserializer,或扩展它们以实现特定的配置逻辑。JacksonUtils.enhancedObjectMapper()提供了默认配置,禁用了一些功能并提供了自定义数据类型支持。这些功能为用户提供了广泛的选择和配置灵活性。
4.1.17. 序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。它与一些内置实现的org.apache.kafka.common.serialization.Serializer<T>
和org.apache.kafka.common.serialization.Deserializer<T>
抽象一起出现。同时,我们可以通过使用Producer
或Consumer
配置属性来指定序列化器和反序列化器类。以下示例显示了如何执行此操作:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特定的情况,KafkaConsumer
以及 KafkaProducer
)提供重载构造函数来分别接受键和值的序列化器和反序列化器实例。
当您使用此 API 时,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
还提供属性(通过构造函数或 setter 方法)将自定义Serializer
和Deserializer
实例注入目标Producer
或Consumer
. 此外,您可以通过构造函数传入Supplier<Serializer>
或Supplier<Deserializer>
实例 - 这些Supplier
s 在创建每个Producer
或Consumer
时被调用。
字符串序列化
从 2.5 版开始,Spring for Apache Kafka 提供了使用字符串表示实体的ToStringSerializer
和ParseStringDeserializer
类。它们依赖方法toString
和一些Function<String>
或BiFunction<String, Headers>
解析字符串并填充实例的属性。通常,这会调用类上的一些静态方法,例如parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
配置为在记录Headers
中传达有关序列化实体的类型信息。您可以通过将addTypeInfo
属性设置为 false来禁用此功能。接收方可以使用ParseStringDeserializer
信息。
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置false
为禁用ToStringSerializer
功能(设置addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置Charset
用于转换String
为/从byte[]
默认为UTF-8
.
您可以使用ConsumerConfig
属性使用解析器方法的名称配置反序列化器:
ParseStringDeserializer.KEY_PARSER
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名称,后跟方法名称,用句点.
分隔。该方法必须是静态的,并且具有一个(String, Headers)
或(String)
的签名。
还提供了 ToFromStringSerde
,用于 Kafka Streams。
JSON
Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的实现JsonSerializer
和JsonDeserializer
。JsonSerializer
允许将任何 Java 对象编写为 JSON byte[]
。JsonDeserializer
需要一个额外的Class<?> targetType
参数来允许将消费byte[]
对象反序列化为正确的目标对象。下面的例子展示了如何创建一个JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用ObjectMapper
自定义JsonSerializer
和JsonDeserializer
. 您还可以扩展它们以在configure(Map<String, ?> configs, boolean isKey)
方法中实现一些特定的配置逻辑。
从 2.3 版开始,所有支持 JSON 的组件都默认配置有一个JacksonUtils.enhancedObjectMapper()
实例,该实例禁用了MapperFeature.DEFAULT_VIEW_INCLUSION
和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能。此外,此类实例还提供了用于自定义数据类型的众所周知的模块,例如 Java 时间和 Kotlin 支持。有关更多信息,请参阅JacksonUtils.enhancedObjectMapper()
JavaDocs。此方法还将对象序列化注册org.springframework.kafka.support.JacksonMimeTypeModule
到org.springframework.util.MimeType
纯字符串中,以便通过网络实现跨平台兼容性。JacksonMimeTypeModule
可以在应用程序上下文中注册为 bean,并将自动配置到Spring BootObjectMapper
实例中。
同样从 2.3 版开始,JsonDeserializer
提供了TypeReference
基于构造函数的构造函数,用于更好地处理目标通用容器类型。
从 2.1 版开始,您可以在记录中传达类型信息Headers
,从而允许处理多种类型。此外,您可以使用以下 Kafka 属性配置序列化器和反序列化器。如果您分别为KafkaConsumer
和KafkaProducer
提供Serializer
和Deserializer
实例,它们将无效。
配置属性
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置false
为禁用此功能JsonSerializer
(设置addTypeInfo
属性)。JsonSerializer.TYPE_MAPPINGS
(默认empty
):见映射类型。JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置false
为忽略序列化程序设置的标头。JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置false
为保留序列化程序设置的标头。JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化密钥的后备类型。JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的后备类型。JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
):允许反序列化的包模式的逗号分隔列表。*
意味着反序列化所有。JsonDeserializer.TYPE_MAPPINGS
(默认empty
):见映射类型。JsonDeserializer.KEY_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。JsonDeserializer.VALUE_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。
从 2.2 版开始,类型信息标头(如果由序列化程序添加)被反序列化程序删除。您可以通过直接在反序列化器上或使用前面描述的配置属性将removeTypeHeaders
属性设置为 false
来恢复到以前的行为。
另请参阅自定义 JsonSerializer 和 JsonDeserializer。
从 2.8 版开始,如果您以编程方式构造序列化器或反序列化器,如Programmatic Construction所示,只要您没有显式设置任何属性(使用set*()
方法或使用 fluent API),工厂将应用上述属性。以前,在以编程方式创建时,从未应用配置属性;如果您直接在对象上显式设置属性,情况仍然如此。
映射类型
从版本 2.2 开始,在使用 JSON 时,您现在可以使用前面列表中的属性来提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的token:className
对列表组成。在出站时,有效负载的类名映射到相应的令牌。在入站时,类型标头中的标记被映射到相应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
对应的对象必须是兼容的。
如果您使用Spring Boot ,则可以在application.properties
(或 yaml)文件中提供这些属性。以下示例显示了如何执行此操作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单的配置。对于更高级的配置(例如ObjectMapper
在序列化器和反序列化器中使用自定义),您应该使用接受预构建的序列化器和反序列化器的生产者和消费者工厂构造函数。以下 Spring Boot 示例覆盖了默认工厂:
@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
}
@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
new StringSerializer(), customValueSerializer);
}
还提供了 Setter,作为使用这些构造函数的替代方法。
从版本 2.2 开始,您可以使用具有布尔值useHeadersIfPresent
(true
默认情况下)的重载构造函数之一显式配置反序列化器以使用提供的目标类型并忽略标头中的类型信息。以下示例显示了如何执行此操作:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从 2.5 版开始,您现在可以通过属性配置反序列化器,以调用方法来确定目标类型。如果存在,这将覆盖上面讨论的任何其他技术。如果数据是由不使用 Spring 序列化程序的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。将这些属性设置为方法名 - 一个完全限定的类名,后跟方法名,用句点分隔.
。该方法必须声明为public static
,具有三个签名之一(String topic, byte[] data, Headers headers)
,(byte[] data, Headers headers)
或者(byte[] data)
返回一个 Jackson JavaType
。
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标题或检查数据以确定类型。
例子
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用JsonPath
或类似方法,但确定类型的测试越简单,该过程就越有效。
以下是以编程方式创建反序列化器的示例(在构造函数中为消费者工厂提供反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化建设
在以编程方式构建序列化器/反序列化器以供生产者/消费者工厂使用时,从 2.3 版本开始,您可以使用 fluent API,它简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以编程方式提供类型映射,类似于使用方法确定类型,请使用该typeFunction
属性。
例子
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要你不使用 fluent API 配置属性,或者使用set*()
方法设置它们,工厂将使用配置属性配置序列化器/反序列化器;请参阅配置属性。
委派序列化器和反序列化器
使用标题
2.3 版引入了DelegatingSerializer
和 DelegatingDeserializer
,它允许生成和使用具有不同键和/或值类型的记录。生产者必须将标头设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
为选择器值,该值用于选择将哪个序列化器用于值和DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
键;如果未找到匹配项,则抛出 IllegalStateException
。
对于传入的记录,反序列化器使用相同的标头来选择要使用的反序列化器;如果未找到匹配项或标头不存在,则返回原始byte[]
数据。
您可以通过构造函数将选择器映射配置为Serializer
/ Deserializer
,也可以通过 Kafka 生产者/消费者属性使用DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
键. 对于序列化程序,生产者属性可以是一个Map<String, Object>
,其中键是选择器,值是Serializer
实例、序列化Class
或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。
对于反序列化器,消费者属性可以是一个Map<String, Object>
,其中键是选择器,值是Deserializer
实例、反序列Class
化器或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,生产者会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
标头设置为thing1
或thing2
。
该技术支持向同一主题(或不同主题)发送不同类型。
从版本 2.5.1 开始,如果类型(键或值)是Serdes
( Long
,Integer
等) 支持的标准类型之一,则无需设置选择器标头。相反,序列化程序会将标头设置为类型的类名。不必为这些类型配置序列化器或反序列化器,它们将被动态创建(一次)。
有关将不同类型发送到不同主题的另一种技术,请参阅使用RoutingKafkaTemplate
.
按类型
2.8 版引入了DelegatingByTypeSerializer
.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,您可以配置序列化程序以检查映射键是否可从目标对象分配,这在委托序列化程序可以序列化子类时很有用。在这种情况下,如果存在不明确的匹配,则应提供一个有序的Map
,例如 LinkedHashMap
。
按主题
从 2.8 版开始,DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
允许根据主题名称选择序列化器/反序列化器。RegexPattern
用于查找要使用的实例。可以使用构造函数或通过属性(逗号分隔的pattern:serializer
列表)配置映射。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
将其用于键时使用KEY_SERIALIZATION_TOPIC_CONFIG
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null,
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
指定在没有模式匹配时使用的默认序列化器/反序列化器。
一个附加属性DelegatingByTopicSerialization.CASE_SENSITIVE
(默认true
),当设置为false
时主题查找不区分大小写。
重试反序列化器
当代理在反序列化过程中可能出现暂时性错误(例如网络问题)时,RetryingDeserializer
使用委托Deserializer
和RetryTemplate
重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
RetryTemplate
的重试策略、回退策略等的配置请参考spring-retry项目。
Spring Messaging 消息转换
尽管从低级 Kafka Consumer
和 Producer
的角度来看,序列化器和反序列化器 API 非常简单且灵活,但在使用 @KafkaListener
或 Spring Integration
的 Apache Kafka
支持时,您可能需要在 Spring 消息传递级别具有更多灵活性。为了让您轻松地与 org.springframework.messaging.Message
进行相互转换,Spring for Apache Kafka 提供了 MessageConverter
抽象,其中包含 MessagingMessageConverter
实现及其 JsonMessageConverter
(及其子类)自定义。您可以直接将 MessageConverter
注入KafkaTemplate
实例,并使用 @KafkaListener.containerFactory()
属性的AbstractKafkaListenerContainerFactory bean
定义。以下示例展示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为 @Bean
,Spring Boot 自动配置会将其连接到自动配置的模板和容器工厂。
当您使用 @KafkaListener
时,参数类型被提供给消息转换器以协助转换。
只有在方法级别声明@KafkaListener
注解时,才能实现这种类型推断。对于类级别@KafkaListener
,有效负载类型用于选择@KafkaHandler
要调用的方法,因此在选择方法之前它必须已经转换。
在consumer端,可以配置一个JsonMessageConverter
;它可以处理 byte[]、Bytes 和 String 类型的 ConsumerRecord
值,因此应与 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
结合使用。 (byte[] 和 Bytes 效率更高,因为它们避免了不必要的 byte[] 到 String 转换)。如果您愿意,您还可以配置与反序列化器对应的JsonMessageConverter
的特定子类。
在生产者端,当您使用 Spring Integration 或 KafkaTemplate.send(Message message) 方法(请参阅使用 KafkaTemplate)时,您必须配置与配置的 Kafka Serializer 兼容的消息转换器。
StringSerializer
对应StringJsonMessageConverter
StringJsonMessageConverter
对应BytesJsonMessageConverter
ByteArraySerializer
对应ByteArrayJsonMessageConverter
同样,使用byte[]
或Bytes
更有效,因为它们避免了String
到byte[]
转换。为方便起见,从 2.3 版开始,该框架还提供了一个StringOrBytesSerializer
可以序列化所有三种值类型的方法,因此它可以与任何消息转换器一起使用。
从版本 2.7.1 开始,可以将消息负载转换委托给spring-messaging
SmartMessageConverter
; 例如,这使得转换可以基于MessageHeaders.CONTENT_TYPE
标题。
调用 KafkaMessageConverter.fromMessage()
方法以出站转换为 ProducerRecord
,消息负载位于 ProducerRecord.value()
属性中。调用KafkaMessageConverter.toMessage()
方法来从ConsumerRecord
进行入站转换,有效负载为ConsumerRecord.value()
属性。调用 SmartMessageConverter.toMessage()
方法,根据传递给“fromMessage()”的消息创建新的出站 Message(通常通过 KafkaTemplate.send(Message msg))。类似地,在 KafkaMessageConverter.toMessage()
方法中,转换器从ConsumerRecord
创建新的 Message 后,调用 SmartMessageConverter.fromMessage()
方法,然后使用新转换的有效负载创建最终的入站消息。无论哪种情况,如果SmartMessageConverter
返回 null
,则使用原始消息。
当在KafkaTemplate
监听器容器工厂中使用默认转换器时,您可以通过调用模板的setMessagingConverter()
方法SmartMessageConverter
和@KafkaListener
方法上的contentMessageConverter
属性来配置。
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring 数据投影接口
从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,可以将以下接口定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于在接收到的 JSON 文档中将属性名称作为字段查找。该@JsonPath
表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用ProjectingMessageConverter
配置了适当的委托转换器(用于出站转换和转换非投影接口)。您还必须将spring-data:spring-data-commons
和添加com.jayway.jsonpath:json-path
到类路径。
当用作方法的参数时@KafkaListener
,接口类型会像往常一样自动传递给转换器。
使用ErrorHandlingDeserializer
当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了ErrorHandlingDeserializer
。该解串器委托给真正的解串器(键或值)。如果委托无法反序列化记录内容,ErrorHandlingDeserializer
将返回 null 值,并在包含原因和原始字节的标头中返回 DeserializationException
。当您使用记录级 MessageListener
时,如果 ConsumerRecord 包含键或值的 DeserializationException
标头,则会使用失败的 ConsumerRecord
调用容器的 ErrorHandler。记录不会传递给侦听器。
或者,您可以将 ErrorHandlingDeserializer
配置为通过提供 failedDeserializationFunction
(Function)来创建自定义值。调用此函数来创建 T 的实例,该实例以通常的方式传递给侦听器。向函数提供了 ailedDeserializationInfo
类型的对象,其中包含所有上下文信息。您可以在标头中找到 DeserializationException
(作为序列化的 Java 对象)。有关详细信息,请参阅 ErrorHandlingDeserializer
的 Javadoc。
您可以使用 DefaultKafkaConsumerFactory
构造函数,该构造函数采用键和值Deserializer
对象,并连接到您已使用适当委托配置的适当 ErrorHandlingDeserializer
实例中。或者,您可以使用使用者配置属性(由 ErrorHandlingDeserializer
使用)来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。属性值可以是类或类名。以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用failedDeserializationFunction
.
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果消费者配置了 ErrorHandlingDeserializer
,那么使用序列化器配置KafkaTemplate
及其生产者非常重要,该序列化器可以处理普通对象以及由反序列化异常导致的原始 byte[] 值。模板的通用值类型应该是Object。一种技术是使用 DelegatingByTypeSerializer
;示例如下:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
使用ErrorHandlingDeserializer
批处理侦听器时,您必须检查消息头中的反序列化异常。当与 DefaultBatchErrorHandler
一起使用时,您可以使用该标头来确定异常失败的记录,并通过 BatchListenerFailedException
与错误处理程序通信。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
ListenerUtils.byteArrayToDeserializationException()
可用于将标头转换为DeserializationException
.
消费List<ConsumerRecord<?, ?>
时,使用ListenerUtils.getExceptionFromHeader()
代替:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
使用批处理侦听器进行有效负载转换
当您使用批处理侦听器容器工厂时,还可以在 BatchMessagingMessageConverter
中使用 JsonMessageConverter
来转换批处理消息。有关更多信息,请参阅序列化、反序列化和消息转换以及 Spring Messaging 消息转换。
默认情况下,转换的类型是从侦听器参数推断出来的。如果使用 DefaultJackson2TypeMapper
配置 JsonMessageConverter
,并将其TypePrecedence
设置为 TYPE_ID
(而不是默认的 INFERRED),则转换器将使用标头中的类型信息(如果存在)。例如,这允许使用接口而不是具体类来声明侦听器方法。此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。当您使用类级 @KafkaListener
实例(其中有效负载必须已转换以确定要调用哪个方法)时,这也很有用。以下示例创建使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为此,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,例如:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批次标题。
如果批处理转换器有支持它的记录转换器,您还可以收到消息列表,其中根据泛型类型转换有效负载。以下示例显示了如何执行此操作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService
定制
从版本 2.1.1 开始,默认的o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory
使用 org.springframework.core.convert.ConversionService
来解析调用侦听器方法的参数,并由实现以下任何接口的所有 bean 提供:
org.springframework.core.convert.converter.Converter
org.springframework.core.convert.converter.GenericConverter
org.springframework.format.Formatter
这使您可以进一步自定义侦听器反序列化,而无需更改ConsumerFactory
和KafkaListenerContainerFactory
的默认配置。
通过 KafkaListenerConfigurer
bean 在 KafkaListenerEndpointRegistrar
设置自定义 MessageHandlerMethodFactory
会禁用此功能。
添加自定义HandlerMethodArgumentResolver
到@KafkaListener
从 2.4.2 版开始,您可以添加自己的HandlerMethodArgumentResolver
和解析自定义方法参数。您所需要的只是实现KafkaListenerConfigurer
和使用KafkaListenerEndpointRegistrar
类中的setCustomMethodArgumentResolvers()
方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 KafkaListenerEndpointRegistrar
bean 添加自定义 MessageHandlerMethodFactory
来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理带有 null value() 的逻辑删除记录(例如来自压缩主题),则应该向工厂添加 KafkaNullAwarePayloadArgumentResolver
;它必须是最后一个解析器,因为它支持所有类型并且可以匹配没有 @Payload
注释的参数。如果您使用的是 DefaultMessageHandlerMethodFactory
,请将此解析器设置为最后一个自定义解析器;工厂将确保此解析器将在标准 PayloadMethodArgumentResolver
之前使用,后者不了解 KafkaNull
有效负载。
4.1.18. 消息头
0.11.0.0 客户端引入了对消息头的支持。从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到spring-messaging
MessageHeaders
.
以前的版本映射ConsumerRecord
和ProducerRecord
到spring-messaging Message<?>
,其中 value 属性映射到topic以及从payload
和其他属性(topic
,partition
,等等)映射到标题。情况仍然如此,但现在可以映射附加(任意)标头。
Apache Kafka 标头有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
KafkaHeaderMapper
策略用于在 KafkaHeaders
和MessageHeaders
. 其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
将SimpleKafkaHeaderMapper
原始标头映射为byte[]
,并带有用于转换为String
值的配置选项。
将DefaultKafkaHeaderMapper
键映射到MessageHeaders
标头名称,并且为了支持出站消息的丰富标头类型,执行 JSON 转换。“特殊”标头(带有spring_json_header_types
键)包含 的 JSON 映射<key>:<type>
。此标头用于入站端,以将每个标头值适当地转换为原始类型。
在入站端,所有 KafkaHeader
实例都映射到MessageHeaders
. 在出站端,默认情况下,所有MessageHeaders
都被映射,除了id
,timestamp
和映射到ConsumerRecord
属性的标头。
您可以通过向映射器提供模式来指定要为出站消息映射哪些标头。以下清单显示了一些示例映射:
//使用默认 Jackson`ObjectMapper`并映射大多数标头,如示例前所述。
public DefaultKafkaHeaderMapper() {
...
}
//使用提供的 Jackson`ObjectMapper`并映射大多数标头,如示例前所述。
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
...
}
//使用默认 Jackson`ObjectMapper`并根据提供的模式映射标题。
public DefaultKafkaHeaderMapper(String... patterns) {
...
}
//使用提供的 Jackson`ObjectMapper`并根据提供的模式映射标题。
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
...
}
模式相当简单,可以包含前导通配符 ( *
)、尾随通配符或两者都包含(例如,.cat.*
)。您可以使用前导!
来否定模式。第一个与标头名称(无论是正数还是负数)匹配的模式获胜。
当您提供自己的模式时,我们建议包含!id
和 !timestamp
,因为这些标头在入站端是只读的。
默认情况下,映射器仅反序列化java.lang
和java.util
中的类。您可以通过使用addTrustedPackages
方法添加受信任的包来信任其他(或所有)包。如果您从不受信任的来源收到消息,您可能希望仅添加您信任的那些包。要信任所有包,您可以使用mapper.addTrustedPackages("*")
.
在与不知道映射器 JSON 格式的系统通信时,以原始形式映射String
标头值很有用。
从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应映射到原始 byte[] 或从原始 byte[] 映射。AbstractKafkaHeaderMapper
有新属性; mapAllStringsOut
当设置为 true
时,所有字符串值标头将使用 charset
属性(默认 UTF-8)转换为 byte[]
。另外,还有一个属性 rawMappedHeaders
,它是 header name : boolean
的映射;如果映射包含标头名称,并且标头包含 String 值,则它将使用字符集映射为原始 byte[]
。当且仅当映射值中的布尔值为true
时,此映射还用于使用字符集将原始传入 byte[]
标头映射到 String
。如果布尔值为 false
,或者标头名称不在具有true
值的映射中,则传入标头将简单地映射为原始未映射标头。
下面的测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个标头映射器都映射所有入站标头。从 2.8.8 版开始,这些模式也可以应用于入站映射。要为入站映射创建映射器,请使用相应映射器上的静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以开头的标题abc
并包括所有其他标题。
默认情况下,只要 Jackson 在类路径上,就会在MessagingMessageConverter
和BatchMessagingMessageConverter
中使用DefaultKafkaHeaderMapper
。
使用批处理转换器,转换后的标头可用于列表中KafkaHeaders.BATCH_CONVERTED_HEADERS
的List<Map<String, Object>>
映射与有效负载中的数据位置相对应的位置。
如果没有转换器(因为 Jackson 不存在或明确设置为null
),则来自消费者记录的标头在KafkaHeaders.NATIVE_HEADERS
标头中提供未转换的。此标头是一个Headers
对象(或在批处理转换器的情况下为List<Headers>
),其中列表中的位置对应于有效负载中的数据位置)。
某些类型不适合 JSON 序列化,这些类型可能首选简单的toString()
序列化。DefaultKafkaHeaderMapper
有一个名为addToStringClasses()
的方法,可让您提供应该以这种方式处理出站映射的类的名称。在入站映射期间,它们被映射为String
. 默认情况下,只有org.springframework.util.MimeType
和org.springframework.http.MediaType
以这种方式映射。
从 2.3 版开始,简化了对字符串值标头的处理。默认情况下,此类标头不再是 JSON 编码的(即它们没有"…"
添加封闭)。该类型仍被添加到 JSON_TYPES 标头中,因此接收系统可以转换回字符串(来自byte[]
)。映射器可以处理(解码)旧版本产生的标头(它检查前导"
);这样,使用 2.3 的应用程序可以使用旧版本的记录。
要与早期版本兼容,请设置encodeStrings
为true
,如果使用 2.3 的版本生成的记录可能会被使用早期版本的应用程序使用。当所有应用程序都使用 2.3 或更高版本时,您可以将该属性保留为其默认值false
.
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它将自动将此转换器 bean 配置为 auto-configured KafkaTemplate
;否则您应该将此转换器添加到模板中。
4.1.19.‘Tombstone’ 记录的空负载和日志压缩
当您使用Log Compaction时,您可以发送和接收带有null
有效负载的消息,以识别密钥的删除。
您还可以出于其他原因接收null
值,例如在无法反序列化值时Deserializer
可能返回的null
值。
要使用 KafkaTemplate
发送null
有效负载,您可以将null
传递到 send()
方法的value
参数中。一个例外是 send(Message message)
变体。由于 pring-messaging Message
不能有 null 有效负载,因此您可以使用名为 KafkaNull 的特殊有效负载类型,并且框架发送 null。为了方便起见,提供了静态的 KafkaNull.INSTANCE
。
当您使用消息侦听器容器时,接收到ConsumerRecord
的具有null
value()
.
要配置@KafkaListener
以处理null
有效负载,您必须使用带有 required = false
的@Payload
注释。如果它是压缩日志的墓碑消息,您通常还需要密钥,以便您的应用程序可以确定哪个密钥被“删除”。以下示例显示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用具有多个@KafkaHandler
方法的类级别@KafkaListener
时,需要进行一些额外的配置。具体来说,您需要一个带有KafkaNull
有效负载的@KafkaHandler
方法。以下示例显示了如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null
,不是KafkaNull
。
请参阅手动分配所有分区。
此功能需要使用KafkaNullAwarePayloadArgumentResolver
框架将在使用默认配置时配置的MessageHandlerMethodFactory
. 使用自定义MessageHandlerMethodFactory
时,请参阅将自定义添加HandlerMethodArgumentResolver
到@KafkaListener
.
4.1.20. 处理异常
本节介绍如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从 2.0 版开始,@KafkaListener
注解有一个新属性:errorHandler
.
您可以使用 errorHandler 提供 KafkaListenerErrorHandler 实现的 bean 名称。该功能接口有一种方法,如下清单所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问Message<?>
消息转换器生成的 spring-messaging 对象以及侦听器引发的异常,该异常被包装在ListenerExecutionFailedException
. 错误处理程序可以抛出原始异常或新异常,该异常被抛出到容器中。错误处理程序返回的任何内容都将被忽略。
从版本 2.7 开始,您可以在 MessagingMessageConverter
和BatchMessagingMessageConverter
上设置 rawRecordHeader
属性,这会导致原始ConsumerRecord
添加到 KafkaHeaders.RAW_DATA
标头中转换后的 Message 中。例如,如果您希望在侦听器错误处理程序中使用 DeadLetterPublishingRecoverer
,这很有用。它可能用在请求/回复场景中,在这种情况下,您希望在捕获死信主题中的失败记录后,经过一定次数的重试后,将失败结果发送给发件人。
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 ( ConsumerAwareListenerErrorHandler
) 可以通过以下方法访问消费者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口 (ManualAckListenerErrorHandler
) 在使用手动AckMode
时提供对 Acknowledgment
对象的访问。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一种情况下,您都不应该对消费者执行任何搜索,因为容器不会意识到它们。
容器错误处理程序
从 2.8 版开始,旧版ErrorHandler
和BatchErrorHandler
接口已被新的CommonErrorHandler
替代. 这些错误处理程序可以处理记录和批处理侦听器的错误,允许单个侦听器容器工厂为两种类型的侦听器创建容器。 CommonErrorHandler
提供了替换大多数遗留框架错误处理程序实现的实现,并且不推荐使用遗留错误处理程序。监听器容器和监听器容器工厂仍然支持遗留接口;它们将在未来的版本中被弃用。
有关将自定义错误处理程序通过迁移CommonErrorHandler
到CommonErrorHandler
.
使用事务时,默认情况下不配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由AfterRollbackProcessor
. 如果您在使用事务时提供自定义错误处理程序,如果您希望事务回滚,它必须抛出异常。
该接口有一个默认方法isAckAfterHandle()
,由容器调用以确定如果错误处理程序返回而不抛出异常,是否应提交偏移量;它默认返回true。
通常,框架提供的错误处理程序将在错误未“处理”时抛出异常(例如,在执行查找操作之后)。默认情况下,此类异常由容器ERROR
级别记录。所有框架错误处理程序都进行了扩展KafkaExceptionLogLevelAware
,允许您控制记录这些异常的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以为容器工厂中的所有侦听器指定一个全局错误处理程序。以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注解的监听方法抛出异常,会抛出到容器中,消息按照容器配置进行处理。
容器在调用错误处理程序之前提交任何未决的偏移提交。
如果您使用的是 Spring Boot,您只需将错误处理程序添加为 @Bean
,Boot 就会将其添加到自动配置的工厂中。
回退处理程序
诸如DefaultErrorHandler之类的错误处理程序使用 BackOff
来确定在重试传递之前等待多长时间。从 2.9 版开始,您可以配置自定义BackOffHandler
. 默认处理程序只是暂停线程,直到回退时间过去(或容器停止)。该框架还提供了ContainerPausingBackOffHandler
暂停侦听器容器直到退避时间过去然后恢复容器的方法。当延迟比消费者属性max.poll.interval.ms
长时,这很有用。请注意,实际退避时间的分辨率会受到pollTimeout
容器属性的影响。
默认错误处理程序
这个新的错误处理程序取代了SeekToCurrentErrorHandler
和 RecoveringBatchErrorHandler
,它们现在已经是多个版本的默认错误处理程序。一个区别是批处理侦听器的回退行为(当抛出 BatchListenerFailedException
以外的异常时)等效于Retrying Complete Batches。
从版本 2.9 开始,可以将 DefaultErrorHandler
配置为提供与查找未处理记录偏移量相同的语义,如下所述,但无需实际查找。相反,记录由侦听器容器保留,并在错误处理程序退出后重新提交给侦听器(并且在执行单个暂停的 poll() 后,以保持使用者处于活动状态;如果使用非阻塞重试或 ContainerPausingBackOffHandler
,则暂停可能会延续到多个轮询)。错误处理程序向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者如果已恢复,则不会再次发送到侦听器。要启用此模式,请将属性 seekAfterError
设置为 false。
错误处理程序可以恢复(跳过)一直失败的记录。默认情况下,十次失败后,记录失败的记录(在ERROR
级别)。您可以使用自定义恢复器 (BiConsumer
) 和BackOff
控制交付尝试和每个之间延迟的a 来配置处理程序。使用FixedBackOff
withFixedBackOff.UNLIMITED_ATTEMPTS
会导致(有效地)无限重试。以下示例配置了三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,使用@KafkaListener
容器工厂,您可以添加DefaultErrorHandler
如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试传送最多 2 次(3 次传送尝试),后退 1 秒,而不是默认配置 ( FixedBackOff(0L, 9)
)。在重试用尽后,仅记录失败。
举个例子; 如果poll
返回 6 条记录(每个分区 0、1、2 中的两条)并且侦听器在第四条记录上抛出异常,则容器通过提交它们的偏移量来确认前三条消息。DefaultErrorHandler
寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。下一个返回poll()
三个未处理的记录。
如果AckMode
是BATCH
,容器在调用错误处理程序之前提交前两个分区的偏移量。
对于批处理侦听器,侦听器必须抛出一个BatchListenerFailedException
指示批处理中的哪些记录失败。
事件的顺序是:
- 提交索引前记录的偏移量。
- 如果重试未用尽,则执行搜索,以便重新传递所有剩余的记录(包括失败的记录)。
- 如果重试次数用尽,则尝试恢复失败的记录(仅默认日志)并执行搜索,以便重新传递剩余的记录(不包括失败的记录)。已恢复记录的偏移量已提交
- 如果重试已用尽并且恢复失败,则执行搜索,就好像重试未用尽一样。
从版本 2.9 开始,DefaultErrorHandler
可以将其配置为提供与上述查找未处理记录偏移量相同的语义,但无需实际查找。相反,错误处理程序会创建一个ConsumerRecords<?, ?>
仅包含未处理记录的新记录,然后将其提交给侦听器(在执行一次 paused 后poll()
,以保持消费者活着)。要启用此模式,请将属性设置seekAfterError
为false
。
默认恢复器在重试用尽后记录失败的记录。您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如DeadLetterPublishingRecoverer
.
当使用 POJO 批处理侦听器(例如List<Thing>
),并且您没有完整的消费者记录要添加到异常时,您可以只添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置了 AckMode.MANUAL_IMMEDIATE
时,可以配置错误处理程序来提交恢复记录的偏移量;将commitRecovered
属性设置为true
。
另请参阅发布死信记录。
使用事务时,类似的功能由DefaultAfterRollbackProcessor
. 请参阅回滚后处理器。
DefaultErrorHandler
认为某些异常是致命的,并且会跳过此类异常的重试;在第一次失败时调用恢复器。默认情况下,被认为是致命的异常是:
DeserializationException
MessageConversionException
ConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
因为这些异常不太可能在重试交付时得到解决。
您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关详细信息,请参阅 DefaultErrorHandler.addNotRetryableException()
和 DefaultErrorHandler.setClassifications()
的 Javadocs,以及 spring-retry BinaryExceptionClassifier
的信息。
这是一个添加IllegalArgumentException
到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
如果恢复器失败(抛出异常),失败的记录将包含在搜索中。如果恢复器失败,BackOff
默认情况下将重置,并且在再次尝试恢复之前,重新交付将再次经历回退。要在恢复失败后跳过重试,请将错误处理程序设置resetStateOnRecoveryFailure
为false
.
您可以为错误处理程序提供 BiFunction, Exception, BackOff>
来根据失败的记录和/或异常确定要使用的 BackOff
:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,将使用处理程序的默认值BackOff
。
如果异常类型在两次失败之间发生变化,则设置resetStateOnExceptionChange
为true
并重新启动重试序列(包括选择新的BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从 2.9 版开始,默认设置true
。
另请参阅交付尝试标题。
批处理错误处理程序的转换错误
从版本 2.8 开始,当将 MessageConverter
与 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
以及 DefaultErrorHandler
结合使用时,批处理侦听器现在可以正确处理转换错误。当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头,类似于 ErrorHandlingDeserializer
。侦听器中提供了 ConversionException
列表,因此侦听器可以抛出 BatchListenerFailedException
来指示发生转换异常的第一个索引。
例子:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批次
这现在是DefaultErrorHandler
批处理侦听器的后备行为,其中侦听器抛出除 BatchListenerFailedException
之外的异常。
无法保证在重新交付批次时,该批次具有相同数量的记录和/或重新交付的记录的顺序相同。因此,不可能轻松地维护批处理的重试状态。采用FallbackBatchErrorHandler
以下方法。如果批处理侦听器抛出不是BatchListenerFailedException
的异常,则从内存中的记录批处理执行重试。为了避免在扩展重试序列期间重新平衡,错误处理程序暂停消费者,在休眠之前轮询它以等待每次重试,然后再次调用侦听器。如果/当重试用尽时,ConsumerRecordRecoverer
为批处理中的每条记录调用。如果恢复器抛出异常,或者线程在睡眠期间被中断,那么这批记录将在下一次轮询时重新传递。在退出之前,无论结果如何,消费者都会被恢复。
此机制不能与事务一起使用。
在等待一个BackOff
间隔时,错误处理程序将循环短暂睡眠,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()
之后很快退出,而不是导致延迟。
容器停止错误处理程序
如果侦听器引发异常,CommonContainerStoppingErrorHandler
将停止容器。对于记录侦听器,当AckMode
为RECORD
时,将提交已处理记录的偏移量。对于记录侦听器,当 AckMode
为任何手动值时,将提交已确认记录的偏移量。对于记录侦听器,当AckMode
为 BATCH
时,或者对于批处理侦听器,当容器重新启动时会重播整个批次。
容器停止后,会抛出一个包装容器的ListenerExecutionFailedException
异常。这是为了使事务回滚(如果启用了事务)。
委派错误处理程序
CommonDelegatingErrorHandler
可以委托给不同的错误处理程序,具体取决于异常类型。例如,您可能希望针对大多数异常调用 DefaultErrorHandler
,或者针对其他异常调用 CommonContainerStoppingErrorHandler
。
记录错误处理程序
CommonLoggingErrorHandler
简单地记录异常;使用记录侦听器,将上次轮询中的剩余记录传递给侦听器。对于批处理侦听器,将记录批处理中的所有记录。
为记录和批处理侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,CommonMixedErrorHandler
则提供了允许为每种侦听器类型配置特定错误处理程序的方法。
常见错误处理程序总结
DefaultErrorHandler
CommonContainerStoppingErrorHandler
CommonDelegatingErrorHandler
CommonLoggingErrorHandler
CommonMixedErrorHandler
旧版错误处理程序及其替换
旧版错误处理程序 | 替代品 |
---|---|
LoggingErrorHandler |
CommonLoggingErrorHandler |
BatchLoggingErrorHandler |
CommonLoggingErrorHandler |
ConditionalDelegatingErrorHandler |
DelegatingErrorHandler |
ConditionalDelegatingBatchErrorHandler |
DelegatingErrorHandler |
ContainerStoppingErrorHandler |
CommonContainerStoppingErrorHandler |
ContainerStoppingBatchErrorHandler |
CommonContainerStoppingErrorHandler |
SeekToCurrentErrorHandler |
DefaultErrorHandler |
SeekToCurrentBatchErrorHandler |
没有替代品,DefaultErrorHandler 与无限一起使用BackOff 。 |
RecoveringBatchErrorHandler |
DefaultErrorHandler |
RetryingBatchErrorHandler |
没有替换 - 使用DefaultErrorHandler 并抛出除BatchListenerFailedException . |
将自定义遗留错误处理程序实现迁移到CommonErrorHandler
请参阅 CommonErrorHandler
中的 javadocs 。
要替换ErrorHandler
或ConsumerAwareErrorHandler
实现,您应该实现handleRecord()
并离开remainingRecords()
返回false
(默认)。您还应该实现handleOtherException()
- 来处理记录处理范围之外发生的异常(例如,消费者错误)。
要替换RemainingRecordsErrorHandler
实现,您应该实现handleRemaining()
并覆盖remainingRecords()
返回true
。您还应该实现handleOtherException()
- 来处理记录处理范围之外发生的异常(例如,消费者错误)。
要替换任何BatchErrorHandler
实现,您应该实现handleBatch()
您还应该实现handleOtherException()
- 以处理记录处理范围之外发生的异常(例如消费者错误)。
回滚后处理器
使用事务时,如果侦听器抛出异常(并且错误处理程序,如果存在,则抛出异常),事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)都会在下一次轮询时重新获取。这是通过在DefaultAfterRollbackProcessor
的seek
方法. 使用批处理侦听器,将重新处理整批记录(容器不知道批处理中的哪条记录失败)。要修改此行为,您可以使用自定义AfterRollbackProcessor
配置侦听器容器。例如,对于基于记录的侦听器,您可能希望跟踪失败的记录并在尝试多次后放弃,可能通过将其发布到死信主题。
从 2.2 版开始,DefaultAfterRollbackProcessor
现在可以恢复(跳过)一直失败的记录。默认情况下,十次失败后,记录失败的记录(在ERROR
级别)。您可以使用自定义恢复器 ( BiConsumer
) 和最大故障数来配置处理器。将该maxFailures
属性设置为负数会导致无限次重试。以下示例配置了三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当你不使用事务时,你可以通过配置一个DefaultErrorHandler
. 请参阅容器错误处理程序。
使用批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录一直失败。在这种情况下,应用程序侦听器必须处理不断失败的记录。
另请参阅发布死信记录。
从版本 2.2.5 开始,DefaultAfterRollbackProcessor
可以在新事务中调用(在失败的事务回滚后开始)。然后,如果您使用DeadLetterPublishingRecoverer
发布失败的记录,处理器会将恢复的记录在原始主题/分区中的偏移量发送到事务。要启用此功能,请在DefaultAfterRollbackProcessor
上设置commitRecovered
和 kafkaTemplate
属性。
如果恢复器失败(抛出异常),失败的记录将包含在搜索中。从版本 2.5.5 开始,如果恢复器失败,BackOff
将默认重置,并且在再次尝试恢复之前重新交付将再次经历回退。对于早期版本,BackOff
没有重置,并且在下一次故障时重新尝试恢复。要恢复到以前的行为,请将处理器的resetStateOnRecoveryFailure
属性设置为false
。
从版本 2.6 开始,您现在可以为处理器提供 BiFunction, Exception, BackOff> 来根据失败的记录和/或异常确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,将使用处理器的默认值BackOff
。
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置resetStateOnExceptionChange
为true
并且重试序列将重新启动(包括选择新的BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从 2.3.1 版开始,类似于DefaultErrorHandler
,DefaultAfterRollbackProcessor
认为某些异常是致命的,并且会跳过此类异常的重试;在第一次失败时调用恢复器。默认情况下,被认为是致命的异常是:
DeserializationException
MessageConversionException
ConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
因为这些异常不太可能在重试交付时得到解决。
您可以在不可重试类别中添加更多异常类型,或者完全替换分类异常的映射。有关更多信息,请参阅 Javadocs DefaultAfterRollbackProcessor.setClassifications()
,以及spring-retry
BinaryExceptionClassifier
.
这是一个添加IllegalArgumentException
到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅交付尝试标题。
对于当前的 kafka-clients
,容器无法检测 ProducerFencedException
是否是由重新平衡引起的,或者生产者的 transactional.id
是否由于超时或到期而被撤销。因为,在大多数情况下,这是由重新平衡引起的,容器不会调用 AfterRollbackProcessor
(因为不适合查找分区,因为我们不再分配它们)。如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 ListenerContainerIdleEvent
),您可以避免由于超时和到期而导致的隔离。或者,您可以将 stopContainerWhenFenced
容器属性设置为 true
,容器将停止,避免记录丢失。您可以使用ConsumerStoppedEvent
并检查 FENCED 的 Reason
属性来检测此情况。由于该事件还引用了容器,因此您可以使用此事件重新启动容器。
从 2.7 版本开始,在等待一个BackOff
时间间隔时,错误处理程序将循环短暂睡眠,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()
之后立即退出,而不是导致延迟。
从2.7版本开始,处理器可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
传递尝试标题
以下仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用实现 DeliveryAttemptAware
的 ErrorHandler
或AfterRollbackProcessor
时,可以启用将 KafkaHeaders.DELIVERY_ATTEMPT
标头 (kafka_deliveryAttempt)
添加到记录中。该标头的值是一个从 1 开始递增的整数。当接收原始 ConsumerRecord
时,该整数位于 byte[4]
中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt()
@KafkaListener
与DefaultKafkaHeaderMapper
或SimpleKafkaHeaderMapper
一起使用时,可以通过将@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
其作为参数添加到侦听器方法中来获得。
要启用此标头的填充,请将容器属性设置deliveryAttemptHeader
为true
。默认情况下禁用它以避免查找每条记录的状态和添加标题的(小)开销。
DefaultErrorHandler
和 DefaultAfterRollbackProcessor
支持此功能。
侦听器信息标题
在某些情况下,能够知道侦听器在哪个容器中运行很有用。
从版本 2.8.4 开始,您现在可以listenerInfo
在侦听器容器上设置info
属性,或在@KafkaListener
注释上设置属性。然后,容器会将这个添加KafkaListener.LISTENER_INFO
到所有传入消息的标头中;然后,它可以用于记录拦截器、过滤器等,或者用于侦听器本身。
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen2(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 a RecordInterceptor
或 RecordFilterStrategy
实现中使用时,标头在消费者记录中作为字节数组,使用KafkaListenerAnnotationBeanPostProcessor
charSet
属性进行转换。
当从消费者记录创建MessageHeaders
时,标头映射器也会转换为String
,并且永远不会将此标头映射到出站记录上。
对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头被复制到批处理的每个成员中,并且在转换后也可作为单个String
参数使用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有过滤器并且过滤器导致空批处理,则您需要添加required = false
到@Header
参数,因为该信息不适用于空批处理。
如果您收到List<Message<Thing>>
信息,则该信息在每个Message<?>
的KafkaHeaders.LISTENER_INFO
标头中
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
当达到记录的最大失败次数时,您可以使用记录恢复器配置DefaultErrorHandler
和DefaultAfterRollbackProcessor
。框架提供了DeadLetterPublishingRecoverer
,它将失败的消息发布到另一个主题。恢复器需要一个KafkaTemplate<Object, Object>
,用于发送记录。您还可以选择使用 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
配置它,调用它来解析目标主题和分区。
默认情况下,死信记录被发送到一个名为<originalTopic>.DLT
(原始主题名称后缀为.DLT
)的主题和与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。
如果返回TopicPartition
的有负分区,则说明中没有设置分区ProducerRecord
,所以这个分区是被Kafka选中的。从版本 2.2.4 开始,任何ListenerExecutionFailedException
(例如,在@KafkaListener
方法中检测到异常时抛出)都使用groupId
属性进行了增强。除了选择死信主题中ConsumerRecord
的信息外,这还允许目标解析器使用它。
以下示例显示了如何连接自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行了增强:
KafkaHeaders.DLT_EXCEPTION_FQCN
: Exception 类名(通常是ListenerExecutionFailedException
,但也可以是其他的)。KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
: 异常原因类名,如果存在(从 2.8 版开始)。KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
: Exception 类名(仅限键反序列化错误)。KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限键反序列化错误)。KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限密钥反序列化错误)。KafkaHeaders.DLT_ORIGINAL_TOPIC
: 原题目。KafkaHeaders.DLT_ORIGINAL_PARTITION
: 原来的分区。KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
: 原始时间戳。KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
: 处理记录失败的原始消费者组(2.8 版起)。
关键异常仅由DeserializationException
s 引起,因此没有DLT_KEY_EXCEPTION_CAUSE_FQCN
.
有两种机制可以添加更多标头。
- 子类化恢复器并覆盖
createProducerRecord()
- 调用super.createProducerRecord()
并添加更多标头。 - 提供一个
BiFunction
接收消费者记录和异常,返回一个Headers
对象;那里的标头将被复制到最终的生产者记录中;另请参阅管理死信记录标头。用setHeadersFunction()
设置BiFunction
.
第二个实现起来更简单,但第一个有更多可用信息,包括已经组装的标准头文件。
从 2.3 版本开始,当与 ErrorHandlingDeserializer
一起使用时,发布者会将value()
死信生产者记录中的记录恢复为无法反序列化的原始值。以前,value()
为 null 并且用户代码必须从消息头中解码 DeserializationException
。此外,您可以向发布者提供多个KafkaTemplate
s;这可能是需要的,例如,如果您想要从 DeserializationException
发布 byte[]
,以及使用与成功反序列化的记录不同的序列化程序的值。以下是使用 String 和 byte[] 序列化器的KafkaTemplate
配置发布者的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来定位适合value()
即将发布的模板。建议使用 LinkedHashMap
,以便按顺序检查键。
发布null
值时,当有多个模板时,recoverer会为Void
类寻找一个模板;如果不存在,将使用来自values().iterator()
的第一个模板。
从 2.7 开始,您可以使用该setFailIfSendResultIsError
方法,以便在消息发布失败时引发异常。您还可以使用 setWaitForSendResultTimeout
设置验证发件人成功的超时时间。
如果恢复器失败(抛出异常),失败的记录将包含在搜索中。从版本 2.5.5 开始,如果恢复器失败,BackOff
将默认重置,并且在再次尝试恢复之前重新交付将再次经历回退。对于早期版本,BackOff
没有重置,并且在下一次故障时重新尝试恢复。要恢复到以前的行为,请将错误处理程序的resetStateOnRecoveryFailure
属性设置为false
。
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置resetStateOnExceptionChange
为true
并且重试序列将重新启动(包括选择新的BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从 2.3 版开始,恢复器也可以与 Kafka Streams 一起使用 -有关更多信息,请参阅从反序列化异常中恢复。
在标头中ErrorHandlingDeserializer
添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
(使用 java 序列化)。默认情况下,这些标头不会保留在发布到死信主题的消息中。从版本 2.7 开始,如果键和值都未能反序列化,则两者的原始值都会填充到发送到 DLT 的记录中。
如果传入记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(一定次数)可能很有用,而不是将其直接发送到死信主题. 有关示例,请参阅此堆栈溢出问题。
以下错误处理程序配置将完全做到这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复器检查目标解析器选择的分区是否确实存在。如果分区不存在,则将 中的分区ProducerRecord
设置为null
,允许KafkaProducer
选择分区。verifyPartition
您可以通过将属性设置为 来禁用此检查false
。
管理死信记录头
参考上面的Publishing Dead-letter RecordsDeadLetterPublishingRecoverer
,当这些标头已经存在时(例如在重新处理失败的死信记录时,包括使用Non-Blocking Retries时),有两个属性用于管理标头。
appendOriginalHeaders
(默认true
)stripPreviousExceptionHeaders
(自 2.8 版起默认true
)
Apache Kafka 支持多个同名标头;要获得“最新”值,您可以使用headers.lastHeader(headerName)
; 要获取多个标头的迭代器,请使用headers.headers(headerName).iterator()
.
当反复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为 a RecordTooLargeException
);对于异常标头尤其是堆栈跟踪标头尤其如此。
使用这两个属性的原因是,虽然您可能只想保留最后一个异常信息,但您可能希望保留每次失败时记录通过的主题的历史记录。
appendOriginalHeaders
应用于所有名为**ORIGINAL**
的头文件,而stripPreviousExceptionHeaders
应用于所有名为**EXCEPTION**
的头文件。
从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。请参阅enum HeadersToAdd
默认添加的(当前)10 个标准头文件的通用名称(这些不是实际的头文件名称,只是一个抽象;实际的头文件名称由getHeaderNames()
子类可以覆盖的方法设置。
要排除标题,请使用excludeHeaders()
方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加ExceptionHeadersCreator
来完全自定义异常头的添加。这也会禁用所有标准异常标头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从 2.8.4 版本开始,您现在可以通过该addHeadersFunction
方法提供多个标头功能。这允许应用附加功能,即使已经注册了另一个功能,例如,在使用Non-Blocking Retries时。
ExponentialBackOffWithMaxRetries
执行
Spring Framework 提供了许多BackOff
实现。默认情况下,ExponentialBackOff
将无限期重试;在多次重试尝试后放弃需要计算maxElapsedTime
. 从 2.7.3 版本开始,Spring for Apache Kafka 提供了一个子类ExponentialBackOffWithMaxRetries
,它接收maxRetries
属性并自动计算maxElapsedTime
,这更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在1, 2, 4, 8, 10, 10
几秒钟后重试,然后再调用恢复器。
4.1.21. JAAS 和 Kerberos
从 2.0 版开始,KafkaJaasLoginModuleInitializer
添加了一个类来协助 Kerberos 配置。您可以使用所需的配置将此 bean 添加到应用程序上下文中。以下示例配置了这样一个 bean:
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
jaasConfig.setControlFlag("REQUIRED");
Map<String, String> options = new HashMap<>();
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
options.put("principal", "kafka-client-1@EXAMPLE.COM");
jaasConfig.setOptions(options);
return jaasConfig;
}
4.1.22. 生产者和消费者记录日志
从版本 2.7.12、2.8.4 开始,您可以确定这些记录将如何在调试日志等中呈现。
请参阅KafkaUtils.setProducerRecordFormatter()
和KafkaUtils.setProducerRecordFormatter()
了解更多信息。
评论区