RabbitMQ如何保证消息不丢失
RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。
RabbitMQ消息丢失解决方案

针对生产者
1、开启RabbitMQ事务
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
// 开启事务
channel.txSelect();
try { // 这里发送消息
} catch (Exception e) { channel.txRollback();
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit();
缺点:
RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。
2、使用confirm机制
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的
在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirm
channel.confirm();
//发送成功回调
public void ack(String messageId){
}
// 发送失败回调
public void nack(String messageId){ //重发该消息
}
针对RabbitMQ
RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。
解决办法:
设置持久化有两个关注点:
第一个是创建 queue 的时候将其设置为持久化
第二个是发送消息的时候将消息设置为持久化
第一种:元配置持久化
在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());
第二种:消息持久化
发送消息的时候将消息的deliveryMode设置为2,或者是
MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
下面是采用springboot整合的rabbitTemplate来实现消息发送
MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式Message message = new Message(payload, props);rabbitTemplate.send(exchange, routingKey, message);// 或者直接传递Message对象,其中包含已设置持久化的MessagePropertiesrabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));// 或者使用MessagePostProcessor接口动态设置消息属性rabbitTemplate.convertAndSend(exchange,routingKey,messageBody,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});
下面是采用SpringBoot中通过new来创建连接来实现消息发送
Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();if (Objects.nonNull(connection)) {log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");Channel channel = connection.createChannel();//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}
消费者确认机制
如果上述生产端、消息队列都正确投递,那么问题出现在消费端是否可以正确消费?

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。
默认情况下,以下3种原因导致消息丢失:
1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;
2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;
3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

应对方案:
-
将自动ack机制改为手动ack机制。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收消息,业务处理//设置手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//发生异常时,可以选择重新发送消息或进行错误处理// 例如,可以选择负确认(nack),让消息重回队列// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
消息补偿机制
以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。
如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

-
生产端发送消息;
-
确认失败,将消息保存到数据库中,并设置初始状态0;
-
定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
-
重发消息,可多次;
-
重发成功,更新数据库:status=1;
-
超过固定次数重发仍然失败,人工干预。
标注:
超过最大失败次数后,对于无法被正常消费的消息可移入死信队列。
-
可人工干预手动排查
-
也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。
相关文章:
RabbitMQ如何保证消息不丢失
RabbitMQ消息丢失的三种情况 第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种…...
(亲测有效)SpringBoot项目集成腾讯云COS对象存储(1)
目录 一、腾讯云对象存储使用 1、创建Bucket 2、使用web控制台上传和浏览文件 3、创建API秘钥 二、代码对接腾讯云COS(以Java为例) 1、初始化客户端 2、填写配置文件 3、通用能力类 文件上传 测试 一、腾讯云对象存储使用 1、创建Bucket &am…...
无人机之故障排除篇
一、识别故障 掌握基本的无人机系统知识,遵循“先易后难、先外后内、先软件后硬件”的原则进行故障识别。一旦发现故障,立即停止飞行,避免进一步损坏。 二、机械部件维修 对于机身裂痕、螺旋桨损坏等情况,根据损坏程度更换相应部…...
深入理解Python常见数据类型处理
目录 概述数字类型 整数(int)浮点数(float)复数(complex) 字符串(str) 字符串基本操作字符串方法 列表(list) 列表基本操作列表方法列表推导式 元组…...
最佳实践:CI/CD交付模式下的运维展望丨IDCF
李洪锋 启迪万众数字技术(广州)有限公司 ,产品研发中心-系统运维部、研发效能(DevOps)工程师(中级)课程学员 一、DevOps现状 据云计算产业联盟《中国DevOps现状调查报告2023》显示,国内DevOps 落地成熟度…...
Flat Ads:开发者如何应对全球手游市场的洗牌与转型
2023年下半年至2024年上半年,中国手游的海外市场表现经历了显著变化,开发者要如何应对全球手游市场的洗牌与转型?本篇文章我们将结合相关行业白皮书的最新数据对中国手游出海表现进行分析与洞察。 一、中国手游海外市场表现 根据Sensor Tower《2024年海外手游市场洞察》最新…...
ai取名软件上哪找?一文揭秘5大ai取名生成器
在这个世界上,每一个新生命的到来都是一份奇迹,无论是一个新生儿的第一声啼哭,还是一只宠物的第一次摇尾巴,都充满了无限的希望和喜悦。 然而,给这个小生命起一个响亮、独特且富有意义的名字,往往让人煞费…...
ppt转换成pdf文件,这5个方法一键搞定!小白也能上手~
不管是工作上还是学习上,我们都会遇到转换文档格式的问题。比如常见的pdf转word,ppt转pdf,图片转pdf等。 很多软件都有自带的转换功能可以实现,但是需要保证转换后不乱码,且清晰度足够的方法还是少见的。本文整理了几个…...
中国每个软件创业者都是这个时代的“黑悟空”
作者 | 白鲸开源CEO 郭炜 我作为一个具有30游龄而20年都不碰游戏的游戏玩家,最近为了《黑神话:悟空》(简称,黑悟空),不但花重金更新了显卡,还第一次下载了Steam并绑定了支付,为的就是支持这个第…...
解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题
解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题 目录 🔔 问题背景📄 问题代码❓ 问题描述🩺 问题分析✔ 解决方案 🔔 问题背景 在开发一个使用Qt框架的多线程应用程序时,我们遇到了一个棘手的问题&…...
datax关于postsql数据增量迁移的问题
看官方文档是不支持的 数据源及同步方案_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心 (aliyun.com) 看了下源码有个postsqlwriter 看了下也就拼接sql 将 PostgresqlWriter中的不允许更新先注释了 让他过去先 然后看到 WriterUtil中的对应方法 getWriteTemplat…...
【Go】实现字符切片零拷贝开销转为字符串
package mainimport ("fmt""unsafe" )func main() {bytes : []byte("hello world")s : *(*string)(unsafe.Pointer(&bytes))fmt.Println(s)bytes[0] Hfmt.Println(s) }slice的底层结构是底层数组、len字段、cap字段。string的底层结构是底层…...
[sqlserver][sql]sqlserver查询执行过的历史sql
SQL是一个针对SQL Server数据库的查询执行过的历史 select * from (SELECT *FROM sys.dm_exec_query_stats QS CROSS APPLY sys.dm_exec_sql_text(QS.sql_handle) ST ) a where a.creation_time >2018-07-18 17:00:00 and charindex(delete from ckcdlist ,text)>0 an…...
python中n次方怎么表示
Python中的n次方用pow()方法来表示,pow()方法返回 xy(x的y次方)的值。 语法 以下是 math 模块 pow() 方法的语法: import math math.pow( x, y ) 内置的 pow() 方法 pow(x, y[, z]) 函数是计算x的y次方,如果z在存在&…...
Java数组怎么转List,Stream的基本方法使用教程
Stream流 Java 的 Stream 流操作是一种简洁而强大的处理集合数据的方式,允许对数据进行高效的操作,如过滤、映射、排序和聚合。Stream API 于 Java 8 引入,极大地简化了对集合(如 List、Set)等数据的处理。 一、创建 Stream 从集合创建: List<String> list = Ar…...
2024-07-12 - 基于 sealos 部署高可用 K8S 管理系统
摘要 Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式,抛弃了传统的云计算架构,转向以 Kubernetes 为云内核的新架构,使企业能够像使用个人电脑一样简单地使用云。 操作实践 1、服务器规划 kubernetes集群大体上…...
Ps:首选项 - 单位与标尺
Ps菜单:编辑/首选项 Edit/Preferences 快捷键:Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性,尤其是在跨设备或跨平台…...
DiskDigger(文件恢复工具) v2.0.3 中文授权版
DiskDigger中文版是一款实用文件恢复工具,它能从任何媒介中恢复误删除的文件。支持硬盘、USB 闪存盘、闪存卡(SD/CF/MMS)、CD、DVD 和软盘等。支持恢复任何格式的文件。DiskDigger 能彻底的扫描每个扇区来跟踪文件,从而最大限度恢复挽回文件信息。 软件…...
C/C++逆向:x96dbg(x64dbg/x86dbg)的使用
这篇文章主要来说一下x96dbg(x64/x86)的基本使用,这里还是使用上篇文章中的简单程序用来作为本篇文章的实例,因为上篇文章再生成程序时选用的解决方案平台为x86所以生成的程序则需要我们使用x32dbg来进行分析。 这边与IDA一样,我们可以将程序…...
超声波清洗机是智商税吗?专业博主分享四大必买超声波清洗机款式
有些人觉得超声波清洗机不过是个“智商税”,花几百块买个小盒子不值当,毕竟自己用手也能清洗。但这种看法过于片面。事实上,超声波清洗已经有几十年的历史,随着科技的发展,现代超声波清洗机不仅能够批量、自动清洁&…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
