RabbitMQ---消息确认和持久化
(一)消息确认
1.概念
生产者发送消息后,到达消费端会有以下情况:
1.消息处理成功
2.消息处理异常
如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题
所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制
消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)
自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高
手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的
当我们设置为手动确认后,队列中的消息就分为了两个部分:
1.等待投递给消费者的消息
2.已经投递给消费者但是没有收到确认信号的消息
也就是Ready和Unacked
如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者
2.Spring Boot的三种策略和代码演示
Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)
1)AcknowledgeMode.NONE
这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的
2)AcknowledgeMode.AUTO(默认)
这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息
3)AckniwledgeMode.MANUAL
这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队
代码演示
配置文件代码
spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:acknowledge-mode: NONE
声明交换机和队列代码
@Component
public class Config {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Beanpublic Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();}
}
生产者代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");return "发送成功";}
}
然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认)
消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel){System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+message.getMessageProperties().getDeliveryTag());
// int num=3/0; //模拟失败System.out.println("处理完成");}
}
此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在
我们发现能够准确的接收消息
如果我们此时抛出异常呢?
我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端
此时我们把确认策略改成AUTO
我们这里只需要改配置文件即可
截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环
我们再来演示下正确情况
此时我们再来说一下手动确认策略
首先更改配置文件
其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变
那我们先来讲一下手动确认方法
手动确认方法:
1.肯定确认
RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了
我们来解释下这个方法的参数
deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认
multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值
2.否定确认
有两个方法
他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息
我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者
消费者代码
@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0; //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicNack(Tag,false,true);channel.basicReject(Tag,true);}}
}
出现异常时的现象
因为我们是设置出现异常重新入队给下一个消费者,所以会循环
如果消息成功接收
(二)持久化
我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?
1.RabbitMQ的持久化
RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化
1)交换机持久化
交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在
上面是我们把交换机设置为持久化的代码
其实我们不手动设置,默认交换机也是持久的,我们来看源码
我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的
2)队列持久化
队列持久化也是在声明队列时调用方法来实现的
如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时
所以我们要设置消息持久化,那么一定要设置队列持久化否则没用
这是设置为持久化的代码
这是设置为非持久化的代码
那我们还是来看一眼源码
然后我们点进这个QueueBuilder看看
我们发现这个就是给名字赋值用的
再来看看durable中的setDurable
我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了
3)消息持久化
消息持久化
消息实现持久化就需要把消息的投递模式进行更改
设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失
消息持久化代码
@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){String s1="ack test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);return "发送成功";}
}
注意我们这里传的不是一个单独字符串了,而是一个消息
RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化
问题:
注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量
如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?
答案是错误的
1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失
2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可
第一个问题的解决方案我们下一篇博客在讲
相关文章:

