当前位置: 首页 > news >正文

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

RabbitMQ消息丢失解决方案 

针对生产者 

1、开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务  
channel.txSelect();  
try {  // 这里发送消息  
} catch (Exception e) {  channel.txRollback(); 
// 这里再次重发这条消息
}
// 提交事务  
channel.txCommit();  

缺点:

RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

2、使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirm  
channel.confirm();  
//发送成功回调  
public void ack(String messageId){
}
// 发送失败回调  
public void nack(String messageId){  //重发该消息  
}

针对RabbitMQ

RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。

解决办法:

设置持久化有两个关注点:

第一个是创建 queue 的时候将其设置为持久化

第二个是发送消息的时候将消息设置为持久化

第一种:元配置持久化

在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());

第二种:消息持久化

发送消息的时候将消息的deliveryMode设置为2,或者是

MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

下面是采用springboot整合的rabbitTemplate来实现消息发送 

MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式Message message = new Message(payload, props);rabbitTemplate.send(exchange, routingKey, message);// 或者直接传递Message对象,其中包含已设置持久化的MessagePropertiesrabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));// 或者使用MessagePostProcessor接口动态设置消息属性rabbitTemplate.convertAndSend(exchange,routingKey,messageBody,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});

下面是采用SpringBoot中通过new来创建连接来实现消息发送

 Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();if (Objects.nonNull(connection)) {log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");Channel channel = connection.createChannel();//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}

 消费者确认机制

如果上述生产端消息队列都正确投递,那么问题出现在消费端是否可以正确消费?

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。

默认情况下,以下3种原因导致消息丢失:

1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;

2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;

3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

应对方案:

  • 将自动ack机制改为手动ack机制。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收消息,业务处理//设置手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//发生异常时,可以选择重新发送消息或进行错误处理// 例如,可以选择负确认(nack),让消息重回队列// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

消息补偿机制 

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。

如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

  • 生产端发送消息;

  • 确认失败,将消息保存到数据库中,并设置初始状态0;

  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);

  • 重发消息,可多次;

  • 重发成功,更新数据库:status=1;

  • 超过固定次数重发仍然失败,人工干预。

标注:

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列

  • 可人工干预手动排查

  • 也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。

相关文章:

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况 第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种…...

(亲测有效)SpringBoot项目集成腾讯云COS对象存储(1)

目录 一、腾讯云对象存储使用 1、创建Bucket 2、使用web控制台上传和浏览文件 3、创建API秘钥 二、代码对接腾讯云COS(以Java为例) 1、初始化客户端 2、填写配置文件 3、通用能力类 文件上传 测试 一、腾讯云对象存储使用 1、创建Bucket &am…...

无人机之故障排除篇

一、识别故障 掌握基本的无人机系统知识,遵循“先易后难、先外后内、先软件后硬件”的原则进行故障识别。一旦发现故障,立即停止飞行,避免进一步损坏。 二、机械部件维修 对于机身裂痕、螺旋桨损坏等情况,根据损坏程度更换相应部…...

深入理解Python常见数据类型处理

目录 概述数字类型 整数(int)浮点数(float)复数(complex) 字符串(str) 字符串基本操作字符串方法 列表(list) 列表基本操作列表方法列表推导式 元组&#xf…...

最佳实践:CI/CD交付模式下的运维展望丨IDCF

李洪锋 启迪万众数字技术(广州)有限公司 ,产品研发中心-系统运维部、研发效能(DevOps)工程师(中级)课程学员 一、DevOps现状 据云计算产业联盟《中国DevOps现状调查报告2023》显示,国内DevOps 落地成熟度…...

Flat Ads:开发者如何应对全球手游市场的洗牌与转型

2023年下半年至2024年上半年,中国手游的海外市场表现经历了显著变化,开发者要如何应对全球手游市场的洗牌与转型?本篇文章我们将结合相关行业白皮书的最新数据对中国手游出海表现进行分析与洞察。 一、中国手游海外市场表现 根据Sensor Tower《2024年海外手游市场洞察》最新…...

ai取名软件上哪找?一文揭秘5大ai取名生成器

在这个世界上,每一个新生命的到来都是一份奇迹,无论是一个新生儿的第一声啼哭,还是一只宠物的第一次摇尾巴,都充满了无限的希望和喜悦。 然而,给这个小生命起一个响亮、独特且富有意义的名字,往往让人煞费…...

