当前位置: 首页 > news >正文

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理

文章目录

  • 【RocketMQ】重试机制及死信消息处理
    • 1. 重试机制
      • 1.1 生产者重试
      • 1.2 消费者重试
        • 1.2.1 死信队列

参考文档: 官方文档

1. 重试机制

1.1 生产者重试

rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。

当然也可以自定义重试次数及机制:

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

1.2 消费者重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。

  1. 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
  2. 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);

在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。

  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);

示例:

消费者重试示例:

@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次)

1.2.1 死信队列

上面提到的 特殊Topic 称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。

对于死信的处理方案有多种,这里演示两种:

  1. 编写消费者监听死信队列
  2. 在消费者达到最大重试次数时立刻处理。

方案一

@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。

方案二:

针对方案一的缺点,方案二能够比较好的解决。

@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}

相关文章:

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理 文章目录 【RocketMQ】重试机制及死信消息处理1. 重试机制1.1 生产者重试1.2 消费者重试1.2.1 死信队列 参考文档&#xff1a; 官方文档 1. 重试机制 1.1 生产者重试 rocketmq生产者发送消息失败默认重试2次(同步发送为2次&#xff0c;异…...

Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队

1 引言 大家好&#xff0c;接着上次和大家一起学习了《MySQL DDL执行方式-Online DDL介绍》&#xff0c;那么今天接着和大家一起学习另一种MySQL DDL执行方式之pt-soc。 在MySQL使用过程中&#xff0c;根据业务的需求对表结构进行变更是个普遍的运维操作&#xff0c;这些称为…...

C++ stack容器介绍

&#x1f914;stack容器介绍&#xff1a; &#x1f4d6; stack是一种数据结构&#xff0c;也可以被称为堆栈。它是一个容器&#xff0c;只允许在最顶层进行插入和删除&#xff0c;并且只能访问最后一个插入的元素。这个元素称为栈顶。所有新插入的元素都被放置在栈顶上面&#…...

在 Git 中撤消更改的 6 种方法!

目录 1. 修改最近的提交 2. 将分支重置为较旧的提交 硬重置 软重置分支 创建备份分支 3. 交互式变基 删除旧提交 改写提交消息 编辑旧提交 压缩 4. 还原提交 5. 签出文件 6. 使用 Git Reflog 当使用 Git 进行项目代码管理时&#xff0c;难免会出现一些错误操作或需…...

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置 1、位置订阅1.1、国标设备编辑1.2、选择设备开启位置订阅1.3、全局开启位置订阅1.4、通过目录订阅获取位置(少数情况) 2、经纬度信息查询2.1、访问接口获取2.1.…...

编程(38)----------计算机的部分原理

本篇主要总结一些计算机的理论部分. 计算机在发展历程中,无论是最早的巨无霸机器,还是现在小到可以拿在手中的掌机.只要其本质上是计算机,在最基础的结构上,都是以冯诺依曼体系所构建的. 冯诺依曼体系大致将计算机分为几个最重要的部分:输入,输出,中央处理器,存储设备.也就是…...

若依框架快速搭建(二)

目录 数据库设计功能模块设计XXX信息管理xxx查询xxx添加xxx删除xxx修改xxx导出 功能模块实现运行数据库自动代码生成在IDEA中找到RuoYi-generator&#xff0c;修改配置运行前后端项目&#xff0c;在网页中找到代码生成模块导入表后点击确定&#xff0c;序号前打勾&#xff0c;再…...

为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

【概率论】中心极限定理(二)

文章目录 主观题主观题 每袋味精的净重为随机变量,平均重量为 100 克,标准差为 10 克。一箱内装 200 袋味精,求一箱味精的净重大于 20500 克的概率? 解: ① E ( X i ) = 100 , D ( X i ) = 1 0 2...

Blender UV展开流程

