概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值 。该偏移值是消费者将读取的下一条消息的值 。可以自动或手动增加该值 。如果我们由于错误而无法处理消息并想重试,我们可以选择手动管理 , 并在成功的情况下增加偏移量 。但是,这会暂时阻止队列消息的处理 。我们可以选择异步方法 。
为什么我们需要它?如果发生错误,而不是停止队列消息的处理;我们可以将错误消息转移到不同的主题并再次处理 。
如果在处理 Kafka 消息时出现错误,可以使用 RetryableTopic 注解以一定的时间间隔和一定的次数再次处理消息 。如果完成尝试次数后错误仍然存在,则消息将发送到 DLT 队列 。
如何使用?我们首先回顾一下RetryableTopic注解可以取的一些值,以便您可以做出最适合您的设置:
attempts:尝试处理消息的次数 。它的默认值为 3 。如果完成所有尝试后仍然收到错误,则消息将发送到 DLT 队列 。
backoff:用于确定处理消息的时间间隔 。从 Backoff 类获取一个值 。您可以在下面找到退避的详细示例 。
排除/排除名称:允许您排除指定的异常类 。当您添加到列表中的任何错误被抛出时,重试机制将不会被激活 。
include / includeNames:仅当抛出指定的异常时才会激活重试机制 。
kafkaTemplate:虽然您可以给出现有 kafkaTemplate bean 的名称,但您也可以为特定于重试的 Kafka 模板定义不同的 bean 。
autoCreateTopics:决定是否自动创建Retry和DLT主题 。
retryTopicSuffix / dltTopicSuffix:用于确定要添加到自动创建的主题末尾的后缀 。
dltStrategy:如果不需要DLT , 可以定义为NO_DLT 。
SameIntervalTopicReuseStrategy/fixedDelayTopicStrategy(3.0.4之前):用于确定要创建的重试主题策略 。创建 (SINGLE_TOPIC) 或尽可能多的尝试值 (MULTIPLE_TOPICS) 重试主题 。
Backoff的示例:
- 具有固定的增量值
Backoff(delay = 600000 ) // 每 10 分钟- 具有指数价值
Backoff(delay = 60000 , multiplier = 2 ) // 1、2、4、8... 分钟后重复 。- 用占位符定义值
Backoff(delayExpression = "${delay}", multiplierExpression = "${multiplier}")@RetryableTopic 示例:@RetryableTopic(backoff = @Backoff(delay = 300000),attempts = 12,sameIntervalTopicReuseStrategy =SameIntervalTopicReuseStrategy.SINGLE_TOPIC,kafkaTemplate = "kafkaRetryableTopicTemplate",exclude = { SerializationException.class,DeserializationException.class,NullPointerException.class} ) @KafkaListener(topics = "my-topic") public void processMessage(RetryableDto retryableDto) {log.info("Retrying process RetryableDto : {}", retryableDto);// process message }在上面的例子中,消息将每5分钟重新处理一次,总共12次,即1小时 。如果任何尝试均顺利完成 , 则试用将终止 。由于定义了 SINGLE_TOPIC,因此将创建单个主题以进行重试 。如果没有进行此定义,则会创建 12 个重试主题 。
如果抛出了排除中定义的任何错误 , 则不会执行重做 。
如果需要,您可以编写自己的 RetryableException 并在包含中定义此值 , 以便仅在引发此错误时才重试 。
DLT队列处理如果完成了定义的尝试次数并且继续收到错误,则消息将发送到 DLT 队列 。如果要处理这些消息,可以使用DltHandler注解 。
用法示例:
@DltHandlerpublicvoidhandleDltMessage (RetryableDto retryableDto) {log.error("DLT处理程序消息:{}", retryableDto); }注意事项【Spring实现Kafka重试Topic,真的太香了】虽然使用 RetryableTopic 的异步处理优势为我们带来了性能提升 , 但这种使用也有一些缺点 。使用RetryableTopic可能会破坏消息的处理顺序 。
让我们用一个例子来解释这种情况:当主主题在时间 t 处理时,一条消息出错并被发送到重试主题 。在时间 t + 1 时 , 另一条消息来到主主题并成功处理 。让我们在重试主题中的消息在时间 t + 2 时被成功处理 。在这种情况下,第一条传入消息将在第二条消息之后处理 。如果订购对您很重要 , 我建议您在消息处理过程中进行必要的检查 。
另一个缺点是消息双重处理的风险 。您可以通过考虑这种可能性来进行改进 。
推荐阅读
- RabbitMQ如何实现延迟队列?
- 如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
- 车载wifi怎么用,车载wifi上互联网是怎么实现的
- 如何优雅的实现前端国际化?
- OpenHarmony - 基于ArkUI框架实现日历应用
- C++质数检测器的设计与实现?
- 为争视后两度延期注册,前TVB花旦突传婚讯,盼封后实现双喜临门
- 为何仅靠佛手和玫瑰花茶,就能实现祛斑养颜的奇效?
- “为何花大钱买护肤品?美容祛斑可以通过这些日常食物实现!”
- Spring Boot 3.0是什么?
