【RocketMQ】重试机制及死信消息处理
【RocketMQ】重试机制及死信消息处理
文章目录
- 【RocketMQ】重试机制及死信消息处理
- 1. 重试机制
- 1.1 生产者重试
- 1.2 消费者重试
- 1.2.1 死信队列
参考文档: 官方文档
1. 重试机制
1.1 生产者重试
rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。
当然也可以自定义重试次数及机制:
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
1.2 消费者重试
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
- 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
- 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
- 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);
在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。
- 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);
示例:
消费者重试示例:
@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:
| 消费类型 | 重试间隔 | 最大重试次数 |
|---|---|---|
| 顺序消费 | 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX |
| 并发消费 | 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次) |
1.2.1 死信队列
上面提到的 特殊Topic 称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。
对于死信的处理方案有多种,这里演示两种:
- 编写消费者监听死信队列
- 在消费者达到最大重试次数时立刻处理。
方案一:
@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。
方案二:
针对方案一的缺点,方案二能够比较好的解决。
@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}
相关文章:
【RocketMQ】重试机制及死信消息处理
【RocketMQ】重试机制及死信消息处理 文章目录 【RocketMQ】重试机制及死信消息处理1. 重试机制1.1 生产者重试1.2 消费者重试1.2.1 死信队列 参考文档: 官方文档 1. 重试机制 1.1 生产者重试 rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异…...
Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队
1 引言 大家好,接着上次和大家一起学习了《MySQL DDL执行方式-Online DDL介绍》,那么今天接着和大家一起学习另一种MySQL DDL执行方式之pt-soc。 在MySQL使用过程中,根据业务的需求对表结构进行变更是个普遍的运维操作,这些称为…...
C++ stack容器介绍
🤔stack容器介绍: 📖 stack是一种数据结构,也可以被称为堆栈。它是一个容器,只允许在最顶层进行插入和删除,并且只能访问最后一个插入的元素。这个元素称为栈顶。所有新插入的元素都被放置在栈顶上面&#…...
在 Git 中撤消更改的 6 种方法!
目录 1. 修改最近的提交 2. 将分支重置为较旧的提交 硬重置 软重置分支 创建备份分支 3. 交互式变基 删除旧提交 改写提交消息 编辑旧提交 压缩 4. 还原提交 5. 签出文件 6. 使用 Git Reflog 当使用 Git 进行项目代码管理时,难免会出现一些错误操作或需…...
LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置
LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置 1、位置订阅1.1、国标设备编辑1.2、选择设备开启位置订阅1.3、全局开启位置订阅1.4、通过目录订阅获取位置(少数情况) 2、经纬度信息查询2.1、访问接口获取2.1.…...
编程(38)----------计算机的部分原理
本篇主要总结一些计算机的理论部分. 计算机在发展历程中,无论是最早的巨无霸机器,还是现在小到可以拿在手中的掌机.只要其本质上是计算机,在最基础的结构上,都是以冯诺依曼体系所构建的. 冯诺依曼体系大致将计算机分为几个最重要的部分:输入,输出,中央处理器,存储设备.也就是…...
若依框架快速搭建(二)
目录 数据库设计功能模块设计XXX信息管理xxx查询xxx添加xxx删除xxx修改xxx导出 功能模块实现运行数据库自动代码生成在IDEA中找到RuoYi-generator,修改配置运行前后端项目,在网页中找到代码生成模块导入表后点击确定,序号前打勾,再…...
为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
【概率论】中心极限定理(二)
文章目录 主观题主观题 每袋味精的净重为随机变量,平均重量为 100 克,标准差为 10 克。一箱内装 200 袋味精,求一箱味精的净重大于 20500 克的概率? 解: ① E ( X i ) = 100 , D ( X i ) = 1 0 2...
Blender UV展开流程
目录 1. UV1.1 blender默认物体1.2 创建物体1.3 UV参考图1.4 标记缝合边1.5 UV拉伸1.6 孤岛模式 1. UV 1.1 blender默认物体 默认物体已经自动生成UV 在UV编辑工作区,编辑模式,全选物体在左边自动展开UV 在物体数据属性-UV贴图-存在默认的UV贴图&#…...
Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程
Widget、Element、BuildContext 和 RenderObject Widget Widget关键类及其子类继承关系如图所示: 其中,Widget是Widget Tree所有节点的基类。Widget的子类主要分为3类: 第1类是RenderObjectWidget的子类,具体来说又分为SingleCh…...
Android:主题切换
一.概述 正在开发的应用做了一版新UI,原打算将新版UI按项目名做成资源包,再在build.gradle里productFlavors{ }多渠道打包实现 但被告知新旧两个项目共用一个分支,那就做成两个主题(Theme1/Theme2)来适配了 如果只是变更UI,做成…...
terminalworks ASP.NET Core PDF 浏览器-Crack
ASP.NET Core 的 PDF 查看器 terminalworks在 ASP.NET Core 网页或应用程序中添加可靠的 PDF 查看器的简单方法。 我们的 Web PDF 查看器基于经过验证和测试的 Mozilla PdfJS 解决方案,该解决方案在 Firefox 中用作默认 PDF 查看器。我们专门设计了我们的查看器&…...
Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列
目录 58. 最后一个单词的长度 Length of Last Word 🌟 59. 螺旋矩阵 II Spiral Matrix II 🌟🌟 60. 排列序列 Permutation Sequence 🌟🌟🌟 🌟 每日一练刷题专栏 🌟 Rust每日…...
短视频矩阵源码如何做应用编程?
短视频矩阵源码, 短视频矩阵系统技术文档: 可以采用电子文档或者纸质文档的形式交付,具体取决于需求方的要求。电子文档可以通过电子邮件、远程指导交付云存储等方式进行传输、 短视频矩阵{seo}源码是指将抖音平台上的视频资源进行筛选、排…...
【运维知识进阶篇】Ansible实现一套完整LNMP架构
前面介绍了PlayBook怎么写服务部署,把服务部署上后,我们来用Ansible来部署项目,实现一套完整的LNMP架构。我们部署wordpress、wecenter、phpshe、phpmyadmin这四个项目。将其所有的剧本都写入lnmp.yml中,相关备份数据都放入root/a…...
Spring Boot 自动配置一篇概览
一、什么是自动配置 bean 自动配置类通过添加 AutoConfiguration 注解实现。 因为 AutoConfiguration 注解本身是以 Configuration 注解的,所以自动配置类可以算是一个标准的基于 Configuration 注解的类。 Conditional 注解可以用于声明自动配置启用条件&#x…...
深入理解设计原则之接口隔离原则(ISP)【软件架构设计】
系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP:接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则?小结 1、接口隔离原则的定义和…...
IMX6ULL裸机篇之I2C实验主控代码说明二
一. I2C实验 I2C实验内容: 学习如何使用 I.MX6U 的 I2C 接口来驱动 AP3216C,读取 AP3216C 的传感器数据。 I2C读写数据时序图: I2C写数据时序图如下: I2C读数据时序图如下: 二. I2C主控读写时序 1. 读数据与写数…...
【计算机组成原理与体系结构】数据的表示与运算
目录 一、进位计数制 二、信息编码 三、定点数数据表示 四、校验码 五、定点数补码加减运算 六、标志位的生成 七、定点数的移位运算 八、定点数的乘除运算 九、浮点数的表示 十、浮点数的运算 一、进位计数制 整数部分: 二进制、八进制、十六进制 --…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
解析两阶段提交与三阶段提交的核心差异及MySQL实现方案
引言 在分布式系统的事务处理中,如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议(2PC)通过准备阶段与提交阶段的协调机制,以同步决策模式确保事务原子性。其改进版本三阶段提交协议(3PC…...
JS红宝书笔记 - 3.3 变量
要定义变量,可以使用var操作符,后跟变量名 ES实现变量初始化,因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符,可以创建一个全局变量 如果需要定义…...
Tauri2学习笔记
教程地址:https://www.bilibili.com/video/BV1Ca411N7mF?spm_id_from333.788.player.switch&vd_source707ec8983cc32e6e065d5496a7f79ee6 官方指引:https://tauri.app/zh-cn/start/ 目前Tauri2的教程视频不多,我按照Tauri1的教程来学习&…...