目录 1. UV1.1 blender默认物体1.2 创建物体1.3 UV参考图1.4 标记缝合边1.5 UV拉伸1.6 孤岛模式 1. UV 1.1 blender默认物体 默认物体已经自动生成UV 在UV编辑工作区&#xff0c;编辑模式&#xff0c;全选物体在左边自动展开UV 在物体数据属性-UV贴图-存在默认的UV贴图&#…...

Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程

Widget、Element、BuildContext 和 RenderObject Widget Widget关键类及其子类继承关系如图所示&#xff1a; 其中&#xff0c;Widget是Widget Tree所有节点的基类。Widget的子类主要分为3类&#xff1a; 第1类是RenderObjectWidget的子类&#xff0c;具体来说又分为SingleCh…...

Android:主题切换

一.概述 正在开发的应用做了一版新UI&#xff0c;原打算将新版UI按项目名做成资源包&#xff0c;再在build.gradle里productFlavors{ }多渠道打包实现 但被告知新旧两个项目共用一个分支&#xff0c;那就做成两个主题(Theme1/Theme2)来适配了 如果只是变更UI&#xff0c;做成…...

terminalworks ASP.NET Core PDF 浏览器-Crack

ASP.NET Core 的 PDF 查看器 terminalworks在 ASP.NET Core 网页或应用程序中添加可靠的 PDF 查看器的简单方法。 我们的 Web PDF 查看器基于经过验证和测试的 Mozilla PdfJS 解决方案&#xff0c;该解决方案在 Firefox 中用作默认 PDF 查看器。我们专门设计了我们的查看器&…...

Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列

目录 58. 最后一个单词的长度 Length of Last Word &#x1f31f; 59. 螺旋矩阵 II Spiral Matrix II &#x1f31f;&#x1f31f; 60. 排列序列 Permutation Sequence &#x1f31f;&#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日…...

短视频矩阵源码如何做应用编程?

短视频矩阵源码&#xff0c; 短视频矩阵系统技术文档&#xff1a; 可以采用电子文档或者纸质文档的形式交付&#xff0c;具体取决于需求方的要求。电子文档可以通过电子邮件、远程指导交付云存储等方式进行传输、 短视频矩阵{seo}源码是指将抖音平台上的视频资源进行筛选、排…...

【运维知识进阶篇】Ansible实现一套完整LNMP架构

前面介绍了PlayBook怎么写服务部署&#xff0c;把服务部署上后&#xff0c;我们来用Ansible来部署项目&#xff0c;实现一套完整的LNMP架构。我们部署wordpress、wecenter、phpshe、phpmyadmin这四个项目。将其所有的剧本都写入lnmp.yml中&#xff0c;相关备份数据都放入root/a…...

Spring Boot 自动配置一篇概览

一、什么是自动配置 bean 自动配置类通过添加 AutoConfiguration 注解实现。 因为 AutoConfiguration 注解本身是以 Configuration 注解的&#xff0c;所以自动配置类可以算是一个标准的基于 Configuration 注解的类。 Conditional 注解可以用于声明自动配置启用条件&#x…...

深入理解设计原则之接口隔离原则(ISP)【软件架构设计】

系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP&#xff1a;接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则&#xff1f;小结 1、接口隔离原则的定义和…...

IMX6ULL裸机篇之I2C实验主控代码说明二

一. I2C实验 I2C实验内容&#xff1a; 学习如何使用 I.MX6U 的 I2C 接口来驱动 AP3216C&#xff0c;读取 AP3216C 的传感器数据。 I2C读写数据时序图&#xff1a; I2C写数据时序图如下&#xff1a; I2C读数据时序图如下&#xff1a; 二. I2C主控读写时序 1. 读数据与写数…...

【计算机组成原理与体系结构】数据的表示与运算

目录 一、进位计数制 二、信息编码 三、定点数数据表示 四、校验码 五、定点数补码加减运算 六、标志位的生成 七、定点数的移位运算 八、定点数的乘除运算 九、浮点数的表示 十、浮点数的运算 一、进位计数制 整数部分&#xff1a; 二进制、八进制、十六进制 --…...

