当前位置: 首页 > news >正文

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理

文章目录

  • 【RocketMQ】重试机制及死信消息处理
    • 1. 重试机制
      • 1.1 生产者重试
      • 1.2 消费者重试
        • 1.2.1 死信队列

参考文档: 官方文档

1. 重试机制

1.1 生产者重试

rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。

当然也可以自定义重试次数及机制:

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

1.2 消费者重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。

  1. 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
  2. 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);

在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。

  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);

示例:

消费者重试示例:

@Test
public void retryConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");//设置最大重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());//业务报错了 返回null 返回RECONSUME_LATER 都会重试if(true){throw new RuntimeException();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次)

1.2.1 死信队列

上面提到的 特殊Topic 称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。

对于死信的处理方案有多种,这里演示两种:

  1. 编写消费者监听死信队列
  2. 在消费者达到最大重试次数时立刻处理。

方案一

@Test
public void deadConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("%DLQ%retry-consumer-group", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");//业务报错了 返回null 返回RECONSUME_LATER 都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。

方案二:

针对方案一的缺点,方案二能够比较好的解决。

@Test
public void retryConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("retryTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());try {handleDb();// 10/0} catch (Exception e) {if (messageExt.getReconsumeTimes() >= 2) {//不要重试了System.out.println("消息体:" + new String(messageExt.getBody()));System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");} else {//重试System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}private void handleDb() {int i = 10 / 0;
}

相关文章:

【RocketMQ】重试机制及死信消息处理

【RocketMQ】重试机制及死信消息处理 文章目录 【RocketMQ】重试机制及死信消息处理1. 重试机制1.1 生产者重试1.2 消费者重试1.2.1 死信队列 参考文档&#xff1a; 官方文档 1. 重试机制 1.1 生产者重试 rocketmq生产者发送消息失败默认重试2次(同步发送为2次&#xff0c;异…...

Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队

1 引言 大家好&#xff0c;接着上次和大家一起学习了《MySQL DDL执行方式-Online DDL介绍》&#xff0c;那么今天接着和大家一起学习另一种MySQL DDL执行方式之pt-soc。 在MySQL使用过程中&#xff0c;根据业务的需求对表结构进行变更是个普遍的运维操作&#xff0c;这些称为…...

C++ stack容器介绍

&#x1f914;stack容器介绍&#xff1a; &#x1f4d6; stack是一种数据结构&#xff0c;也可以被称为堆栈。它是一个容器&#xff0c;只允许在最顶层进行插入和删除&#xff0c;并且只能访问最后一个插入的元素。这个元素称为栈顶。所有新插入的元素都被放置在栈顶上面&#…...

在 Git 中撤消更改的 6 种方法!

目录 1. 修改最近的提交 2. 将分支重置为较旧的提交 硬重置 软重置分支 创建备份分支 3. 交互式变基 删除旧提交 改写提交消息 编辑旧提交 压缩 4. 还原提交 5. 签出文件 6. 使用 Git Reflog 当使用 Git 进行项目代码管理时&#xff0c;难免会出现一些错误操作或需…...

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置

LiveGBS国标GB/T28181国标平台功能-电子地图移动位置订阅mobileposition地图定位GPS轨迹坐标位置获取redis获取位置 1、位置订阅1.1、国标设备编辑1.2、选择设备开启位置订阅1.3、全局开启位置订阅1.4、通过目录订阅获取位置(少数情况) 2、经纬度信息查询2.1、访问接口获取2.1.…...

编程(38)----------计算机的部分原理

本篇主要总结一些计算机的理论部分. 计算机在发展历程中,无论是最早的巨无霸机器,还是现在小到可以拿在手中的掌机.只要其本质上是计算机,在最基础的结构上,都是以冯诺依曼体系所构建的. 冯诺依曼体系大致将计算机分为几个最重要的部分:输入,输出,中央处理器,存储设备.也就是…...

若依框架快速搭建(二)

目录 数据库设计功能模块设计XXX信息管理xxx查询xxx添加xxx删除xxx修改xxx导出 功能模块实现运行数据库自动代码生成在IDEA中找到RuoYi-generator&#xff0c;修改配置运行前后端项目&#xff0c;在网页中找到代码生成模块导入表后点击确定&#xff0c;序号前打勾&#xff0c;再…...

