当前位置: 首页 > news >正文

RabbitMQ---消息确认和持久化

(一)消息确认

1.概念

生产者发送消息后,到达消费端会有以下情况:

1.消息处理成功

2.消息处理异常

   如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题

 所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制

 消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)

  自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高

  手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的

当我们设置为手动确认后,队列中的消息就分为了两个部分:

1.等待投递给消费者的消息

2.已经投递给消费者但是没有收到确认信号的消息

也就是Ready和Unacked

如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者

 

 

 

2.Spring Boot的三种策略和代码演示

Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)

1)AcknowledgeMode.NONE

 这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的

 2)AcknowledgeMode.AUTO(默认)

这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息

3)AckniwledgeMode.MANUAL

这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队

代码演示

配置文件代码

spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:acknowledge-mode: NONE

声明交换机和队列代码

@Component
public class Config {@Bean("ackExchange")public Exchange ackExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Beanpublic Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();}
}

生产者代码


@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");return "发送成功";}
}

然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认) 

消费者代码

@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel){System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+message.getMessageProperties().getDeliveryTag());
//        int num=3/0;     //模拟失败System.out.println("处理完成");}
}

此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在 

 

我们发现能够准确的接收消息

如果我们此时抛出异常呢? 

 

 

  我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端 

此时我们把确认策略改成AUTO

我们这里只需要改配置文件即可

截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环

 

我们再来演示下正确情况 

 

此时我们再来说一下手动确认策略

首先更改配置文件

  其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变 

  那我们先来讲一下手动确认方法

手动确认方法:

1.肯定确认

RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了 

我们来解释下这个方法的参数

deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认

multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值

2.否定确认

有两个方法

他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息

我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者

消费者代码

@Component
public class AckConsumer {@RabbitListener(queues = Constants.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0;     //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicNack(Tag,false,true);channel.basicReject(Tag,true);}}
}

出现异常时的现象 

因为我们是设置出现异常重新入队给下一个消费者,所以会循环

如果消息成功接收

 

(二)持久化

  我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?

1.RabbitMQ的持久化

  RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化

1)交换机持久化

  交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在

上面是我们把交换机设置为持久化的代码

其实我们不手动设置,默认交换机也是持久的,我们来看源码 

 

 

我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的

2)队列持久化

队列持久化也是在声明队列时调用方法来实现的

如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时

所以我们要设置消息持久化,那么一定要设置队列持久化否则没用

这是设置为持久化的代码

这是设置为非持久化的代码

那我们还是来看一眼源码

然后我们点进这个QueueBuilder看看

 

我们发现这个就是给名字赋值用的

再来看看durable中的setDurable

我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了

3)消息持久化

消息持久化

消息实现持久化就需要把消息的投递模式进行更改

设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失

消息持久化代码

@RequestMapping("producer")
@RestController
public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("ack")public String ackPro(){String s1="ack test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);return "发送成功";}
}

注意我们这里传的不是一个单独字符串了,而是一个消息 

  RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化 

问题:

 注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量

如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?

答案是错误的

1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失

2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可

第一个问题的解决方案我们下一篇博客在讲

  

 

 

 

相关文章:

RabbitMQ---消息确认和持久化

(一)消息确认 1.概念 生产者发送消息后,到达消费端会有以下情况: 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时…...

《鸿蒙Next旅游应用:人工智能赋能个性化与智能导览新体验》

随着鸿蒙Next的推出,旅游应用迎来了全新的发展机遇,借助人工智能技术能为用户带来更出色的个性化推荐和智能导览服务。 鸿蒙Next与人工智能融合优势 鸿蒙Next拥有强大的分布式能力和原生智能体验。其能打破设备界限,实现多设备协同&#xf…...

微信小程序获取当前页面路径,登录成功后重定向回原页面