RabbitMQ---消息确认和持久化
(一)消息确认 1.概念 生产者发送消息后,到达消费端会有以下情况: 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时…...
《鸿蒙Next旅游应用:人工智能赋能个性化与智能导览新体验》
随着鸿蒙Next的推出,旅游应用迎来了全新的发展机遇,借助人工智能技术能为用户带来更出色的个性化推荐和智能导览服务。 鸿蒙Next与人工智能融合优势 鸿蒙Next拥有强大的分布式能力和原生智能体验。其能打破设备界限,实现多设备协同…...
微信小程序获取当前页面路径,登录成功后重定向回原页面
🤵 作者:coderYYY 🧑 个人简介:前端程序媛,目前主攻web前端,后端辅助,其他技术知识也会偶尔分享🍀欢迎和我一起交流!🚀(评论和私信一般会回&#…...
【9.2】Golang后端开发系列--Gin路由定义与实战使用
文章目录 一、Gin 框架路由的基本定义方式1. 简单路由创建2. 路由参数3. 查询参数 二、商业大项目中的路由定义和服务调用1. 路由模块化2. 路由组和中间件3. 中间件的使用4. 服务层调用5. 错误处理6. 版本控制7. 路由注册 一、Gin 框架路由的基本定义方式 1. 简单路由创建 使…...
【微信小程序】let和const-综合实训
let 和 const 都是用于声明变量的关键字,它们与传统的 var 关键字相比,有很多不同之处。 let 声明块级作用域变量,可再赋值;const 声明块级作用域常量,不可再赋值。 以下是它们的详细介绍: 一、基本概念…...

图匹配算法(涵盖近似图匹配)
【图数据管理与挖掘-第四讲(子)图匹配算法(涵盖近似图匹配) 北京大学2021暑期-邹磊教授】https://www.bilibili.com/video/BV1zh411q7PW?vd_source7c2b5de7032bf3907543a7675013ce3a 图同构: 定义: 给定…...

java线程——Thread
java线程——Thread 基本步骤示例优劣总结 继承Thread类是Java中实现多线程的一种方式。使用时创建一个新的类,该类继承自java.lang.Thread,并重写其run()方法,在方法中定义线程执行的任务逻辑。 基本步骤 1、创建一个子类:定义一…...
MySQL8.0新特性
第十八章_MySQL8.0新特性 1.新特性概述 1. 数据库管理和存储 1.1 数据字典 特性: MySQL 8.0 使用统一的数据字典存储元数据(如表、列、索引等),并将其存储在 InnoDB 表中。 优点 : 提升性能:减少对文件系统的依赖。 提高一致…...

Oracle EBS GL定期盘存WIP日记账无法过账数据修复
系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 用户反映来源为“定期盘存”和类别为“WIP”的日记账无法过账,标准日记账的界面上的过账按钮灰色不可用。但是,在超级用户职责下,该日记账又可以过账,细心检查发现该业务实体下有二个公司段值15100和…...
【绝对无坑】Mongodb获取集合的字段以及数据类型信息
Mongodb获取集合的字段以及数据类型信息 感觉很LOW的一个数据仓工具seatunel,竟然不能自动读取mongodb的表结构信息,需要手工创建。 然鹅,本人对mongodb也是新手,很多操作也不知所措,作为一个DBA,始终还是…...

【Git版本控制器--1】Git的基本操作--本地仓库
目录 初识git 本地仓库 认识工作区、暂存区、版本库 add操作与commit操作 master文件与commit id 修改文件 版本回退 撤销修改 删除文件 初识git Git 是一个分布式版本控制系统,主要用于跟踪文件的更改,特别是在软件开发中。 为什么要版本…...
C++并发编程之无锁数据结构及其优缺点
在C并发编程中,无锁数据结构(Lock-free Data Structures)是指那些在实现中不使用互斥锁(如std::mutex)来保证线程安全的数据结构。相反,它们利用原子操作和内存模型来确保多线程环境下的正确性和高效性。下…...

Ubuntu上,ffmpeg如何使用cuda硬件解码、编码、转码加速
本文使用 Ubuntu 环境。Ubuntu 直接使用 APT 安装的就支持 CUDA 加速。本文使用这样下载的版本进行演示,你自己编译或者其他源的版本可能会不同。 ffmpeg 的一些介绍,以及 macOS 版本的 ffmpeg 硬件加速请见《macOS上如何安装(不需要编译安装…...

rclone,云存储备份和迁移的瑞士军刀,千字常文解析,附下载链接和安装操作步骤...
一、什么是rclone? rclone是一个命令行程序,全称:rsync for cloud storage。是用于将文件和目录同步到云存储提供商的工具。因其支持多种云存储服务的备份,如Google Drive、Amazon S3、Dropbox、Backblaze B2、One Drive、Swift、…...
Ubuntu | 系统软件安装系列指导说明
文章目录 Ubuntu 系统软件安装系列指导说明工具系列1. Docker 与 Docker-Compose部署与安装 环境系列1. Golang部署与安装 数据库系列1. PostgreSQL17.2源码部署与安装 Ubuntu 系统软件安装系列指导说明 工具系列 1. Docker 与 Docker-Compose部署与安装 链接 环境系列 1…...

队列(算法十三)
简介 几乎没有单纯之考察队列的,队列一般只作为一个辅助工具 队列常服务于BFS queue接口 1.N叉树的层序遍历 link: 思路: 队列 层序遍历即可 code /* // Definition for a Node. class Node { public:int val;vector<Node*> children;Node()…...

vLLM私有化部署大语言模型LLM
目录 一、vLLM介绍 二、安装vLLM 1、安装环境 2、安装步骤 三、运行vLLM 1、运行方式 2、切换模型下载源 3、运行本地已下载模型 四、通过http访问vLLM 一、vLLM介绍 vLLM(官方网址:https://www.vllm.ai)是一种用于大规模语言模型&#x…...

OpenAI Whisper:语音识别技术的革新者—深入架构与参数
当下语音识别技术正以前所未有的速度发展,极大地推动了人机交互的便利性和效率。OpenAI的Whisper系统无疑是这一领域的佼佼者,它凭借其卓越的性能、广泛的适用性和创新的技术架构,正在重新定义语音转文本技术的规则。今天我们一起了解一下Whi…...

基于当前最前沿的前端(Vue3 + Vite + Antdv)和后台(Spring boot)实现的低代码开发平台
项目是一个基于当前最前沿的前端技术栈(Vue3 Vite Ant Design Vue,简称Antdv)和后台技术栈(Spring Boot)实现的低代码开发平台。以下是对该项目的详细介绍: 一、项目概述 项目名称:lowcode-s…...

【Rust】错误处理机制
目录 思维导图 引言 一、错误处理的重要性 1.1 软件中的错误普遍存在 1.2 编译时错误处理要求 二、错误的分类 2.1 可恢复错误(Recoverable Errors) 2.2 不可恢复错误(Unrecoverable Errors) 三、Rust 的错误处理机制 3…...
双面沉金线路板制作流程解析:高可靠性PCB的核心工艺
在高端电子制造领域,双面沉金(ENIG)线路板因其优异的焊接性能、抗氧化能力和信号完整性,已成为5G通信、医疗设备和汽车电子等领域的首选。本文将深入解析其制作流程的关键环节,帮助工程师更好地理解这一核心工艺。 一、…...
Java项目中常用的中间件及其高频问题避坑
Java项目中常用的中间件及其高频问题避坑如下: 一、常用中间件分类及作用 1. 消息队列中间件 作用:解耦系统、异步通信、削峰填谷。代表产品: Kafka:高吞吐量流处理,适合日志收集、实时分析。RocketMQ:金融级可靠性,支持事务消…...

基于51单片机的光强控制LED灯亮灭
目录 具体实现功能 设计介绍 资料内容 全部内容 资料获取 具体实现功能 具体功能: (1)按下按键K后光敏电阻进行光照检测,LCD1602显示光照强度值; (2)光照值小于15时,上面2个LE…...
React Native图片预加载:让你的应用图片预览像德芙一样丝滑
写在前面:一张图片引发的性能血案 你有没有遇到过这种情况?——用户疯狂滑动你的React Native图片列表,结果图片加载慢得像蜗牛,甚至出现空白闪烁?等到图片终于加载出来,用户早就失去耐心,愤然退出…… 但你知道吗?这个问题只需要几行代码就能解决! 比如,使用reac…...
Python 函数全攻略:函数基础
函数(Functions)基础 什么是函数? 一个命名的代码块,代指一大堆代码。 定义: def function_name(): (使用def关键字,英文括号,冒号,缩进代码块)。 执行/调用: function…...

YOLO11解决方案之分析
概述 Ultralytics提供了一系列的解决方案,利用YOLO11解决现实世界的问题,包括物体计数、模糊处理、热力图、安防系统、速度估计、物体追踪等多个方面的应用。 Ultralytics提供了三种基本的数据可视化类型:折线图(面积图…...
Axios学习笔记
Axios简介 axios前端异步请求库类似JQuery ajax技术, ajax用来在页面发起异步请求到后端服务,并将后端服务响应数据渲染到页面上, jquery推荐ajax技术,但vue里面并不推荐在使用jquery框架,vue推荐使用axios异步请求库。…...

基于STM32的DHT11温湿度远程监测LCD1602显示Proteus仿真+程序+设计报告+讲解视频
DHT11温湿度远程监测proteus仿真 1. 主要功能2.仿真3. 程序4. 设计报告5. 资料清单&下载链接 基于STM32的DHT11温湿度远程监测LCD1602显示Proteus仿真设计(仿真程序设计报告讲解视频) 仿真图proteus 8.9 程序编译器:keil 5 编程语言:C…...
04 Deep learning神经网络编程基础 梯度下降 --吴恩达
梯度下降在深度学习的应用 梯度下降是优化神经网络参数的核心算法,通过迭代调整参数最小化损失函数。 核心公式 参数更新规则: θ t + 1 = θ t − η ∇ J ( θ...

USART 串口通信全解析:原理、结构与代码实战
文章目录 USARTUSART简介USART框图USART基本结构数据帧起始位侦测数据采样波特率发生器串口发送数据 主要代码串口接收数据与发送数据主要代码 USART USART简介 一、USART 的全称与基本定义 英文全称 USART:Universal Synchronous Asynchronous Receiver Transmi…...