当前位置: 首页 > news >正文

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

RabbitMQ消息丢失解决方案 

针对生产者 

1、开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务  
channel.txSelect();  
try {  // 这里发送消息  
} catch (Exception e) {  channel.txRollback(); 
// 这里再次重发这条消息
}
// 提交事务  
channel.txCommit();  

缺点:

RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

2、使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirm  
channel.confirm();  
//发送成功回调  
public void ack(String messageId){
}
// 发送失败回调  
public void nack(String messageId){  //重发该消息  
}

针对RabbitMQ

RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。

解决办法:

设置持久化有两个关注点:

第一个是创建 queue 的时候将其设置为持久化

第二个是发送消息的时候将消息设置为持久化

第一种:元配置持久化

在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());

第二种:消息持久化

发送消息的时候将消息的deliveryMode设置为2,或者是

MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

下面是采用springboot整合的rabbitTemplate来实现消息发送 

MessageProperties props = new MessageProperties();props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式Message message = new Message(payload, props);rabbitTemplate.send(exchange, routingKey, message);// 或者直接传递Message对象,其中包含已设置持久化的MessagePropertiesrabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));// 或者使用MessagePostProcessor接口动态设置消息属性rabbitTemplate.convertAndSend(exchange,routingKey,messageBody,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});

下面是采用SpringBoot中通过new来创建连接来实现消息发送

 Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();if (Objects.nonNull(connection)) {log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");Channel channel = connection.createChannel();//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}

 消费者确认机制

如果上述生产端消息队列都正确投递,那么问题出现在消费端是否可以正确消费?

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。

默认情况下,以下3种原因导致消息丢失:

1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;

2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;

3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

应对方案:

  • 将自动ack机制改为手动ack机制。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收消息,业务处理//设置手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//发生异常时,可以选择重新发送消息或进行错误处理// 例如,可以选择负确认(nack),让消息重回队列// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

消息补偿机制 

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。

如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

  • 生产端发送消息;

  • 确认失败,将消息保存到数据库中,并设置初始状态0;

  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);

  • 重发消息,可多次;

  • 重发成功,更新数据库:status=1;

  • 超过固定次数重发仍然失败,人工干预。

标注:

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列

  • 可人工干预手动排查

  • 也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。

相关文章:

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况 第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种…...

(亲测有效)SpringBoot项目集成腾讯云COS对象存储(1)

目录 一、腾讯云对象存储使用 1、创建Bucket 2、使用web控制台上传和浏览文件 3、创建API秘钥 二、代码对接腾讯云COS(以Java为例) 1、初始化客户端 2、填写配置文件 3、通用能力类 文件上传 测试 一、腾讯云对象存储使用 1、创建Bucket &am…...

无人机之故障排除篇

一、识别故障 掌握基本的无人机系统知识,遵循“先易后难、先外后内、先软件后硬件”的原则进行故障识别。一旦发现故障,立即停止飞行,避免进一步损坏。 二、机械部件维修 对于机身裂痕、螺旋桨损坏等情况,根据损坏程度更换相应部…...

深入理解Python常见数据类型处理

目录 概述数字类型 整数(int)浮点数(float)复数(complex) 字符串(str) 字符串基本操作字符串方法 列表(list) 列表基本操作列表方法列表推导式 元组&#xf…...

最佳实践:CI/CD交付模式下的运维展望丨IDCF

李洪锋 启迪万众数字技术(广州)有限公司 ,产品研发中心-系统运维部、研发效能(DevOps)工程师(中级)课程学员 一、DevOps现状 据云计算产业联盟《中国DevOps现状调查报告2023》显示,国内DevOps 落地成熟度…...

Flat Ads:开发者如何应对全球手游市场的洗牌与转型

2023年下半年至2024年上半年,中国手游的海外市场表现经历了显著变化,开发者要如何应对全球手游市场的洗牌与转型?本篇文章我们将结合相关行业白皮书的最新数据对中国手游出海表现进行分析与洞察。 一、中国手游海外市场表现 根据Sensor Tower《2024年海外手游市场洞察》最新…...

ai取名软件上哪找?一文揭秘5大ai取名生成器

在这个世界上,每一个新生命的到来都是一份奇迹,无论是一个新生儿的第一声啼哭,还是一只宠物的第一次摇尾巴,都充满了无限的希望和喜悦。 然而,给这个小生命起一个响亮、独特且富有意义的名字,往往让人煞费…...

ppt转换成pdf文件,这5个方法一键搞定!小白也能上手~

不管是工作上还是学习上,我们都会遇到转换文档格式的问题。比如常见的pdf转word,ppt转pdf,图片转pdf等。 很多软件都有自带的转换功能可以实现,但是需要保证转换后不乱码,且清晰度足够的方法还是少见的。本文整理了几个…...

中国每个软件创业者都是这个时代的“黑悟空”

作者 | 白鲸开源CEO 郭炜 我作为一个具有30游龄而20年都不碰游戏的游戏玩家,最近为了《黑神话:悟空》(简称,黑悟空),不但花重金更新了显卡,还第一次下载了Steam并绑定了支付,为的就是支持这个第…...

解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题

