【RabbitMQ高级篇】消息可靠性问题(1)
目录
1.消息可靠性
1.1.生产者消息确认
1.1.1.修改配置
1.1.2.定义Return回调
1.1.3.定义ConfirmCallback
1.2.消息持久化
1.2.1.交换机持久化
1.2.2.队列持久化
1.2.3.消息持久化
1.3.消费者消息确认
1.3.1.演示none模式
1.3.2.演示auto模式
1.4.消费失败重试机制
1.4.1.本地重试
1.4.2.失败策略
1.5.总结
1.消息可靠性
消息从发送,到消费者接收,会经理多个过程:

其中的每一步都可能导致消息丢失,常见的丢失原因包括:
-
发送时丢失:
-
生产者发送的消息未送达exchange
-
消息到达exchange后未到达queue
-
-
MQ宕机,queue将消息丢失
-
consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
-
生产者确认机制
-
mq持久化
-
消费者确认机制
-
失败重试机制
下面我们就通过案例来演示每一个步骤。
首先,导入课前资料提供的demo工程:

项目结构如下:

1.1.生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
-
publisher-confirm,发送者确认
-
消息成功投递到交换机,返回ack
-
消息未投递到交换机,返回nack
-
-
publisher-return,发送者回执
-
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
-

注意:

1.1.1.修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
说明:
-
publish-confirm-type:开启publisher-confirm,这里支持两种类型:-
simple:同步等待confirm结果,直到超时 -
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
-
-
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback -
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
1.1.2.定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:
package cn.itcast.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}
1.1.3.定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
-
交换机持久化
-
队列持久化
-
消息持久化
1.2.1.交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:

1.2.2.队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:

1.2.3.消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
-
1:非持久化
-
2:持久化
用java代码指定:

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
1.3.消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
-
1)RabbitMQ投递消息给消费者
-
2)消费者获取消息后,返回ACK给RabbitMQ
-
3)RabbitMQ删除消息
-
4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
-
none模式下,消息投递是不可靠的,可能丢失
-
auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
-
manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
1.3.1.演示none模式
修改consumer服务的application.yml文件,添加下面内容:
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
1.3.2.演示auto模式
再次把确认机制修改为auto:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

