Spring Kafka:Retry Topic、DLT 的使用与原理

背景

原生 Kafka 是不支持 Retry Topic 和 DLT (Dead Letter Topic,死信队列)。但是 Spring Kafka 在客户端实现了这两个功能。

版本

spring-kafka 2.7.14(2.7.x 以下版本不支持 Retry Topic)

默认重试策略

默认情况下,spring-kafka 在消费逻辑抛出异常时,会快速重试 10 次(无间隔时间),如果重试完成后,依旧消费失败,spring-kafka 会 commit 这条记录。

默认重试的实现原理是:重置当前 consumer offset,感兴趣的同学可以在 SeekUtils#doSeeks debug 一下

可以通过自定义 SeekToCurrentErrorHandler 来控制消费失败后的处理逻辑。例如:添加重试间隔,重试完成后依旧失败的消息发送到 DLT

自定义 SeekToCurrentErrorHandler

1
2
3
4
5
6
7
8
@Bean
public ErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
// 设置重试间隔 10秒 次数为 3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// 创建 SeekToCurrentErrorHandler 对象
return new SeekToCurrentErrorHandler(recoverer, backOff);
}

添加上述代码后,消费逻辑抛出异常后,会间隔 10s 重试 3 次,重试后依旧失败,会将消息发送到 DLT

关于默认重试策略,Kafka 的 TopicPartition 只会分配给一个消费者,而消费者对于某条消息的重试,会占用消费线程,影响整个 TopicPartition 的消费速度。如果使用 Retry Topic 功能,不会占用消费线程,会有专门的 retry 线程订阅 Retry Topic 执行重试消费。

Retry Topic + DLT 使用

可以通过注解和全局配置的方式开启 Retry Topic 功能

@RetryableTopic

使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@Component
public class SimpleConsumer {

@RetryableTopic()
@KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
public void onMessage(MessageWrapper message) {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
throw new RuntimeException("test kafka exception");
}

}

此时 Retry Topic 功能已经启用了。当消费逻辑抛出异常时,spring-kafka 会先将消息发送到 Retry Topic,随后在 Main Topic(对应上文的test_topic)中 commit 这条消息。会有专门的线程订阅 Retry Topic,不会影响正常消费

默认重试 3 次,间隔为 1s,如果在重试结束后,还没有成功被消费,该消息会被发送到 DLT 中

默认情况,消息被发送到死信队列后,会输出一条日志。

1
2022-08-09 16:05:03.920  INFO 4048 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: test_topic-dlt-0@233

上述的日志输出是默认的死信订阅逻辑,用户可以在类中添加 @DltHandler 方法自定义死信消费逻辑

1
2
3
4
@DltHandler
public void processMessage(MessageWrapper message) {
log.info("dlt {}", message);
}

至此,你的 Kafka 就拥有了类似 RocketMQ 的消息重试能力,但是配置方面还需要调整一下。

定制 @RetryableTopic

可以自定义重试次数,延迟时间,死信策略等等,同时大部分参数还支持使用 Spring EL 表达式读取配置,这里简单列举下,更多的配置读者可以自行探索

1
2
3
4
5
6
7
8
9
10
@RetryableTopic(
attempts = "${kafka.retry.attempts}",
backoff = @Backoff(delayExpression = "${kafka.retry.delay}", multiplierExpression = "${kafka.retry.multiplier}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
)
@KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
public void onMessage(MessageWrapper message) {
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
throw new RuntimeException("test kafka exception");
}

解释一下上述的配置

  • attempts:重试次数
  • @Backoff delayExpression:消费延迟时间
  • @Backoff multiplierExpression:乘数。举个例子,第一次delay = 10s,如果 multiplier = 2,则下次 delay = 20s,以此类推,但是会有一个 maxDelay 作为延迟时间上限
  • fixedDelayTopicStrategy:可选策略包括:每次重试发送到单独的 Topic、只使用一个重试 Topic

fixedDelayTopicStrategy 这个参数还是挺重要的,具体应该怎么选呢,我们稍后再说

RetryTopicConfiguration

1
2
3
4
5
6
7
8
9
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(4)
.fixedBackOff(15000)
.includeTopic("test_topic")
.create(template);
}

使用这个方式配置项基本和注解一样,如果你有多个需要配置重试的消费者,使用 RetryTopicConfiguration 的方式要比注解方式更简单

源码解析

延迟重试怎么实现的

延迟重试这个功能应该分为两步

  1. 将需要重试的消息发送到 Retry Topic
  2. Retry Topic 的订阅者延迟消费

非常遗憾的是,Kafka 并没有延迟消息这样的功能,所以这个延迟消费也是 spring-kafka 自己实现的,不得不说这个组件真的下了很多功夫

接下来聊聊延迟重试的实现原理

延迟消息标识

消息发送到 Retry Topic 这个步骤,感兴趣的同学可以 debug 一下 SeekToCurrentErrorHandler#handle 这里就不详细说了

