RabbitMQ---消息确认和持久化
(一)消息确认
1.概念
生产者发送消息后,到达消费端会有以下情况:
1.消息处理成功
2.消息处理异常

如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题
所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制
消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)
自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高
手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的
当我们设置为手动确认后,队列中的消息就分为了两个部分:
1.等待投递给消费者的消息
2.已经投递给消费者但是没有收到确认信号的消息

也就是Ready和Unacked
如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者
2.Spring Boot的三种策略和代码演示
Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)
1)AcknowledgeMode.NONE
这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的
2)AcknowledgeMode.AUTO(默认)
这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息
3)AckniwledgeMode.MANUAL
这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队
代码演示
配置文件代码
spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:acknowledge-mode: NONE
声明交换机和队列代码
@Component
public class Config {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Beanpublic Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();}
}
生产者代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");return "发送成功";}
}
然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认)

消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel){System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+message.getMessageProperties().getDeliveryTag());
// int num=3/0; //模拟失败System.out.println("处理完成");}
}
此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在


我们发现能够准确的接收消息
如果我们此时抛出异常呢?


我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端
此时我们把确认策略改成AUTO
我们这里只需要改配置文件即可
截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环

我们再来演示下正确情况

此时我们再来说一下手动确认策略
首先更改配置文件
其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变
那我们先来讲一下手动确认方法
手动确认方法:
1.肯定确认

RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了
我们来解释下这个方法的参数
deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认
multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值

2.否定确认
有两个方法

![]()
他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息
我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者
消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0; //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicNack(Tag,false,true);channel.basicReject(Tag,true);}}
}
出现异常时的现象
因为我们是设置出现异常重新入队给下一个消费者,所以会循环

如果消息成功接收
(二)持久化
我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?
1.RabbitMQ的持久化
RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化
1)交换机持久化
交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在
上面是我们把交换机设置为持久化的代码
其实我们不手动设置,默认交换机也是持久的,我们来看源码

我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的
2)队列持久化
队列持久化也是在声明队列时调用方法来实现的
如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时
所以我们要设置消息持久化,那么一定要设置队列持久化否则没用
这是设置为持久化的代码

这是设置为非持久化的代码
那我们还是来看一眼源码

然后我们点进这个QueueBuilder看看
我们发现这个就是给名字赋值用的
再来看看durable中的setDurable

我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了
3)消息持久化
消息持久化
消息实现持久化就需要把消息的投递模式进行更改