ppt转换成pdf文件,这5个方法一键搞定!小白也能上手~

不管是工作上还是学习上,我们都会遇到转换文档格式的问题。比如常见的pdf转word,ppt转pdf,图片转pdf等。 很多软件都有自带的转换功能可以实现,但是需要保证转换后不乱码,且清晰度足够的方法还是少见的。本文整理了几个…...

中国每个软件创业者都是这个时代的“黑悟空”

作者 | 白鲸开源CEO 郭炜 我作为一个具有30游龄而20年都不碰游戏的游戏玩家,最近为了《黑神话:悟空》(简称,黑悟空),不但花重金更新了显卡,还第一次下载了Steam并绑定了支付,为的就是支持这个第…...

解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题

解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题 目录 🔔 问题背景📄 问题代码❓ 问题描述🩺 问题分析✔ 解决方案 🔔 问题背景 在开发一个使用Qt框架的多线程应用程序时,我们遇到了一个棘手的问题&…...

datax关于postsql数据增量迁移的问题

看官方文档是不支持的 数据源及同步方案_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心 (aliyun.com) 看了下源码有个postsqlwriter 看了下也就拼接sql 将 PostgresqlWriter中的不允许更新先注释了 让他过去先 然后看到 WriterUtil中的对应方法 getWriteTemplat…...

【Go】实现字符切片零拷贝开销转为字符串

package mainimport ("fmt""unsafe" )func main() {bytes : []byte("hello world")s : *(*string)(unsafe.Pointer(&bytes))fmt.Println(s)bytes[0] Hfmt.Println(s) }slice的底层结构是底层数组、len字段、cap字段。string的底层结构是底层…...

[sqlserver][sql]sqlserver查询执行过的历史sql

SQL是一个针对SQL Server数据库的查询执行过的历史 select * from (SELECT *FROM sys.dm_exec_query_stats QS CROSS APPLY sys.dm_exec_sql_text(QS.sql_handle) ST ) a where a.creation_time >2018-07-18 17:00:00 and charindex(delete from ckcdlist ,text)>0 an…...

python中n次方怎么表示

Python中的n次方用pow()方法来表示,pow()方法返回 xy(x的y次方)的值。 语法 以下是 math 模块 pow() 方法的语法: import math math.pow( x, y ) 内置的 pow() 方法 pow(x, y[, z]) 函数是计算x的y次方,如果z在存在&…...

Java数组怎么转List,Stream的基本方法使用教程

Stream流 Java 的 Stream 流操作是一种简洁而强大的处理集合数据的方式,允许对数据进行高效的操作,如过滤、映射、排序和聚合。Stream API 于 Java 8 引入,极大地简化了对集合(如 List、Set)等数据的处理。 一、创建 Stream 从集合创建: List<String> list = Ar…...

2024-07-12 - 基于 sealos 部署高可用 K8S 管理系统

摘要 Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式&#xff0c;抛弃了传统的云计算架构&#xff0c;转向以 Kubernetes 为云内核的新架构&#xff0c;使企业能够像使用个人电脑一样简单地使用云。 操作实践 1、服务器规划 kubernetes集群大体上…...

Ps:首选项 - 单位与标尺

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性&#xff0c;尤其是在跨设备或跨平台…...

DiskDigger(文件恢复工具) v2.0.3 中文授权版

DiskDigger中文版是一款实用文件恢复工具&#xff0c;它能从任何媒介中恢复误删除的文件。支持硬盘、USB 闪存盘、闪存卡(SD/CF/MMS)、CD、DVD 和软盘等。支持恢复任何格式的文件。DiskDigger 能彻底的扫描每个扇区来跟踪文件&#xff0c;从而最大限度恢复挽回文件信息。 软件…...

C/C++逆向:x96dbg(x64dbg/x86dbg)的使用

这篇文章主要来说一下x96dbg(x64/x86)的基本使用&#xff0c;这里还是使用上篇文章中的简单程序用来作为本篇文章的实例&#xff0c;因为上篇文章再生成程序时选用的解决方案平台为x86所以生成的程序则需要我们使用x32dbg来进行分析。 这边与IDA一样&#xff0c;我们可以将程序…...

超声波清洗机是智商税吗?专业博主分享四大必买超声波清洗机款式

有些人觉得超声波清洗机不过是个“智商税”&#xff0c;花几百块买个小盒子不值当&#xff0c;毕竟自己用手也能清洗。但这种看法过于片面。事实上&#xff0c;超声波清洗已经有几十年的历史&#xff0c;随着科技的发展&#xff0c;现代超声波清洗机不仅能够批量、自动清洁&…...

