RabbitMQ 高级特性——重试机制
文章目录
- 前言
- 重试机制
- 配置文件设置
- 生命交换机、队列和绑定关系
- 生产者发送消息
- 消费消息
前言
前面我们学习了 RabbitMQ 保证消息传递可靠性的机制——消息确认、持久化和发送发确认,那么对于消息确认和发送方确认,如果接收方没有收到消息,那么这时该怎么做呢?这就要提到 RabbitMQ 的重试机制了,当发送方发送的消息没有成功到达接收方,那么发送方就会使用重试机制来向接收方重新发送消息,如果接收方因为程序逻辑错误引起的,那么不管发送方重新发送多少次,接收方都不能正确的处理这个消息,那么发送方也不能无休止的向接收方发送消息,当发送的次数超过某个数量的时候,那么这个消息就会被标记为死信,然后被处理掉。那么这篇文章将详细的说明一下 RabbitMQ 的重试机制。
重试机制
RabbitMQ的重试机制是一种强大的功能,它允许在消息处理失败时自动重试,从而提高系统的可靠性和稳定性。
重试机制的触发条件
- 消息发送失败:当消息发送到RabbitMQ服务器时,如果因为网络问题、认证失败或其他原因导致发送失败,RabbitMQ会尝试重试发送。
- 消息消费失败:当消费者从队列中获取消息并尝试处理时,如果因为某种原因(如业务逻辑错误、系统异常等)导致处理失败,RabbitMQ会根据预设的重试策略进行重试。
我们都知道 Spring AMQP 提供了四种确认策略:NONE、MANUAL、AUTO。当确认策略为 NONE 的时候队列不管这个消息是否被成功处理都会将这个消息从队列中删除,而确认策略为 MANUAL 的时候则更多的靠程序本身处理,那么重试机制更多的使用在确认策略为 AUTO 的情况下。
配置文件设置
spring:rabbitmq:addresses: amqp://admin:admin@x.x.x.x:5672/testlistener:simple:acknowledge-mode: auto # 确认模式retry:enabled: true # 开始重试机制initial-interval: 5000ms # 每次重试的间隔时间max-attempts: 5 # 最大重试次数
生命交换机、队列和绑定关系
public class Constants {public static final String RETRY_EXCHANGE = "retry.exchange";public static final String RETRY_QUEUE = "retry.queue";
}
@Configuration
public class RabbitConfig {@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
生产者发送消息
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","rabbitmq retry");return "消息发送成功";}
}
消费消息
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void listener(Message message, Channel channel) {System.out.println("接收到消息:" + message + channel);//制造出异常使得消费者不能正常处理消息int ret = 3/0;System.out.println("消息处理成功");}
}
我们先将重试机制的配置给注释掉,然后运行之后会发现消费者会重复读取一条消息并且一直抛异常,这是因为:
由于监听器方法抛出了异常并且没有被捕获,Spring AMQP将不会发送ACK,RabbitMQ将不断重新投递这条消息给消费者,直到它最终被确认、被拒绝(并可能发送到死信队列)或队列被删除。
那么我们配置重试机制的重试次数之后再看看什么效果:
可以发现,当配置了重试次数之后,包括第一次投递消息,队列一共给消费者发送了五次消息,并且这个五次的消息的 deliveryTag 是一样的,也就说明是队列的重试机制投递的,五次之后队列便不再向该消费者发送该消息,这样就避免了因消费者的程序逻辑问题而导致队列无休止的向消费者发送消息的问题,并且还保证了消息传递的可靠性。
那么我们将消息的确认策略更换为 MANUAL 手动确认的方式,然后看看这个重试机制的配置会不会生效:
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void listener(Message message, Channel channel) throws IOException, InterruptedException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息:" + message + channel);//制造出异常使得消费者不能正常处理消息int ret = 3/0;System.out.println("消息处理成功");channel.basicAck(deliveryTag,false);} catch (Exception e) {Thread.sleep(1000);//第三个参数requeue表示被拒绝的消息是否重新进入队列,true表示重新进入队列并且重新投递这个消息,否则直接丢弃channel.basicNack(deliveryTag,false,true);}
}
发现当确认策略为 manual 手动确认的时候,我们的重试机制的配置是不生效的,并且可以发现我们的消息的 deliveryTag 是不断递增的,也就是说之前拒绝的消息每次都会重新入队列然后重新投递,跟重试机制是不一样的。
而如果我们 basicNack 方法的第三个参数 requeue 参数的值为 fasle 时候,那么这个被拒绝的消息就会被丢弃或者投递到的死信队列中。
使用重试机制时需要注意:
-
自动确认模式下:程序逻辑出现异常,即使多次重试还是失败,消息也会被自动确认,那么这条消息就会丢失。
-
在手动确认模式下,消费者需要显式地发送ACK(Acknowledgment)或NACK来告诉RabbitMQ消息是否已被成功处理。如果发生异常且没有发送ACK,消息将保持为unacked状态,这允许RabbitMQ重新将消息发送给其他消费者(如果配置了多个消费者)或根据队列的其他配置(如死信队列)来处理未确认的消息。然而,如果所有消费者都无法处理该消息,它将导致消息在队列中积压,除非配置了适当的超时机制或死信队列来处理这些长时间未确认的消息。
相关文章:

