RabbitMQ如何保证消息不丢失
RabbitMQ消息丢失的三种情况
第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。
RabbitMQ消息丢失解决方案
针对生产者
1、开启RabbitMQ事务
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
// 开启事务
channel.txSelect();
try { // 这里发送消息
} catch (Exception e) { channel.txRollback();
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit();
缺点:
RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。
2、使用confirm机制
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的
在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirm
channel.confirm();
//发送成功回调
public void ack(String messageId){
}
// 发送失败回调
public void nack(String messageId){ //重发该消息
}
针对RabbitMQ
RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。
解决办法:
设置持久化有两个关注点:
第一个是创建 queue 的时候将其设置为持久化
第二个是发送消息的时候将消息设置为持久化
第一种:元配置持久化
在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());
第二种:消息持久化
发送消息的时候将消息的deliveryMode设置为2,或者是
MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
下面是采用springboot整合的rabbitTemplate来实现消息发送
MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式Message message = new Message(payload, props);rabbitTemplate.send(exchange, routingKey, message);// 或者直接传递Message对象,其中包含已设置持久化的MessagePropertiesrabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));// 或者使用MessagePostProcessor接口动态设置消息属性rabbitTemplate.convertAndSend(exchange,routingKey,messageBody,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});
下面是采用SpringBoot中通过new来创建连接来实现消息发送
Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();if (Objects.nonNull(connection)) {log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");Channel channel = connection.createChannel();//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}
消费者确认机制
如果上述生产端
、消息队列
都正确投递,那么问题出现在消费端
是否可以正确消费?
消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。
默认情况下,以下3种原因导致消息丢失:
1、 网络故障
:消费端还没接收到消息之前,发生网络故障导致消息丢失;
2、 未接收消息前服务宕机
:消费端突然挂机未接收到消息,此时消息会丢失;
3、 处理过程中服务宕机
:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
这是因为RabbitMQ的自动ack
机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。
应对方案:
-
将自动ack机制改为手动ack机制。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收消息,业务处理//设置手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//发生异常时,可以选择重新发送消息或进行错误处理// 例如,可以选择负确认(nack),让消息重回队列// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
消息补偿机制
以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制
或者人工干预
。这是我们的最后一道防线。
如何做消息补偿呢?其实就是将消息入库,通过定时任务
重新发送失败的消息。详细流程如下:
-
生产端发送消息;
-
确认失败,将消息保存到数据库中,并设置初始状态0;
-
定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
-
重发消息,可多次;
-
重发成功,更新数据库:status=1;
-
超过固定次数重发仍然失败,人工干预。
标注:
超过最大失败次数后,对于无法被正常消费的消息可移入死信队列
。
-
可人工干预手动排查
-
也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。
相关文章:

RabbitMQ如何保证消息不丢失
RabbitMQ消息丢失的三种情况 第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种…...

(亲测有效)SpringBoot项目集成腾讯云COS对象存储(1)
目录 一、腾讯云对象存储使用 1、创建Bucket 2、使用web控制台上传和浏览文件 3、创建API秘钥 二、代码对接腾讯云COS(以Java为例) 1、初始化客户端 2、填写配置文件 3、通用能力类 文件上传 测试 一、腾讯云对象存储使用 1、创建Bucket &am…...

无人机之故障排除篇
一、识别故障 掌握基本的无人机系统知识,遵循“先易后难、先外后内、先软件后硬件”的原则进行故障识别。一旦发现故障,立即停止飞行,避免进一步损坏。 二、机械部件维修 对于机身裂痕、螺旋桨损坏等情况,根据损坏程度更换相应部…...
深入理解Python常见数据类型处理
目录 概述数字类型 整数(int)浮点数(float)复数(complex) 字符串(str) 字符串基本操作字符串方法 列表(list) 列表基本操作列表方法列表推导式 元组…...

最佳实践:CI/CD交付模式下的运维展望丨IDCF
李洪锋 启迪万众数字技术(广州)有限公司 ,产品研发中心-系统运维部、研发效能(DevOps)工程师(中级)课程学员 一、DevOps现状 据云计算产业联盟《中国DevOps现状调查报告2023》显示,国内DevOps 落地成熟度…...

Flat Ads:开发者如何应对全球手游市场的洗牌与转型
2023年下半年至2024年上半年,中国手游的海外市场表现经历了显著变化,开发者要如何应对全球手游市场的洗牌与转型?本篇文章我们将结合相关行业白皮书的最新数据对中国手游出海表现进行分析与洞察。 一、中国手游海外市场表现 根据Sensor Tower《2024年海外手游市场洞察》最新…...

ai取名软件上哪找?一文揭秘5大ai取名生成器
在这个世界上,每一个新生命的到来都是一份奇迹,无论是一个新生儿的第一声啼哭,还是一只宠物的第一次摇尾巴,都充满了无限的希望和喜悦。 然而,给这个小生命起一个响亮、独特且富有意义的名字,往往让人煞费…...

ppt转换成pdf文件,这5个方法一键搞定!小白也能上手~
不管是工作上还是学习上,我们都会遇到转换文档格式的问题。比如常见的pdf转word,ppt转pdf,图片转pdf等。 很多软件都有自带的转换功能可以实现,但是需要保证转换后不乱码,且清晰度足够的方法还是少见的。本文整理了几个…...

中国每个软件创业者都是这个时代的“黑悟空”
作者 | 白鲸开源CEO 郭炜 我作为一个具有30游龄而20年都不碰游戏的游戏玩家,最近为了《黑神话:悟空》(简称,黑悟空),不但花重金更新了显卡,还第一次下载了Steam并绑定了支付,为的就是支持这个第…...

解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题
解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题 目录 🔔 问题背景📄 问题代码❓ 问题描述🩺 问题分析✔ 解决方案 🔔 问题背景 在开发一个使用Qt框架的多线程应用程序时,我们遇到了一个棘手的问题&…...

datax关于postsql数据增量迁移的问题
看官方文档是不支持的 数据源及同步方案_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心 (aliyun.com) 看了下源码有个postsqlwriter 看了下也就拼接sql 将 PostgresqlWriter中的不允许更新先注释了 让他过去先 然后看到 WriterUtil中的对应方法 getWriteTemplat…...

【Go】实现字符切片零拷贝开销转为字符串
package mainimport ("fmt""unsafe" )func main() {bytes : []byte("hello world")s : *(*string)(unsafe.Pointer(&bytes))fmt.Println(s)bytes[0] Hfmt.Println(s) }slice的底层结构是底层数组、len字段、cap字段。string的底层结构是底层…...
[sqlserver][sql]sqlserver查询执行过的历史sql
SQL是一个针对SQL Server数据库的查询执行过的历史 select * from (SELECT *FROM sys.dm_exec_query_stats QS CROSS APPLY sys.dm_exec_sql_text(QS.sql_handle) ST ) a where a.creation_time >2018-07-18 17:00:00 and charindex(delete from ckcdlist ,text)>0 an…...

python中n次方怎么表示
Python中的n次方用pow()方法来表示,pow()方法返回 xy(x的y次方)的值。 语法 以下是 math 模块 pow() 方法的语法: import math math.pow( x, y ) 内置的 pow() 方法 pow(x, y[, z]) 函数是计算x的y次方,如果z在存在&…...

Java数组怎么转List,Stream的基本方法使用教程
Stream流 Java 的 Stream 流操作是一种简洁而强大的处理集合数据的方式,允许对数据进行高效的操作,如过滤、映射、排序和聚合。Stream API 于 Java 8 引入,极大地简化了对集合(如 List、Set)等数据的处理。 一、创建 Stream 从集合创建: List<String> list = Ar…...
2024-07-12 - 基于 sealos 部署高可用 K8S 管理系统
摘要 Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式,抛弃了传统的云计算架构,转向以 Kubernetes 为云内核的新架构,使企业能够像使用个人电脑一样简单地使用云。 操作实践 1、服务器规划 kubernetes集群大体上…...

Ps:首选项 - 单位与标尺
Ps菜单:编辑/首选项 Edit/Preferences 快捷键:Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性,尤其是在跨设备或跨平台…...

DiskDigger(文件恢复工具) v2.0.3 中文授权版
DiskDigger中文版是一款实用文件恢复工具,它能从任何媒介中恢复误删除的文件。支持硬盘、USB 闪存盘、闪存卡(SD/CF/MMS)、CD、DVD 和软盘等。支持恢复任何格式的文件。DiskDigger 能彻底的扫描每个扇区来跟踪文件,从而最大限度恢复挽回文件信息。 软件…...

C/C++逆向:x96dbg(x64dbg/x86dbg)的使用
这篇文章主要来说一下x96dbg(x64/x86)的基本使用,这里还是使用上篇文章中的简单程序用来作为本篇文章的实例,因为上篇文章再生成程序时选用的解决方案平台为x86所以生成的程序则需要我们使用x32dbg来进行分析。 这边与IDA一样,我们可以将程序…...

超声波清洗机是智商税吗?专业博主分享四大必买超声波清洗机款式
有些人觉得超声波清洗机不过是个“智商税”,花几百块买个小盒子不值当,毕竟自己用手也能清洗。但这种看法过于片面。事实上,超声波清洗已经有几十年的历史,随着科技的发展,现代超声波清洗机不仅能够批量、自动清洁&…...

龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...

零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...

【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)
前言: 双亲委派机制对于面试这块来说非常重要,在实际开发中也是经常遇见需要打破双亲委派的需求,今天我们一起来探索一下什么是双亲委派机制,在此之前我们先介绍一下类的加载器。 目录 编辑 前言: 类加载器 1. …...
安卓基础(Java 和 Gradle 版本)
1. 设置项目的 JDK 版本 方法1:通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分,设置 Gradle JDK 方法2:通过 Settings File → Settings... (或 CtrlAltS)…...