[实战] 2026年CNC加工质量控制:从工程图纸数字化到检验计划生成的全流程解析

在 2026 年的精密制造环境中&#xff0c;cnc 加工&#xff08;CNC machining&#xff09;已不再仅仅是切削工艺的竞争&#xff0c;更是数字化协作与质量控制能力的较量。随着多品种、小批量生产模式成为主流&#xff0c;如何快速解析复杂的工程图纸并制定高精度的检验计划&…...

MAA助手:解放双手的明日方舟全自动游戏管理工具实战指南

MAA助手&#xff1a;解放双手的明日方舟全自动游戏管理工具实战指南 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手&#xff0c;全日常一键长草&#xff01;| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址: https://g…...

基于DocFX与CI/CD构建.NET私有NuGet包文档一体化管理方案

1. 项目概述与核心价值最近在整理团队内部的.NET技术资产时&#xff0c;我重新审视了一个看似简单但极其重要的仓库&#xff1a;abellobm3681/nuget-docs。这名字乍一看&#xff0c;可能很多人会以为又是一个NuGet官方文档的镜像或者翻译项目。但如果你深入进去&#xff0c;会发…...

RPFM:重新定义全面战争MOD开发的工作流革命

RPFM&#xff1a;重新定义全面战争MOD开发的工作流革命 【免费下载链接】rpfm Rusted PackFile Manager (RPFM) is a... reimplementation in Rust and Qt6 of PackFile Manager (PFM), one of the best modding tools for Total War Games. 项目地址: https://gitcode.com/g…...

ncmdump终极解决方案:解锁网易云音乐NCM格式的完整指南

ncmdump终极解决方案&#xff1a;解锁网易云音乐NCM格式的完整指南 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 还在为网易云音乐下载的NCM加密文件无法在其他设备播放而烦恼吗&#xff1f;ncmdump工具使用为你提供了完美的NCM格…...

PangoDesign Suite与Modelsim协同仿真:从库编译到实战排错全解析

1. 为什么需要PangoDesign Suite与Modelsim协同仿真 第一次接触FPGA仿真时&#xff0c;我也被各种专业术语绕晕了。直到某次项目出现时序问题&#xff0c;才发现仿真工具就像汽车的"安全气囊"——平时感觉不到存在&#xff0c;关键时刻能救命。PangoDesign Suite&…...

给操作系统爱好者的RISC-V中断实战指南:从SiFive Unleashed开发板到Xv6内核代码

RISC-V中断机制深度解析&#xff1a;从硬件触发到Xv6内核实战 1. RISC-V中断体系架构全景 RISC-V中断系统采用分层设计理念&#xff0c;硬件与软件协同构成了完整的异常处理框架。作为开源指令集架构&#xff0c;RISC-V的中断设计既保持了精简性&#xff0c;又通过可扩展机制满…...

Ardb多存储引擎深度解析:RocksDB、LevelDB、LMDB、WiredTiger、PerconaFT、ForestDB全方位对比

Ardb多存储引擎深度解析&#xff1a;RocksDB、LevelDB、LMDB、WiredTiger、PerconaFT、ForestDB全方位对比 【免费下载链接】ardb A redis protocol compatible nosql, it support multiple storage engines as backend like Googles LevelDB, Facebooks RocksDB, OpenLDAPs LM…...

解放双手!绝区零智能自动化工具让你的游戏体验翻倍升级

解放双手&#xff01;绝区零智能自动化工具让你的游戏体验翻倍升级 【免费下载链接】ZenlessZoneZero-OneDragon 绝区零 一条龙 | 全自动 | 自动闪避 | 自动每日 | 自动空洞 | 支持手柄 项目地址: https://gitcode.com/gh_mirrors/ze/ZenlessZoneZero-OneDragon 还在为《…...

ThinkPad风扇控制终极指南:5分钟告别噪音与过热烦恼

ThinkPad风扇控制终极指南&#xff1a;5分钟告别噪音与过热烦恼 【免费下载链接】TPFanCtrl2 ThinkPad Fan Control 2 (Dual Fan) for Windows 10 and 11 项目地址: https://gitcode.com/gh_mirrors/tp/TPFanCtrl2 你是否曾因ThinkPad风扇的"直升机起飞"声而烦…...