RabbitMQ---消息确认和持久化
(一)消息确认
1.概念
生产者发送消息后,到达消费端会有以下情况:
1.消息处理成功
2.消息处理异常
如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题
所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制
消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)
自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高
手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的
当我们设置为手动确认后,队列中的消息就分为了两个部分:
1.等待投递给消费者的消息
2.已经投递给消费者但是没有收到确认信号的消息
也就是Ready和Unacked
如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者
2.Spring Boot的三种策略和代码演示
Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)
1)AcknowledgeMode.NONE
这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的
2)AcknowledgeMode.AUTO(默认)
这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息
3)AckniwledgeMode.MANUAL
这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队
代码演示
配置文件代码
spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:acknowledge-mode: NONE
声明交换机和队列代码
@Component
public class Config {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Beanpublic Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();}
}
生产者代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");return "发送成功";}
}
然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认)
消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel){System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+message.getMessageProperties().getDeliveryTag());
// int num=3/0; //模拟失败System.out.println("处理完成");}
}
此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在
我们发现能够准确的接收消息
如果我们此时抛出异常呢?
我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端
此时我们把确认策略改成AUTO
我们这里只需要改配置文件即可
截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环
我们再来演示下正确情况
此时我们再来说一下手动确认策略
首先更改配置文件
其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变
那我们先来讲一下手动确认方法
手动确认方法:
1.肯定确认
RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了
我们来解释下这个方法的参数
deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认
multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值
2.否定确认
有两个方法
他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息
我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者
消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0; //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicNack(Tag,false,true);channel.basicReject(Tag,true);}}
}
出现异常时的现象
因为我们是设置出现异常重新入队给下一个消费者,所以会循环
如果消息成功接收
(二)持久化
我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?
1.RabbitMQ的持久化
RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化
1)交换机持久化
交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在
上面是我们把交换机设置为持久化的代码
其实我们不手动设置,默认交换机也是持久的,我们来看源码
我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的
2)队列持久化
队列持久化也是在声明队列时调用方法来实现的
如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时
所以我们要设置消息持久化,那么一定要设置队列持久化否则没用
这是设置为持久化的代码
这是设置为非持久化的代码
那我们还是来看一眼源码
然后我们点进这个QueueBuilder看看
我们发现这个就是给名字赋值用的
再来看看durable中的setDurable
我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了
3)消息持久化
消息持久化
消息实现持久化就需要把消息的投递模式进行更改
设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失
消息持久化代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){String s1="ack test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);return "发送成功";}
}
注意我们这里传的不是一个单独字符串了,而是一个消息
RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化
问题:
注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量
如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?
答案是错误的
1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失
2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可
第一个问题的解决方案我们下一篇博客在讲
相关文章:

RabbitMQ---消息确认和持久化
(一)消息确认 1.概念 生产者发送消息后,到达消费端会有以下情况: 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时…...
《鸿蒙Next旅游应用:人工智能赋能个性化与智能导览新体验》
随着鸿蒙Next的推出,旅游应用迎来了全新的发展机遇,借助人工智能技术能为用户带来更出色的个性化推荐和智能导览服务。 鸿蒙Next与人工智能融合优势 鸿蒙Next拥有强大的分布式能力和原生智能体验。其能打破设备界限,实现多设备协同…...
微信小程序获取当前页面路径,登录成功后重定向回原页面
🤵 作者:coderYYY 🧑 个人简介:前端程序媛,目前主攻web前端,后端辅助,其他技术知识也会偶尔分享🍀欢迎和我一起交流!🚀(评论和私信一般会回&#…...
【9.2】Golang后端开发系列--Gin路由定义与实战使用
文章目录 一、Gin 框架路由的基本定义方式1. 简单路由创建2. 路由参数3. 查询参数 二、商业大项目中的路由定义和服务调用1. 路由模块化2. 路由组和中间件3. 中间件的使用4. 服务层调用5. 错误处理6. 版本控制7. 路由注册 一、Gin 框架路由的基本定义方式 1. 简单路由创建 使…...
【微信小程序】let和const-综合实训
let 和 const 都是用于声明变量的关键字,它们与传统的 var 关键字相比,有很多不同之处。 let 声明块级作用域变量,可再赋值;const 声明块级作用域常量,不可再赋值。 以下是它们的详细介绍: 一、基本概念…...

图匹配算法(涵盖近似图匹配)
【图数据管理与挖掘-第四讲(子)图匹配算法(涵盖近似图匹配) 北京大学2021暑期-邹磊教授】https://www.bilibili.com/video/BV1zh411q7PW?vd_source7c2b5de7032bf3907543a7675013ce3a 图同构: 定义: 给定…...

java线程——Thread
java线程——Thread 基本步骤示例优劣总结 继承Thread类是Java中实现多线程的一种方式。使用时创建一个新的类,该类继承自java.lang.Thread,并重写其run()方法,在方法中定义线程执行的任务逻辑。 基本步骤 1、创建一个子类:定义一…...
MySQL8.0新特性
第十八章_MySQL8.0新特性 1.新特性概述 1. 数据库管理和存储 1.1 数据字典 特性: MySQL 8.0 使用统一的数据字典存储元数据(如表、列、索引等),并将其存储在 InnoDB 表中。 优点 : 提升性能:减少对文件系统的依赖。 提高一致…...

Oracle EBS GL定期盘存WIP日记账无法过账数据修复
系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 用户反映来源为“定期盘存”和类别为“WIP”的日记账无法过账,标准日记账的界面上的过账按钮灰色不可用。但是,在超级用户职责下,该日记账又可以过账,细心检查发现该业务实体下有二个公司段值15100和…...
【绝对无坑】Mongodb获取集合的字段以及数据类型信息
Mongodb获取集合的字段以及数据类型信息 感觉很LOW的一个数据仓工具seatunel,竟然不能自动读取mongodb的表结构信息,需要手工创建。 然鹅,本人对mongodb也是新手,很多操作也不知所措,作为一个DBA,始终还是…...

【Git版本控制器--1】Git的基本操作--本地仓库
目录 初识git 本地仓库 认识工作区、暂存区、版本库 add操作与commit操作 master文件与commit id 修改文件 版本回退 撤销修改 删除文件 初识git Git 是一个分布式版本控制系统,主要用于跟踪文件的更改,特别是在软件开发中。 为什么要版本…...
C++并发编程之无锁数据结构及其优缺点
在C并发编程中,无锁数据结构(Lock-free Data Structures)是指那些在实现中不使用互斥锁(如std::mutex)来保证线程安全的数据结构。相反,它们利用原子操作和内存模型来确保多线程环境下的正确性和高效性。下…...

Ubuntu上,ffmpeg如何使用cuda硬件解码、编码、转码加速
本文使用 Ubuntu 环境。Ubuntu 直接使用 APT 安装的就支持 CUDA 加速。本文使用这样下载的版本进行演示,你自己编译或者其他源的版本可能会不同。 ffmpeg 的一些介绍,以及 macOS 版本的 ffmpeg 硬件加速请见《macOS上如何安装(不需要编译安装…...

rclone,云存储备份和迁移的瑞士军刀,千字常文解析,附下载链接和安装操作步骤...
一、什么是rclone? rclone是一个命令行程序,全称:rsync for cloud storage。是用于将文件和目录同步到云存储提供商的工具。因其支持多种云存储服务的备份,如Google Drive、Amazon S3、Dropbox、Backblaze B2、One Drive、Swift、…...
Ubuntu | 系统软件安装系列指导说明
文章目录 Ubuntu 系统软件安装系列指导说明工具系列1. Docker 与 Docker-Compose部署与安装 环境系列1. Golang部署与安装 数据库系列1. PostgreSQL17.2源码部署与安装 Ubuntu 系统软件安装系列指导说明 工具系列 1. Docker 与 Docker-Compose部署与安装 链接 环境系列 1…...

队列(算法十三)
简介 几乎没有单纯之考察队列的,队列一般只作为一个辅助工具 队列常服务于BFS queue接口 1.N叉树的层序遍历 link: 思路: 队列 层序遍历即可 code /* // Definition for a Node. class Node { public:int val;vector<Node*> children;Node()…...

vLLM私有化部署大语言模型LLM
目录 一、vLLM介绍 二、安装vLLM 1、安装环境 2、安装步骤 三、运行vLLM 1、运行方式 2、切换模型下载源 3、运行本地已下载模型 四、通过http访问vLLM 一、vLLM介绍 vLLM(官方网址:https://www.vllm.ai)是一种用于大规模语言模型&#x…...

OpenAI Whisper:语音识别技术的革新者—深入架构与参数
当下语音识别技术正以前所未有的速度发展,极大地推动了人机交互的便利性和效率。OpenAI的Whisper系统无疑是这一领域的佼佼者,它凭借其卓越的性能、广泛的适用性和创新的技术架构,正在重新定义语音转文本技术的规则。今天我们一起了解一下Whi…...

基于当前最前沿的前端(Vue3 + Vite + Antdv)和后台(Spring boot)实现的低代码开发平台
项目是一个基于当前最前沿的前端技术栈(Vue3 Vite Ant Design Vue,简称Antdv)和后台技术栈(Spring Boot)实现的低代码开发平台。以下是对该项目的详细介绍: 一、项目概述 项目名称:lowcode-s…...

【Rust】错误处理机制
目录 思维导图 引言 一、错误处理的重要性 1.1 软件中的错误普遍存在 1.2 编译时错误处理要求 二、错误的分类 2.1 可恢复错误(Recoverable Errors) 2.2 不可恢复错误(Unrecoverable Errors) 三、Rust 的错误处理机制 3…...

铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...