设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失
消息持久化代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){String s1="ack test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);return "发送成功";}
}
注意我们这里传的不是一个单独字符串了,而是一个消息
RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化
问题:
注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量
如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?
答案是错误的
1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失
2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可
第一个问题的解决方案我们下一篇博客在讲
相关文章:
RabbitMQ---消息确认和持久化
(一)消息确认 1.概念 生产者发送消息后,到达消费端会有以下情况: 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时…...
《鸿蒙Next旅游应用:人工智能赋能个性化与智能导览新体验》
随着鸿蒙Next的推出,旅游应用迎来了全新的发展机遇,借助人工智能技术能为用户带来更出色的个性化推荐和智能导览服务。 鸿蒙Next与人工智能融合优势 鸿蒙Next拥有强大的分布式能力和原生智能体验。其能打破设备界限,实现多设备协同…...
微信小程序获取当前页面路径,登录成功后重定向回原页面
🤵 作者:coderYYY 🧑 个人简介:前端程序媛,目前主攻web前端,后端辅助,其他技术知识也会偶尔分享🍀欢迎和我一起交流!🚀(评论和私信一般会回&#…...
【9.2】Golang后端开发系列--Gin路由定义与实战使用
文章目录 一、Gin 框架路由的基本定义方式1. 简单路由创建2. 路由参数3. 查询参数 二、商业大项目中的路由定义和服务调用1. 路由模块化2. 路由组和中间件3. 中间件的使用4. 服务层调用5. 错误处理6. 版本控制7. 路由注册 一、Gin 框架路由的基本定义方式 1. 简单路由创建 使…...
【微信小程序】let和const-综合实训
let 和 const 都是用于声明变量的关键字,它们与传统的 var 关键字相比,有很多不同之处。 let 声明块级作用域变量,可再赋值;const 声明块级作用域常量,不可再赋值。 以下是它们的详细介绍: 一、基本概念…...
图匹配算法(涵盖近似图匹配)
【图数据管理与挖掘-第四讲(子)图匹配算法(涵盖近似图匹配) 北京大学2021暑期-邹磊教授】https://www.bilibili.com/video/BV1zh411q7PW?vd_source7c2b5de7032bf3907543a7675013ce3a 图同构: 定义: 给定…...
java线程——Thread
java线程——Thread 基本步骤示例优劣总结 继承Thread类是Java中实现多线程的一种方式。使用时创建一个新的类,该类继承自java.lang.Thread,并重写其run()方法,在方法中定义线程执行的任务逻辑。 基本步骤 1、创建一个子类:定义一…...
MySQL8.0新特性
第十八章_MySQL8.0新特性 1.新特性概述 1. 数据库管理和存储 1.1 数据字典 特性: MySQL 8.0 使用统一的数据字典存储元数据(如表、列、索引等),并将其存储在 InnoDB 表中。 优点 : 提升性能:减少对文件系统的依赖。 提高一致…...
Oracle EBS GL定期盘存WIP日记账无法过账数据修复
系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 用户反映来源为“定期盘存”和类别为“WIP”的日记账无法过账,标准日记账的界面上的过账按钮灰色不可用。但是,在超级用户职责下,该日记账又可以过账,细心检查发现该业务实体下有二个公司段值15100和…...
【绝对无坑】Mongodb获取集合的字段以及数据类型信息
Mongodb获取集合的字段以及数据类型信息 感觉很LOW的一个数据仓工具seatunel,竟然不能自动读取mongodb的表结构信息,需要手工创建。 然鹅,本人对mongodb也是新手,很多操作也不知所措,作为一个DBA,始终还是…...
【Git版本控制器--1】Git的基本操作--本地仓库
目录 初识git 本地仓库 认识工作区、暂存区、版本库 add操作与commit操作 master文件与commit id 修改文件 版本回退 撤销修改 删除文件 初识git Git 是一个分布式版本控制系统,主要用于跟踪文件的更改,特别是在软件开发中。 为什么要版本…...
C++并发编程之无锁数据结构及其优缺点
在C并发编程中,无锁数据结构(Lock-free Data Structures)是指那些在实现中不使用互斥锁(如std::mutex)来保证线程安全的数据结构。相反,它们利用原子操作和内存模型来确保多线程环境下的正确性和高效性。下…...
Ubuntu上,ffmpeg如何使用cuda硬件解码、编码、转码加速
本文使用 Ubuntu 环境。Ubuntu 直接使用 APT 安装的就支持 CUDA 加速。本文使用这样下载的版本进行演示,你自己编译或者其他源的版本可能会不同。 ffmpeg 的一些介绍,以及 macOS 版本的 ffmpeg 硬件加速请见《macOS上如何安装(不需要编译安装…...
rclone,云存储备份和迁移的瑞士军刀,千字常文解析,附下载链接和安装操作步骤...
一、什么是rclone? rclone是一个命令行程序,全称:rsync for cloud storage。是用于将文件和目录同步到云存储提供商的工具。因其支持多种云存储服务的备份,如Google Drive、Amazon S3、Dropbox、Backblaze B2、One Drive、Swift、…...
Ubuntu | 系统软件安装系列指导说明
文章目录 Ubuntu 系统软件安装系列指导说明工具系列1. Docker 与 Docker-Compose部署与安装 环境系列1. Golang部署与安装 数据库系列1. PostgreSQL17.2源码部署与安装 Ubuntu 系统软件安装系列指导说明 工具系列 1. Docker 与 Docker-Compose部署与安装 链接 环境系列 1…...
队列(算法十三)
简介 几乎没有单纯之考察队列的,队列一般只作为一个辅助工具 队列常服务于BFS queue接口 1.N叉树的层序遍历 link: 思路: 队列 层序遍历即可 code /* // Definition for a Node. class Node { public:int val;vector<Node*> children;Node()…...
vLLM私有化部署大语言模型LLM
目录 一、vLLM介绍 二、安装vLLM 1、安装环境 2、安装步骤 三、运行vLLM 1、运行方式 2、切换模型下载源 3、运行本地已下载模型 四、通过http访问vLLM 一、vLLM介绍 vLLM(官方网址:https://www.vllm.ai)是一种用于大规模语言模型&#x…...
OpenAI Whisper:语音识别技术的革新者—深入架构与参数
当下语音识别技术正以前所未有的速度发展,极大地推动了人机交互的便利性和效率。OpenAI的Whisper系统无疑是这一领域的佼佼者,它凭借其卓越的性能、广泛的适用性和创新的技术架构,正在重新定义语音转文本技术的规则。今天我们一起了解一下Whi…...
基于当前最前沿的前端(Vue3 + Vite + Antdv)和后台(Spring boot)实现的低代码开发平台
项目是一个基于当前最前沿的前端技术栈(Vue3 Vite Ant Design Vue,简称Antdv)和后台技术栈(Spring Boot)实现的低代码开发平台。以下是对该项目的详细介绍: 一、项目概述 项目名称:lowcode-s…...
【Rust】错误处理机制
目录 思维导图 引言 一、错误处理的重要性 1.1 软件中的错误普遍存在 1.2 编译时错误处理要求 二、错误的分类 2.1 可恢复错误(Recoverable Errors) 2.2 不可恢复错误(Unrecoverable Errors) 三、Rust 的错误处理机制 3…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
虚拟电厂发展三大趋势:市场化、技术主导、车网互联
市场化:从政策驱动到多元盈利 政策全面赋能 2025年4月,国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》,首次明确虚拟电厂为“独立市场主体”,提出硬性目标:2027年全国调节能力≥2000万千瓦࿰…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
消防一体化安全管控平台:构建消防“一张图”和APP统一管理
在城市的某个角落,一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延,滚滚浓烟弥漫开来,周围群众的生命财产安全受到严重威胁。就在这千钧一发之际,消防救援队伍迅速行动,而豪越科技消防一体化安全管控平台构建的消防“…...
ui框架-文件列表展示
ui框架-文件列表展示 介绍 UI框架的文件列表展示组件,可以展示文件夹,支持列表展示和图标展示模式。组件提供了丰富的功能和可配置选项,适用于文件管理、文件上传等场景。 功能特性 支持列表模式和网格模式的切换展示支持文件和文件夹的层…...