DownGit终极指南:3分钟掌握GitHub精准下载技巧

DownGit终极指南&#xff1a;3分钟掌握GitHub精准下载技巧 【免费下载链接】DownGit github 资源打包下载工具 项目地址: https://gitcode.com/gh_mirrors/dow/DownGit 你是否曾经在GitHub上找到心仪的代码片段&#xff0c;却不得不下载整个庞大的项目仓库&#xff1f;或…...

旅游数据|基于Java+vue的旅游数据分享系统(源码+数据库+文档)​

旅游数据分享系统 目录 基于SprinBootvue的旅游数据分享系统 一、前言 二、系统设计 三、系统功能设计 5.1系统功能实现 5.2管理员模块实现 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;…...

5大核心功能深度解析:如何用wvp-GB28181-pro构建企业级视频监控系统

5大核心功能深度解析&#xff1a;如何用wvp-GB28181-pro构建企业级视频监控系统 【免费下载链接】wvp-GB28181-pro 基于GB28181-2016、部标808、部标1078标准实现的开箱即用的网络视频平台。自带管理页面&#xff0c;支持NAT穿透&#xff0c;支持海康、大华、宇视等品牌的IPC、…...

新手教程使用curl命令通过Taotoken测试大模型API连通性

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 新手教程&#xff1a;使用curl命令通过Taotoken测试大模型API连通性 当你刚刚在Taotoken平台创建了API Key&#xff0c;最直接、最…...

Armv8/v9架构SCTLRMASK_EL2寄存器解析与应用

1. AArch64系统控制寄存器基础解析在Armv8/v9架构中&#xff0c;系统控制寄存器(System Control Registers)是处理器状态配置的核心组件&#xff0c;它们分布在不同的异常级别(EL0-EL3)&#xff0c;用于管理处理器行为、内存系统、安全状态等关键功能。这些寄存器通常通过MRS/M…...

【独家首发】Sora 2 v1.3.2内部一致性补丁文档泄露:仅限前500位AIGC工程师的8项prompt-engineering硬核干预法

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Sora 2人物一致性保持的核心挑战与底层机制 在长时序视频生成任务中&#xff0c;Sora 2需在数十秒甚至更长的视频序列中维持同一人物的外观、姿态、服饰、发型及微表情等多维度特征稳定复现。这一目标面…...

【学习笔记】探讨大模型应用安全建设系列5——供应链安全与数据防护

供应链安全在大模型场景里很容易被低估。很多团队以为管好代码依赖就够了&#xff0c;但大模型应用的供应链比传统应用长得多——模型、Prompt、知识库、插件、外部 API 都是攻击面。 LiteLLM 事件证明&#xff1a;一个依赖包投毒&#xff0c;短时间内就可能扩散到大量…...

Steam Deck多系统引导终极指南:3步完成图形化配置

Steam Deck多系统引导终极指南&#xff1a;3步完成图形化配置 【免费下载链接】SteamDeck_rEFInd Simple rEFInd install script for the Steam Deck (with GUI customization) 项目地址: https://gitcode.com/gh_mirrors/st/SteamDeck_rEFInd SteamDeck_rEFInd是一款专…...

BilibiliDown:3分钟掌握B站视频批量下载的终极解决方案

BilibiliDown&#xff1a;3分钟掌握B站视频批量下载的终极解决方案 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader &#x1f633; 项目地址: https://gitcode.com/gh_mirrors/…...

基于FPGA的嵌入式频谱分析仪设计:低功耗实时信号处理方案

1. 项目概述&#xff1a;为什么要在FPGA上做频谱分析仪&#xff1f;做射频测试的工程师&#xff0c;对频谱分析仪肯定不陌生。实验室里动辄几十万上百万的台式机&#xff0c;性能强悍&#xff0c;功能全面&#xff0c;但有个问题&#xff1a;它离不开实验室。当你需要做外场测试…...