🤵 作者:coderYYY 🧑 个人简介:前端程序媛,目前主攻web前端,后端辅助,其他技术知识也会偶尔分享🍀欢迎和我一起交流!🚀(评论和私信一般会回&#…...

【9.2】Golang后端开发系列--Gin路由定义与实战使用

文章目录 一、Gin 框架路由的基本定义方式1. 简单路由创建2. 路由参数3. 查询参数 二、商业大项目中的路由定义和服务调用1. 路由模块化2. 路由组和中间件3. 中间件的使用4. 服务层调用5. 错误处理6. 版本控制7. 路由注册 一、Gin 框架路由的基本定义方式 1. 简单路由创建 使…...

【微信小程序】let和const-综合实训

let 和 const 都是用于声明变量的关键字,它们与传统的 var 关键字相比,有很多不同之处。 let 声明块级作用域变量,可再赋值;const 声明块级作用域常量,不可再赋值。 以下是它们的详细介绍: 一、基本概念…...

图匹配算法(涵盖近似图匹配)

【图数据管理与挖掘-第四讲(子)图匹配算法(涵盖近似图匹配) 北京大学2021暑期-邹磊教授】https://www.bilibili.com/video/BV1zh411q7PW?vd_source7c2b5de7032bf3907543a7675013ce3a 图同构: 定义: 给定…...

java线程——Thread

java线程——Thread 基本步骤示例优劣总结 继承Thread类是Java中实现多线程的一种方式。使用时创建一个新的类,该类继承自java.lang.Thread,并重写其run()方法,在方法中定义线程执行的任务逻辑。 基本步骤 1、创建一个子类:定义一…...

MySQL8.0新特性

第十八章_MySQL8.0新特性 1.新特性概述 1. 数据库管理和存储 1.1 数据字典 特性: MySQL 8.0 使用统一的数据字典存储元数据(如表、列、索引等),并将其存储在 InnoDB 表中。 优点 : 提升性能:减少对文件系统的依赖。 提高一致…...

Oracle EBS GL定期盘存WIP日记账无法过账数据修复

系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 用户反映来源为“定期盘存”和类别为“WIP”的日记账无法过账,标准日记账的界面上的过账按钮灰色不可用。但是,在超级用户职责下,该日记账又可以过账,细心检查发现该业务实体下有二个公司段值15100和…...

【绝对无坑】Mongodb获取集合的字段以及数据类型信息

Mongodb获取集合的字段以及数据类型信息 感觉很LOW的一个数据仓工具seatunel,竟然不能自动读取mongodb的表结构信息,需要手工创建。 然鹅,本人对mongodb也是新手,很多操作也不知所措,作为一个DBA,始终还是…...

【Git版本控制器--1】Git的基本操作--本地仓库

目录 初识git 本地仓库 认识工作区、暂存区、版本库 add操作与commit操作 master文件与commit id 修改文件 版本回退 撤销修改 删除文件 初识git Git 是一个分布式版本控制系统,主要用于跟踪文件的更改,特别是在软件开发中。 为什么要版本…...

C++并发编程之无锁数据结构及其优缺点

在C并发编程中,无锁数据结构(Lock-free Data Structures)是指那些在实现中不使用互斥锁(如std::mutex)来保证线程安全的数据结构。相反,它们利用原子操作和内存模型来确保多线程环境下的正确性和高效性。下…...

Ubuntu上,ffmpeg如何使用cuda硬件解码、编码、转码加速

本文使用 Ubuntu 环境。Ubuntu 直接使用 APT 安装的就支持 CUDA 加速。本文使用这样下载的版本进行演示,你自己编译或者其他源的版本可能会不同。 ffmpeg 的一些介绍,以及 macOS 版本的 ffmpeg 硬件加速请见《macOS上如何安装(不需要编译安装…...

rclone,云存储备份和迁移的瑞士军刀,千字常文解析,附下载链接和安装操作步骤...

一、什么是rclone? rclone是一个命令行程序,全称:rsync for cloud storage。是用于将文件和目录同步到云存储提供商的工具。因其支持多种云存储服务的备份,如Google Drive、Amazon S3、Dropbox、Backblaze B2、One Drive、Swift、…...

Ubuntu | 系统软件安装系列指导说明

文章目录 Ubuntu 系统软件安装系列指导说明工具系列1. Docker 与 Docker-Compose部署与安装 环境系列1. Golang部署与安装 数据库系列1. PostgreSQL17.2源码部署与安装 Ubuntu 系统软件安装系列指导说明 工具系列 1. Docker 与 Docker-Compose部署与安装 链接 环境系列 1…...

队列(算法十三)

简介 几乎没有单纯之考察队列的&#xff0c;队列一般只作为一个辅助工具 队列常服务于BFS queue接口 1.N叉树的层序遍历 link: 思路&#xff1a; 队列 层序遍历即可 code /* // Definition for a Node. class Node { public:int val;vector<Node*> children;Node()…...

vLLM私有化部署大语言模型LLM

目录 一、vLLM介绍 二、安装vLLM 1、安装环境 2、安装步骤 三、运行vLLM 1、运行方式 2、切换模型下载源 3、运行本地已下载模型 四、通过http访问vLLM 一、vLLM介绍 vLLM&#xff08;官方网址&#xff1a;https://www.vllm.ai&#xff09;是一种用于大规模语言模型&#x…...

OpenAI Whisper:语音识别技术的革新者—深入架构与参数

当下语音识别技术正以前所未有的速度发展&#xff0c;极大地推动了人机交互的便利性和效率。OpenAI的Whisper系统无疑是这一领域的佼佼者&#xff0c;它凭借其卓越的性能、广泛的适用性和创新的技术架构&#xff0c;正在重新定义语音转文本技术的规则。今天我们一起了解一下Whi…...

基于当前最前沿的前端(Vue3 + Vite + Antdv)和后台(Spring boot)实现的低代码开发平台

项目是一个基于当前最前沿的前端技术栈&#xff08;Vue3 Vite Ant Design Vue&#xff0c;简称Antdv&#xff09;和后台技术栈&#xff08;Spring Boot&#xff09;实现的低代码开发平台。以下是对该项目的详细介绍&#xff1a; 一、项目概述 项目名称&#xff1a;lowcode-s…...

【Rust】错误处理机制

目录 思维导图 引言 一、错误处理的重要性 1.1 软件中的错误普遍存在 1.2 编译时错误处理要求 二、错误的分类 2.1 可恢复错误&#xff08;Recoverable Errors&#xff09; 2.2 不可恢复错误&#xff08;Unrecoverable Errors&#xff09; 三、Rust 的错误处理机制 3…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...