为建筑物的供暖系统实施MPC控制器的小型项目(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

【概率论】中心极限定理(二)

文章目录 主观题主观题 每袋味精的净重为随机变量,平均重量为 100 克,标准差为 10 克。一箱内装 200 袋味精,求一箱味精的净重大于 20500 克的概率? 解: ① E ( X i ) = 100 , D ( X i ) = 1 0 2...

Blender UV展开流程

目录 1. UV1.1 blender默认物体1.2 创建物体1.3 UV参考图1.4 标记缝合边1.5 UV拉伸1.6 孤岛模式 1. UV 1.1 blender默认物体 默认物体已经自动生成UV 在UV编辑工作区&#xff0c;编辑模式&#xff0c;全选物体在左边自动展开UV 在物体数据属性-UV贴图-存在默认的UV贴图&#…...

Flutter 笔记 | Flutter 核心原理(二)关键类和启动流程

Widget、Element、BuildContext 和 RenderObject Widget Widget关键类及其子类继承关系如图所示&#xff1a; 其中&#xff0c;Widget是Widget Tree所有节点的基类。Widget的子类主要分为3类&#xff1a; 第1类是RenderObjectWidget的子类&#xff0c;具体来说又分为SingleCh…...

Android:主题切换

一.概述 正在开发的应用做了一版新UI&#xff0c;原打算将新版UI按项目名做成资源包&#xff0c;再在build.gradle里productFlavors{ }多渠道打包实现 但被告知新旧两个项目共用一个分支&#xff0c;那就做成两个主题(Theme1/Theme2)来适配了 如果只是变更UI&#xff0c;做成…...

terminalworks ASP.NET Core PDF 浏览器-Crack

ASP.NET Core 的 PDF 查看器 terminalworks在 ASP.NET Core 网页或应用程序中添加可靠的 PDF 查看器的简单方法。 我们的 Web PDF 查看器基于经过验证和测试的 Mozilla PdfJS 解决方案&#xff0c;该解决方案在 Firefox 中用作默认 PDF 查看器。我们专门设计了我们的查看器&…...

Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列

目录 58. 最后一个单词的长度 Length of Last Word &#x1f31f; 59. 螺旋矩阵 II Spiral Matrix II &#x1f31f;&#x1f31f; 60. 排列序列 Permutation Sequence &#x1f31f;&#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日…...

短视频矩阵源码如何做应用编程?

短视频矩阵源码&#xff0c; 短视频矩阵系统技术文档&#xff1a; 可以采用电子文档或者纸质文档的形式交付&#xff0c;具体取决于需求方的要求。电子文档可以通过电子邮件、远程指导交付云存储等方式进行传输、 短视频矩阵{seo}源码是指将抖音平台上的视频资源进行筛选、排…...

【运维知识进阶篇】Ansible实现一套完整LNMP架构

前面介绍了PlayBook怎么写服务部署&#xff0c;把服务部署上后&#xff0c;我们来用Ansible来部署项目&#xff0c;实现一套完整的LNMP架构。我们部署wordpress、wecenter、phpshe、phpmyadmin这四个项目。将其所有的剧本都写入lnmp.yml中&#xff0c;相关备份数据都放入root/a…...

Spring Boot 自动配置一篇概览

一、什么是自动配置 bean 自动配置类通过添加 AutoConfiguration 注解实现。 因为 AutoConfiguration 注解本身是以 Configuration 注解的&#xff0c;所以自动配置类可以算是一个标准的基于 Configuration 注解的类。 Conditional 注解可以用于声明自动配置启用条件&#x…...

深入理解设计原则之接口隔离原则(ISP)【软件架构设计】

系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP&#xff1a;接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则&#xff1f;小结 1、接口隔离原则的定义和…...

IMX6ULL裸机篇之I2C实验主控代码说明二

一. I2C实验 I2C实验内容&#xff1a; 学习如何使用 I.MX6U 的 I2C 接口来驱动 AP3216C&#xff0c;读取 AP3216C 的传感器数据。 I2C读写数据时序图&#xff1a; I2C写数据时序图如下&#xff1a; I2C读数据时序图如下&#xff1a; 二. I2C主控读写时序 1. 读数据与写数…...

【计算机组成原理与体系结构】数据的表示与运算

目录 一、进位计数制 二、信息编码 三、定点数数据表示 四、校验码 五、定点数补码加减运算 六、标志位的生成 七、定点数的移位运算 八、定点数的乘除运算 九、浮点数的表示 十、浮点数的运算 一、进位计数制 整数部分&#xff1a; 二进制、八进制、十六进制 --…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

PL0语法,分析器实现!

简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...