当前位置: 首页 > news >正文

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理

文章目录

  • 【RocketMQ】重试机制及死信消息处理
    • 1. 重试机制
      • 1.1 生产者重试
      • 1.2 消费者重试
        • 1.2.1 死信队列

参考文档: 官方文档

1. 重试机制

1.1 生产者重试

rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。

当然也可以自定义重试次数及机制:

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

1.2 消费者重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。

  1. 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
  2. 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);

在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。

  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);

示例:

消费者重试示例:

@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次)

1.2.1 死信队列

上面提到的 特殊Topic 称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。

对于死信的处理方案有多种,这里演示两种:

  1. 编写消费者监听死信队列
  2. 在消费者达到最大重试次数时立刻处理。

方案一

@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。

方案二:

针对方案一的缺点,方案二能够比较好的解决。

@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}

相关文章:

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理 文章目录 【RocketMQ】重试机制及死信消息处理1. 重试机制1.1 生产者重试1.2 消费者重试1.2.1 死信队列 参考文档&#xff1a; 官方文档 1. 重试机制 1.1 生产者重试 rocketmq生产者发送消息失败默认重试2次(同步发送为2次&#xff0c;异…...

Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队

1 引言 大家好&#xff0c;接着上次和大家一起学习了《MySQL DDL执行方式-Online DDL介绍》&#xff0c;那么今天接着和大家一起学习另一种MySQL DDL执行方式之pt-soc。 在MySQL使用过程中&#xff0c;根据业务的需求对表结构进行变更是个普遍的运维操作&#xff0c;这些称为…...

C++ stack容器介绍

&#x1f914;stack容器介绍&#xff1a; &#x1f4d6; stack是一种数据结构&#xff0c;也可以被称为堆栈。它是一个容器&#xff0c;只允许在最顶层进行插入和删除&#xff0c;并且只能访问最后一个插入的元素。这个元素称为栈顶。所有新插入的元素都被放置在栈顶上面&#…...

在 Git 中撤消更改的 6 种方法!

目录 1. 修改最近的提交 2. 将分支重置为较旧的提交 硬重置 软重置分支 创建备份分支 3. 交互式变基 删除旧提交 改写提交消息 编辑旧提交 压缩 4. 还原提交 5. 签出文件 6. 使用 Git Reflog 当使用 Git 进行项目代码管理时&#xff0c;难免会出现一些错误操作或需…...

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置 1、位置订阅1.1、国标设备编辑1.2、选择设备开启位置订阅1.3、全局开启位置订阅1.4、通过目录订阅获取位置(少数情况) 2、经纬度信息查询2.1、访问接口获取2.1.…...

编程(38)----------计算机的部分原理

本篇主要总结一些计算机的理论部分. 计算机在发展历程中,无论是最早的巨无霸机器,还是现在小到可以拿在手中的掌机.只要其本质上是计算机,在最基础的结构上,都是以冯诺依曼体系所构建的. 冯诺依曼体系大致将计算机分为几个最重要的部分:输入,输出,中央处理器,存储设备.也就是…...

若依框架快速搭建(二)

目录 数据库设计功能模块设计XXX信息管理xxx查询xxx添加xxx删除xxx修改xxx导出 功能模块实现运行数据库自动代码生成在IDEA中找到RuoYi-generator&#xff0c;修改配置运行前后端项目&#xff0c;在网页中找到代码生成模块导入表后点击确定&#xff0c;序号前打勾&#xff0c;再…...

为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

【概率论】中心极限定理(二)

文章目录 主观题主观题 每袋味精的净重为随机变量,平均重量为 100 克,标准差为 10 克。一箱内装 200 袋味精,求一箱味精的净重大于 20500 克的概率? 解: ① E ( X i ) = 100 , D ( X i ) = 1 0 2...

Blender UV展开流程

目录 1. UV1.1 blender默认物体1.2 创建物体1.3 UV参考图1.4 标记缝合边1.5 UV拉伸1.6 孤岛模式 1. UV 1.1 blender默认物体 默认物体已经自动生成UV 在UV编辑工作区&#xff0c;编辑模式&#xff0c;全选物体在左边自动展开UV 在物体数据属性-UV贴图-存在默认的UV贴图&#…...

Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程