RabbitMQ 高级特性——重试机制
文章目录 前言重试机制配置文件设置生命交换机、队列和绑定关系生产者发送消息消费消息 前言 前面我们学习了 RabbitMQ 保证消息传递可靠性的机制——消息确认、持久化和发送发确认,那么对于消息确认和发送方确认,如果接收方没有收到消息,那…...
每天一道面试题(20):锁的发生原因和避免措施
死锁的发生原因和避免措施 一、概述 在多线程编程中,死锁是一个常见的问题。理解死锁的成因和避免策略是提升程序稳定性和性能的重要能力。 二、普通人 vs 高手的回答 普通人回答: “临场发挥…” 高手回答: “死锁是指两个或多个线程在执…...

2024淘宝双11活动,收下这份必买好物推荐清单
双11如何真正值得购买的好物呢?为了帮助大家把握住这次难得的购物良机,我们特别推出了这份“2024淘宝双11必买好物推荐清单”。这份清单涵盖数码电子、家居生活、个护健康等多个领域的优质产品。无论你是科技爱好者,还是注重生活品质的消费者…...

vue-cli,element-plus,axios,proxy
一、vue-cli vue-cli俗称vue脚手架,是vue官方提供的快速生成vue 工程化项目的工具。 1.官网:https://cn.vuejs.org/ 中文官网: https://cli.vuejs.org/zh/ 特点:基于webpack,功能丰富且易于扩展,支持创建vue2和vu…...

《Zeotero的学习》
学习视频链接 Zeotera的安装 官网点击download,选择合适的版本进行下载,并安装插件。 下载完成之后,点击安装包,一路默认就可以。如果不想下载在C盘,可以在步骤中选择自定义路径。 Zeotero的注册 官网进行注册&am…...
大数据复习知识点1
1、HDFS和MapReduce的起源:HDFS起源于Google的GFS论文,它是为了解决大规模数据集的存储问题而设计的。而MapReduce则是Google为了解决大规模数据处理问题而提出的一种并行计算模型。 2、YARN的作用:YARN是Hadoop的资源管理器,它负…...

9.26 Buu俩题解
[CISCN2019 华东北赛区]Web2 看wp写完之后写的 知识点 存储型XSS与过滤绕过sql注入 题解 好几个页面,存在登录框可以注册,存在管理员页面(admin.php) ->既然存在管理员页面,且直接访问admin.php提示我们 说明存在身份验证࿰…...