每个需要被重试的消息,都会被添加 retry_topic-backoff-timestamp 这个 header,这个值代表这个消息的期望执行时间

开启了重试功能的 KafkaListener,在执行消费逻辑前,会先执行KafkaBackoffAwareMessageListenerAdapter#onMessage,该方法会先对消息进行检查

KafkaBackoffAwareMessageListenerAdapter#onMessage

这部分逻辑是:

  1. 首先检查 consumerRecord 是否包含 retry_topic-backoff-timestamp,如果有则进入步骤2
  2. 现在时间是否达到了期望执行时间,if ( nowTime > executeTime ) 该方法什么也不做,程序会立刻执行消费逻辑
  3. 未达到期望执行时间,准备暂停消费者对当前 TopicPartition 的消费,但是并不是在这里完成的,这个方法内部只是记录了一下需要暂停的 TopicPartition(这个数据存储在 KafkaMessageListenerContainer 的 pauseRequestedPartitions 中),并在 PartitionPausingBackoffManager 中存储了 BackOffContext,随后抛出一个异常打断消费流程

暂停分区

只要 Kafka 消费线程还在运行,就会无限调用 KafkaMessageListenerContainer#pollAndInvoke

KafkaMessageListenerContainer#pollAndInvoke

pollAndInvoke 中 pausePartitionsIfNecessary 方法会根据 KafkaMessageListenerContainer 中存储的 pauseRequestedPartitions 暂停 partition,使用的方法是 Kafka Client 的 consumer.pause

调用 consumer.pause 之后,之后调用 consumer.poll 不会返回任何数据,直到调用 resume 恢复消费。该方法不会造成 Rebalance

恢复分区

有了上面暂停消费的逻辑,还得有对应的恢复消费才能实现“延迟消费”,下面来看下恢复消费的逻辑

KafkaMessageListenerContainer#checkIdlePartition

KafkaMessageListenerContainer#checkIdlePartition 方法会不断地检查 partition 是否空闲(长时间未拉取到消息)。如果符合了空闲 partition 的标准,则发送事件 ListenerContainerPartitionIdleEvent

PartitionPausingBackoffManager

PartitionPausingBackoffManager 监听该事件,并尝试查找该 TopicPartition 是否存在 BackOffContext。存在则代表该分区被暂停,如果时间条件满足,从 KafkaMessageListenerContainer 的 pauseRequestedPartitions 删除该分区

KafkaMessageListenerContainer#resumePartitionsIfNecessary

最后 KafkaMessageListenerContainer#resumePartitionsIfNecessary 会将“已被 Kafka Consumer 暂停”但是“不存在于 KafkaMessageListenerContainer 的 pauseRequestedPartitions 的分区”恢复消费(通过 consumer.resume

小结

画一张图来总结一下 Retry Topic 的执行流程

这里补充说明一下

  • 其实 MAIN_TOPIC 和 RETRY TOPIC 执行的代码是完全相同的,上图只是为了更好的让大家理解 Retry Topic 的流程
  • 本身 Kafka 消费流程是一个无限循环

关于 Retry Topic 策略

下面详细说说 Topic 策略这个事

FixedDelayStrategy.MULTIPLE_TOPICS

test_topic 为例,此时我 attempts = 3, delay=10, multiplier=2,会额外创建以下三个 Topic

  • test_topic-retry-0
  • test_topic-retry-1
  • test_topic-dlt

第一次消费失败,会发送到 test_topic-retry-0,消息延迟为 10s
第二次消费失败,会发送到 test_topic-retry-1,消息延迟为 20s
第三次消费失败,会发送到 test_topic-dlt

此时每个 Retry Topic 中的消息延迟时间是相同的,在消费时间可控的情况下,消息延迟的时间不会有过大的偏差

该策略的缺点就是,使用了过多的 Topic,但是可以实现重试时间指数级上升

FixedDelayStrategy.SINGLE_TOPIC

延迟时间固定的情况适合使用 SINGLE_TOPIC 策略,该策略下只有一个 Retry Topic。如果 SINGLE_TOPIC 延迟时间指数级增长的话,很可能出现的问题是,第一条消息第三次重试延迟时间为 30s,第二条消息第一次重试延迟时间为 10s,两条消息被分配到同一分区,这二条消息被迫在 40s 之后才能重试

补充:如何使用多个 retry 线程

默认情况下,Main Topic,每个 Retry Topic,DLT 分别有 1 个消费线程,默认情况下 Retry 和 DLT 会使用 KafkaListener 提供的 ContainerFactory 初始化。

例如我把 KafkaListener concurrency 设置为 4。此时 Retry Topic,每个 Retry Topic,DLT 分别有 4 个消费线程

也可以自定义 Retry Topic 消费者使用的 ContainerFactory

spring-kafka 相关 demo

https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-kafka

参考