解决Qt多线程中fromRawData函数生成的QByteArray数据不一致问题 目录 🔔 问题背景📄 问题代码❓ 问题描述🩺 问题分析✔ 解决方案 🔔 问题背景 在开发一个使用Qt框架的多线程应用程序时,我们遇到了一个棘手的问题&…...

datax关于postsql数据增量迁移的问题

看官方文档是不支持的 数据源及同步方案_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心 (aliyun.com) 看了下源码有个postsqlwriter 看了下也就拼接sql 将 PostgresqlWriter中的不允许更新先注释了 让他过去先 然后看到 WriterUtil中的对应方法 getWriteTemplat…...

【Go】实现字符切片零拷贝开销转为字符串

package mainimport ("fmt""unsafe" )func main() {bytes : []byte("hello world")s : *(*string)(unsafe.Pointer(&bytes))fmt.Println(s)bytes[0] Hfmt.Println(s) }slice的底层结构是底层数组、len字段、cap字段。string的底层结构是底层…...

[sqlserver][sql]sqlserver查询执行过的历史sql

SQL是一个针对SQL Server数据库的查询执行过的历史 select * from (SELECT *FROM sys.dm_exec_query_stats QS CROSS APPLY sys.dm_exec_sql_text(QS.sql_handle) ST ) a where a.creation_time >2018-07-18 17:00:00 and charindex(delete from ckcdlist ,text)>0 an…...

python中n次方怎么表示

Python中的n次方用pow()方法来表示,pow()方法返回 xy(x的y次方)的值。 语法 以下是 math 模块 pow() 方法的语法: import math math.pow( x, y ) 内置的 pow() 方法 pow(x, y[, z]) 函数是计算x的y次方,如果z在存在&…...

Java数组怎么转List,Stream的基本方法使用教程

Stream流 Java 的 Stream 流操作是一种简洁而强大的处理集合数据的方式,允许对数据进行高效的操作,如过滤、映射、排序和聚合。Stream API 于 Java 8 引入,极大地简化了对集合(如 List、Set)等数据的处理。 一、创建 Stream 从集合创建: List<String> list = Ar…...

2024-07-12 - 基于 sealos 部署高可用 K8S 管理系统

摘要 Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。它以云原生的方式&#xff0c;抛弃了传统的云计算架构&#xff0c;转向以 Kubernetes 为云内核的新架构&#xff0c;使企业能够像使用个人电脑一样简单地使用云。 操作实践 1、服务器规划 kubernetes集群大体上…...

Ps:首选项 - 单位与标尺

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性&#xff0c;尤其是在跨设备或跨平台…...

DiskDigger(文件恢复工具) v2.0.3 中文授权版

DiskDigger中文版是一款实用文件恢复工具&#xff0c;它能从任何媒介中恢复误删除的文件。支持硬盘、USB 闪存盘、闪存卡(SD/CF/MMS)、CD、DVD 和软盘等。支持恢复任何格式的文件。DiskDigger 能彻底的扫描每个扇区来跟踪文件&#xff0c;从而最大限度恢复挽回文件信息。 软件…...

C/C++逆向:x96dbg(x64dbg/x86dbg)的使用

这篇文章主要来说一下x96dbg(x64/x86)的基本使用&#xff0c;这里还是使用上篇文章中的简单程序用来作为本篇文章的实例&#xff0c;因为上篇文章再生成程序时选用的解决方案平台为x86所以生成的程序则需要我们使用x32dbg来进行分析。 这边与IDA一样&#xff0c;我们可以将程序…...

超声波清洗机是智商税吗?专业博主分享四大必买超声波清洗机款式

有些人觉得超声波清洗机不过是个“智商税”&#xff0c;花几百块买个小盒子不值当&#xff0c;毕竟自己用手也能清洗。但这种看法过于片面。事实上&#xff0c;超声波清洗已经有几十年的历史&#xff0c;随着科技的发展&#xff0c;现代超声波清洗机不仅能够批量、自动清洁&…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

C#中的CLR属性、依赖属性与附加属性

CLR属性的主要特征 封装性&#xff1a; 隐藏字段的实现细节 提供对字段的受控访问 访问控制&#xff1a; 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性&#xff1a; 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑&#xff1a; 可以…...

对象回调初步研究

_OBJECT_TYPE结构分析 在介绍什么是对象回调前&#xff0c;首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例&#xff0c;用_OBJECT_TYPE这个结构来解析它&#xff0c;0x80处就是今天要介绍的回调链表&#xff0c;但是先不着急&#xff0c;先把目光…...

游戏开发中常见的战斗数值英文缩写对照表

游戏开发中常见的战斗数值英文缩写对照表 基础属性&#xff08;Basic Attributes&#xff09; 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...

渗透实战PortSwigger Labs指南:自定义标签XSS和SVG XSS利用

阻止除自定义标签之外的所有标签 先输入一些标签测试&#xff0c;说是全部标签都被禁了 除了自定义的 自定义<my-tag onmouseoveralert(xss)> <my-tag idx onfocusalert(document.cookie) tabindex1> onfocus 当元素获得焦点时&#xff08;如通过点击或键盘导航&…...