Widget、Element、BuildContext 和 RenderObject Widget Widget关键类及其子类继承关系如图所示&#xff1a; 其中&#xff0c;Widget是Widget Tree所有节点的基类。Widget的子类主要分为3类&#xff1a; 第1类是RenderObjectWidget的子类&#xff0c;具体来说又分为SingleCh…...

Android:主题切换

一.概述 正在开发的应用做了一版新UI&#xff0c;原打算将新版UI按项目名做成资源包&#xff0c;再在build.gradle里productFlavors{ }多渠道打包实现 但被告知新旧两个项目共用一个分支&#xff0c;那就做成两个主题(Theme1/Theme2)来适配了 如果只是变更UI&#xff0c;做成…...

terminalworks ASP.NET Core PDF 浏览器-Crack

ASP.NET Core 的 PDF 查看器 terminalworks在 ASP.NET Core 网页或应用程序中添加可靠的 PDF 查看器的简单方法。 我们的 Web PDF 查看器基于经过验证和测试的 Mozilla PdfJS 解决方案&#xff0c;该解决方案在 Firefox 中用作默认 PDF 查看器。我们专门设计了我们的查看器&…...

Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列

目录 58. 最后一个单词的长度 Length of Last Word &#x1f31f; 59. 螺旋矩阵 II Spiral Matrix II &#x1f31f;&#x1f31f; 60. 排列序列 Permutation Sequence &#x1f31f;&#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日…...

短视频矩阵源码如何做应用编程?

短视频矩阵源码&#xff0c; 短视频矩阵系统技术文档&#xff1a; 可以采用电子文档或者纸质文档的形式交付&#xff0c;具体取决于需求方的要求。电子文档可以通过电子邮件、远程指导交付云存储等方式进行传输、 短视频矩阵{seo}源码是指将抖音平台上的视频资源进行筛选、排…...

【运维知识进阶篇】Ansible实现一套完整LNMP架构

前面介绍了PlayBook怎么写服务部署&#xff0c;把服务部署上后&#xff0c;我们来用Ansible来部署项目&#xff0c;实现一套完整的LNMP架构。我们部署wordpress、wecenter、phpshe、phpmyadmin这四个项目。将其所有的剧本都写入lnmp.yml中&#xff0c;相关备份数据都放入root/a…...

Spring Boot 自动配置一篇概览

一、什么是自动配置 bean 自动配置类通过添加 AutoConfiguration 注解实现。 因为 AutoConfiguration 注解本身是以 Configuration 注解的&#xff0c;所以自动配置类可以算是一个标准的基于 Configuration 注解的类。 Conditional 注解可以用于声明自动配置启用条件&#x…...

深入理解设计原则之接口隔离原则(ISP)【软件架构设计】

系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP&#xff1a;接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则&#xff1f;小结 1、接口隔离原则的定义和…...

IMX6ULL裸机篇之I2C实验主控代码说明二

一. I2C实验 I2C实验内容&#xff1a; 学习如何使用 I.MX6U 的 I2C 接口来驱动 AP3216C&#xff0c;读取 AP3216C 的传感器数据。 I2C读写数据时序图&#xff1a; I2C写数据时序图如下&#xff1a; I2C读数据时序图如下&#xff1a; 二. I2C主控读写时序 1. 读数据与写数…...

【计算机组成原理与体系结构】数据的表示与运算

目录 一、进位计数制 二、信息编码 三、定点数数据表示 四、校验码 五、定点数补码加减运算 六、标志位的生成 七、定点数的移位运算 八、定点数的乘除运算 九、浮点数的表示 十、浮点数的运算 一、进位计数制 整数部分&#xff1a; 二进制、八进制、十六进制 --…...

CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型

CVPR 2025 | MIMO&#xff1a;支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题&#xff1a;MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者&#xff1a;Yanyuan Chen, Dexuan Xu, Yu Hu…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

PL0语法,分析器实现!

简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

02.运算符

目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&&#xff1a;逻辑与 ||&#xff1a;逻辑或 &#xff01;&#xff1a;逻辑非 短路求值 位运算符 按位与&&#xff1a; 按位或 | 按位取反~ …...