Mitsuba 渲染基础
Mitsuba 渲染基础 0. Abstract1. 安装 Mitsuba21.1 下载 Mitsuba2 源码1.2 选择后端 (variants)1.3 编译 2. [Mitsuba2PointCloudRenderer](https://github.com/tolgabirdal/Mitsuba2PointCloudRenderer)2.1 Mitsuba2 渲染 XML2.2 Scene 场景的 XML 文件格式2.2.1 chair.npy to…...
深入理解 WebSocket:实时通信的利器
深入理解 WebSocket:实时通信的利器 1. 什么是 WebSocket? WebSocket 是一种网络通信协议,它允许服务器和客户端之间建立持久的双向通信通道,适用于高频率、低延迟的数据交换场景。在传统的 HTTP 通信中,每次请求都需…...

OpenEuler配置本地yum源
0x00 服务器版本 将本地镜像传输至服务器 操作步骤如下 # 创建一个目录用于挂载光盘映像 mkdir /media/cdrom/# 将光盘映像挂载到指定目录 mount /kvm/openeuler.iso /media/cdrom/#进入Yum仓库配置目录 cd /etc/yum.repos.d/# 备份原有的 openEuler.repo 文件 mv openEuler.…...

论文不同写作风格下的ChatGPT提示词分享
学境思源,一键生成论文初稿: AcademicIdeas - 学境思源AI论文写作 在学术论文写作中,不同的写作风格能显著影响文章的表达效果与读者的理解。无论是描述性、分析性、论证性,还是批判性写作风格,合理选择和运用恰当的写…...

单点登录(SSO)基础
单点登录(SSO, Single Sign-On) 是一种身份认证机制,允许用户在多个独立的应用系统中只进行一次登录操作,即可访问所有授权的应用或服务,而无需每次切换应用时都进行登录。SSO 提高了用户体验的便捷性,同时…...

设置VsCode搜索时排除文件,文件列表中隐藏文件
按照《VsCode gdb gdbserver远程调试C程序》中介绍的方法,配置好VsCode后,打开一个C/C工程,发现左侧的面板会显示编译时生成的中间文件(比如.d和.o文件)。我们可以通过设置隐藏掉一些我们不需要打开的文件以简洁面板…...

急!现在转大模型还来得及吗?零基础入门到精通,收藏这一篇就够了
大模型的出现,让行内和行外大多数人都感到非常焦虑。 行外很多人想了解却感到无从下手,行内很多人苦于没有硬件条件无法尝试。想转大模型方向,相关的招聘虽然层出不穷,但一般都要求有大模型经验。而更多的人,则一直处…...

使用 lstm + crf 实现NER
条件随机场CRF 前言 CRF是给定一组输入随机变量条件下另一组输出随机变量的条件概率分布的模型。特点:假设输出随机变量构成马尔卡夫随机场。CRF可以用于不同的预测问题。但是主要讨论线性链条件随机场,这时问题变成了由输入序列对输出序列的判别模型&…...

【牛掰】这款RPA多平台引流获客软件已正式上线,助您日引流1000+
哈喽大家好我是年哥,自从上次与大家团购了那款基于autojs开发的RPA引流获客的源码,经过本缝合怪不断地修修补补,终于将它变成自己的了,还为它起了个魔幻的名字:获客宝RPA。尽管部分功能还有点瑕疵,但是不管…...

Python的包管理工具pip安装
Python的包管理工具pip安装 一、安装步骤1.检查 pip是否已安装2.安装 pip方法一:通过 ensurepip 模块安装(推荐)方法二:通过 get-pip.py 脚本安装(经常应为网络域名问题连接不上) 3.验证pip安装4.创建别名5.更新pip 二、常…...
《AIGC 时代程序员的应对之策》
在 AIGC 大语言模型不断涌现、AI 辅助编程工具日益普及的当下,程序员的工作方式确实面临着深刻变革。对于程序员来说,如何应对这一趋势成为了至关重要的问题。 一方面,有人担忧 AI 可能取代部分编程工作。不可否认,随着技术的发展…...

51单片机系列-串口(UART)通信技术
🌈个人主页: 羽晨同学 💫个人格言:“成为自己未来的主人~” 并行通信和串行通信 并行方式 并行方式:数据的各位用多条数据线同时发送或者同时接收 并行通信特点:传送速度快,但因需要多根传输线…...

使用k8s部署java前后端服务
一、项目架构 前端、后端、数据库 1)前端 静态的资源:img css html js文件 js:axios、ajax 2)后端 提供数据:根据web前端发送的请求,从数据库中获取数据 请求都是无状态的,如何保持会话 …...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...