1.4.消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
怎么办呢?
1.4.1.本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
-
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回ack,消息会被丢弃
1.4.2.失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
-
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
-
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码:
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
1.5.总结
如何确保RabbitMQ消息的可靠性?
-
开启生产者确认机制,确保生产者的消息能到达队列
-
开启持久化功能,确保消息未消费前在队列中不会丢失
-
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
-
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
相关文章:
【RabbitMQ高级篇】消息可靠性问题(1)
目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制…...
ASP.NET |日常开发中常见问题归纳讲解
ASP.NET |日常开发中常见问题归纳讲解 前言一、性能问题1.1 数据库访问性能1.2 视图状态(在ASP.NET Web Forms 中) 二、安全问题2.1 SQL 注入2.2 跨站脚本攻击(XSS) 三、状态管理问题3.1 会话状态(Session …...
【【深入浅出TinyRisc-v】】
深入浅出TinyRisc-v 本代码参考于 https://gitee.com/liangkangnan/tinyriscv 自己理解之后又重新写了一遍 tinyriscv.v // 涓嬮潰鏄鏁翠釜top妯″潡鐨勪功鍐? module tinyriscv(input clk ,input rst_n …...
常见的限流算法
常见的限流算法 限流的定义固定窗口算法滑动窗口算法漏桶算法(推荐)令牌桶算法(推荐)限流粒度本地限流(单机限流)分布式限流(多机限流)分布式限流的实现 限流的定义 限流,也称流量控制。是指系统…...
【Leetcode 每日一题】3159. 查询数组中元素的出现位置
问题背景 给你一个整数数组 n u m s nums nums,一个整数数组 q u e r i e s queries queries 和一个整数 x x x。 对于每个查询 q u e r i e s [ i ] queries[i] queries[i],你需要找到 n u m s nums nums 中第 q u e r i e s [ i ] queries[i] q…...
xadmin后台首页增加一个导入数据按钮
xadmin后台首页增加一个导入数据按钮 效果 流程 1、在添加小组件中添加一个html页面 2、写入html代码 3、在urls.py添加导入数据路由 4、在views.py中添加响应函数html代码 <!DOCTYPE html> <html lang...
行为树详解(5)——事件驱动
【分析】 如果行为树的节点很多,那么会存在要经过很多节点才会走到动作节点的情况。显然,性能上不如状态机。 每帧都需要重新遍历一系列节点才会走到动作节点,而实际上很多条件节点在数帧内不会有变化,这是造成性能问题的重要原…...
3.若依前端项目拉取、部署、访问
因为默认RuoYi-Vue是使用的Vue2,所以需要另外去下载vue3来部署。 拉取代码 git clone https://gitee.com/ys-gitee/RuoYi-Vue3.git 安装node才能执行npm相关的命令 执行命令npm install 如果npm install比较慢的话,需要添加上国内镜像 npm install --registrhttp…...
Debian操作系统相对于Ubuntu有什么优势吗?
更高的稳定性:Debian 以其出色的稳定性闻名,得益于严格的软件包测试和发布流程。其稳定版经过长时间测试与验证,确保了系统的高度稳定,更适合对稳定性要求极高的长期运行服务器环境。而 Ubuntu 虽有稳定版本,但更新周期…...
【漏洞复现】CVE-2015-3337 Arbitrary File Reading
漏洞信息 NVD - CVE-2015-3337 Directory traversal vulnerability in Elasticsearch before 1.4.5 and 1.5.x before 1.5.2, when a site plugin is enabled, allows remote attackers to read arbitrary files via unspecified vectors. 在安装了具有“site”功能的插件以…...
win10、win11-鼠标右键还原、暂停更新
系统优化 win 10jihuo win 11jihuo鼠标右键还原暂停更新 update 2024.12.28win 10 jihuo winx,打开powershell管理员,输入以下命令,选择1并等待 irm https://get.activated.win | iex参考:https://www.bilibili.com/video/BV1TN411M72J/?sp…...
FFmpeg来从HTTP拉取流并实时推流到RTMP服务器
当使用FFmpeg来从HTTP拉取流并实时推流到RTMP服务器时,你可以使用以下命令: ffmpeg -i http://输入流地址 -c:v copy -c:a copy -f flv rtmp://RTMP服务器地址/应用名称/流名称 这是一个基本的命令示例,其中: - -i http://输入流地…...
Quo Vadis, Anomaly Detection? LLMs and VLMs in the Spotlight 论文阅读
文章信息: 原文链接:https://arxiv.org/abs/2412.18298 Abstract 视频异常检测(VAD)通过整合大语言模型(LLMs)和视觉语言模型(VLMs)取得了显著进展,解决了动态开放世界…...
Rust : tokio中select!
关于tokio的select宏,有不少的用途。包括超时和竞态选择等。 关于select宏需要关注,相关的异步条件,会同时执行,只是当有一个最早完成时,会执行“抛弃”和“对应”策略。 说明:对本文以下素材的来源表示感…...
【hackmyvm】hacked靶机wp
tags: HMVrootkitDiamorphine Type: wp 1. 基本信息^toc 文章目录 1. 基本信息^toc2. 信息收集2.1. 端口扫描2.2. 目录扫描2.3. 获取参数 3. 提权 靶机链接 https://hackmyvm.eu/machines/machine.php?vmHacked 作者 sml 难度 ⭐️⭐️⭐️⭐️️ 2. 信息收集 2.1. 端口扫描…...
MaixBit k210学习记录
开发背景:Window系统主机,在主机上安装了虚拟机(VirtualBoxUbuntu23.04) 目标实现:在虚拟机(Ubuntu)中,实现对Maix bit(k210)开发板的开发 虚拟机的安装参考…...
Wordperss漏洞 DeDeCMS漏洞
Wordperss漏洞 环境搭建 #执⾏命令 cd /vulhub/wordpress/pwnscriptum docker-compose up -d #靶场地址 http://8.155.7.173:8080/wp-admin/ 注册账号 登录 漏洞一:后台修改模板拿WebShell 步骤一:思路是修改其WP的模板写入⼀句话木马后门并访问其文件…...
如何构建有效的AI Agents:从复杂到简约——深度解读Claude实践总结《Building effective agents》(上)
在人工智能技术日新月异的今天,大语言模型(LLM)已经成为技术创新的热点。 然而,在追逐技术前沿的热潮中,我们是否忽视了工程设计的本质? 作为全球人工智能领域的领军企业之一,Anthropic以其在AI安全和伦理方面的深入…...
git status 耗时
某个git库每次status一下就是半小时起步,gc后还是没有效果,后来排查记录发现某笔记录提交几百G的冗余文件,虽然revert了,但是还是存在库中,遂如下清理: # 查找大文件 git verify-pack -v .git/objects/pac…...
C++进阶重点知识(一)|智能指针|右值|lambda|STL|正则表达式
目录 1智能指针1.shared_ptr1.1 shared_ptr的基本用法使用shared_ptr要注意的问题运用 2.unique_ptr独占的智能指针示例:管理动态内存 3.weak_ptr弱引用的智能指针weak_ptr的基本用法lock 的作用:weak_ptr返回this指针weak_ptr解决循环引用问题weak_ptr使…...
51单片机+DAC0832信号发生器实战:从硬件搭建到波形调试全记录(附避坑指南)
51单片机DAC0832信号发生器实战:从硬件搭建到波形调试全记录(附避坑指南) 在电子设计领域,信号发生器是工程师和爱好者不可或缺的工具。传统商用设备虽然功能强大,但对于学习嵌入式系统和数模转换原理而言,…...
LiuJuan Z-Image效果对比展示:BF16 vs FP16在人像细节与稳定性上的差异
1. 1. 1. 1. 1. 1. 1. 1. 1. 概述 1. 1. 1. 概述 1. 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 1. 概述 1. 概述 1. 概述 1. 概述 1. 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1. 概述 1…...
嵌入式WebSocket客户端:零malloc、状态机驱动的轻量级实现
1. WebSocketClient 库深度解析:面向嵌入式系统的轻量级 WebSocket 客户端实现WebSocket 协议(RFC 6455)作为全双工通信的工业级标准,在嵌入式边缘设备与云平台、Web 控制台、MQTT 网关桥接等场景中已成刚需。然而,主流…...
Qwen3-TTS-1.7B惊艳案例:带背景音乐的语音合成抗干扰能力实测
Qwen3-TTS-1.7B惊艳案例:带背景音乐的语音合成抗干扰能力实测 1. 引言:当AI语音合成遇上背景音乐 想象一下这个场景:你正在制作一个短视频,需要给画面配上解说。你找到了一段完美的背景音乐,但当你尝试录制旁白时&am…...
保姆级教程:用MQTT.fx客户端连接电信AEP物联网平台,实现设备数据上报与远程控制
从零到一:用MQTT.fx玩转电信AEP物联网平台全流程实战 在物联网开发领域,电信AEP平台作为国内主流物联网云服务平台之一,为开发者提供了从设备接入到数据管理的完整解决方案。而MQTT.fx作为轻量级MQTT客户端工具,因其简洁直观的界面…...
汽车UDS刷写避坑指南:从S32K144 Bootloader的链接文件到安全访问,这些细节你注意了吗?
汽车UDS刷写实战避坑手册:S32K144 Bootloader开发中的七个致命细节 当你在凌晨三点的实验室里盯着CANoe窗口不断跳出的NRC 31(requestOutOfRange)错误码时,会不会突然怀念用J-Link直接烧录的简单日子?UDS刷写就像汽车电…...
国产AI 调用量反超美国,22个免费大模型API集结,DMXAPI 成开发者首选
据 OpenRouter 最新数据,2026 年 3 月中国 AI 大模型周调用量达 4.69 万亿 Token,连续两周超越美国,全球调用量前三席位被小米 MiMo-V2-Pro、阶跃星辰 Step 3.5 Flash、MiniMax M2.5 包揽,国产模型凭性能与性价比获全球开发者认可…...
别再为‘file must be a file‘报错头疼了!手把手教你用Apifox搞定Dify文件上传接口
深度解析Dify文件上传接口:从报错排查到Apifox高效调试实战 当你正在为Dify AI应用集成文件上传功能时,是否曾在Apifox中反复遭遇file must be a file的报错而束手无策?这种看似简单的接口调试背后,隐藏着文件传输机制、参数组合…...
数据清洗提速37倍的秘密:Polars 2.0中arrow2内核的零拷贝cast、predicate pushdown与pl.scan_parquet深度调优
第一章:Polars 2.0 大规模数据清洗技巧 面试题汇总Polars 2.0 引入了更严格的惰性执行模型、增强的字符串处理 API 以及对空值语义的统一规范,使其在面试中成为高频考察对象。高频考点聚焦于内存效率、链式操作健壮性及跨类型转换的边界处理。高效处理缺…...
深度解析CloverBootloader内存管理:AptioMemoryFix原理与实现详解
深度解析CloverBootloader内存管理:AptioMemoryFix原理与实现详解 【免费下载链接】CloverBootloader Bootloader for macOS, Windows and Linux in UEFI and in legacy mode 项目地址: https://gitcode.com/gh_mirrors/cl/CloverBootloader CloverBootloade…...
