当前位置: 首页 > news >正文

【RabbitMQ 实战】10 消息持久化和存储原理

一、持久化

1.1 持久化对象

rabbitmq的持久化分为三个部分:

  • 交换器的持久化。
  • 队列的持久化。
  • 消息的持久化。

1.1.1 交换器持久化

  • 交换器的持久化是通过在声明交换器时, 指定Durability参数为durable实现的。
  • 若交换器不设置持久化,在rabbitmq服务重启之后,相关的交换器元数据会丢失,但消息不会丢失,只是不能将消息发送到这个交换器中。
    所以在声明交换器时,都要设置持久化。
  • 在web监控创建时,默认也是持久化模式,指定持久化模式带有标识“D”。
    在这里插入图片描述
    springboot监听器,实现交换器持久化示例
    在这里插入图片描述

1.1.2 队列持久化

  • 队列的持久化是通过在声明队列时, 指定Durability参数为durable实现的。
  • 若队列不设置持久化,在rabbitmq服务重启之后,相关队列的元数据和消息数据同时丢失。
  • 若队列设置持久化,只能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化
  • 在web监控创建时,默认也是持久化模式,指定持久化模式带有标识“D”。
    在这里插入图片描述
    springboot监听器,实现队列持久化示例
    在这里插入图片描述

1.1.3 消息持久化

消息的持久化可以通过消息的投递模式来实现,属于代码层面上的。可以控制每一条消息是否久化。
但是将所有消息都设置为持久化,会严重影响rabbitmq服务器性能,写入磁盘的速度比写入内存的速度慢得不只一点点。所以对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
在这里插入图片描述
springboot代码设置消息的持久化示例
在这里插入图片描述

1.2 总结要点

  • 交换器、队列、消息都可以设置是否持久化。交换器和队列持久化的含义是元数据持久化。消息持久化的含义是消息本身持久化。

将交换器、队列、消息都设置了持久化之后能百分之百保证数据不丢失吗?答案是不能

  • 从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为 false,并进行手动确认。
  • 在持久化的消息正确存入rabbitmq之后,还需要有一段时间(虽然很短,但是不可忽视) 才能存入磁盘之中。如果在这段时间内rabbitmq服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。这种情况可以使用镜像队列来解决。

二、存储机制

前面提到的消息持久化,其实是在rabbitmq的“持久层”中完成的。不管是持久化的消息,还是非持久化的消息都可以被写入到磁盘。

  • 持久化的消息在到达队列时就入盘,而且还可以设置持久化的消息在内存中也保存一份备份,这么做可以提高业务效率,当内存吃紧时会从内存中清除。
  • 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

2.1 存储方式

持久层是一个逻辑上的概念,实际包含两个部分:

  • 队列索引 (rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、消息在队列中的位置、是否已被交付给消费者、是否已被消费者 ack 等。每个队列都有与之对应的一个队列索引。
  • 消息存储(rabbit_msg_store):而消息存储是以键值对的形式存储消息,它被所有队列共享,所以在每个节点中有且只有一个。从技术层面上来说,rabbit_msg_store 具体还可以分两类:
    • msg_store_persistent :负责持久化消息的持久化,重启后消息不会丢失。
    • msg_store_transient:负责非持久化消息的持久化,重启后消息会丢失。
      我们一般说消息存储,是习惯性地将 msg_store_persistent 和 msg_store_transient 看成 rabbit_msg_store 一个整体。
I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/9PIHRMVSJH6VBOR100H7141ZT$ ls -al
drwxr-xr-x. 2 1001 root   19 Oct  7 02:57 msg_store_persistent
drwxr-xr-x. 2 1001 root   19 Oct  7 02:57 msg_store_transient
  • 存在队列索引里的好处?
    性能上的优化。相比存在消息存储里,直接存在队列索引仅需进行一次写操作。而存储在消息存储中的消息则需要两次写操,先写一次索引,再写一次消息存储,因此会有一定的性能提升。
    注意事项:
  • 若消息直接存在队列索引中,则当消息通过exchange同时路由到多个队列时,此消息会被写到每个队列的索引文件中。
  • 若消息是存在消息存储中,就仅仅只有一个副本。

2.2 存储文件

  • 上面提到的消息,是包括消息体属性headers,可以直接存储在队列索引中,也可以保存在消息存储中。
  • rabbitmq启动后,会针对每个vhost会启动两个进程:msg_store_persistent和msg_store_transient,这两个进程作为服务端负责将消息写入文件,从文件读取消息。
    • msg_store_persistent负责将持久化消息写入文件与从文件中读取消息。
    • msg_store_transient负责非持久化消息写入文件与从文件中读取消息。
  • 默认存储文件位置:通过日志可以看到存储文件地址,包含queues、msg_store_persistent、msg_store_transient 这3个文件夹。如下图,我这里是指定了存储文件地址。
I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/9PIHRMVSJH6VBOR100H7141ZT$ ls -al
total 16
drwxr-xr-x. 5 1001 root  125 Oct  7 02:57 .
drwxr-xr-x. 4 1001 root   72 Oct  7 01:15 ..
-rw-r--r--. 1 1001 root   83 Oct  7 01:15 .config
drwxr-xr-x. 2 1001 root   19 Oct  7 02:57 msg_store_persistent
drwxr-xr-x. 2 1001 root   19 Oct  7 02:57 msg_store_transient
drwxr-xr-x. 3 1001 root   38 Oct  7 01:18 queues
-rw-r--r--. 1 1001 root 5464 Oct  7 02:57 recovery.dets
-rw-r--r--. 1 1001 root    9 Oct  7 02:57 .vhost

上面的地址/bitnami/rabbitmq/mnesia/rabbit@stats,是队列的数据存放目录,这个在在哪里找呢,可以通过日志来查看,如下图所示:
在这里插入图片描述
日志中还显示了,9PIHRMVSJH6VBOR100H7141ZT这个目录,对应着virtual01这个vhost的目录。对于rabbitmq来说,每一个租户vhost的消息存储,都是放在不同的目录的
在这里插入图片描述

2.2.1 队列索引.idx文件

rabbit_queue_index 中以顺序(文件名从 0 开始累加) 的段文件来进行存储,后缀为“ .idx "。

每个段文件中包含定的 SEGMENT_ENTRY_COUNT 条记录,SEGMENT_ENTRY_COUNT 默认值为16384字节。
每个rabbit_queue_index 从磁盘中读取消息的时候至少要在内存中维护一个段文件,所以设置queue_index_embed_msgs_below参数指定阈值大小时要格外谨慎,一点点增大也可能会引起内存爆炸式的增长。

2.2.2 消息存储.rdq文件

经过 rabbit_msg_store 处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制 (file_size_lmit)后,关闭这个文件再创建一个新的文件以供新的消息写入,文件后缀是“ .rdq ”。
文件名从0开始进行累加,所以文件名最小的文件也是最老的文件。
如下所示0.rdq文件

I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent$ ls -al
total 0
drwxr-xr-x. 2 1001 root  19 Oct  7 02:57 .
drwxr-xr-x. 4 1001 root 111 Oct  7 02:57 ..
-rw-r--r--. 1 1001 root   0 Oct  7 02:57 0.rdq

在进行消息的存储时,rabbitmq会在ETS (Erlang Term Storage) 表中记录消息在文件中的位置映射 (Index) 和文件的相关信息 (FileSummary)。

  • 读取文件信息:
    • 在读取消息的时候,先根据消息的 ID (msg_id)找到对应存储的文件。
    • 若文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。
    • 若文件不存在,或被锁住,则发送请求由 rabbit_msg_store 进行处理。
  • 删除文件信息:
  • 消息的删除只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。
  • 执行消息删除操作时,不会立即对在文件中的消息进行删除,先是标记为垃圾数据。
    • 若一个文件中都是垃圾数据时,则删除文件。
    • 若一个文件中存在有效数据,则触发垃圾回收机制,进行文件合并选择性删除。
  • 垃圾回收文件合并机制:
    • 当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有3 个文件存在的情况下)的数据大小的比值超过设置的值 GARBAGE_ERACTION (默认值为 0.5) 时,才会触发垃圾回收将两个文件合并。

2.2.3 垃圾回收机制(文件合并)

文件合并前提:
执行合并的两个文件一定是逻辑上相邻的两个文件。
文件合并流程:

  • 第一步,执行合并时首先锁定这两个文件。
  • 第二步,先对前面文件中的有效数据进行整理。
  • 第三步,再将后面文件的有效数据写入到前面的文件。
  • 第四步,更新消息在 ETS 表中的记录。
  • 第五步,最后删除后面的文件。

2.3 存储原理

  • 从3.5.0版本开始,较小的消息是直接存储在队列索引.rdx中。
  • 较大的消息存在.rdq队列文件中

如下图所示,我发布的消息,消息比较小时,在0.idx中,即存在索引中
下面是通过查看0.idx,发现里面有消息的正文内容
在这里插入图片描述
当消息体比较大时,存放的是rdq文件时面
在这里插入图片描述

  • 在进行消息的存储时,rabbitmq会在ETS表中记录消息在文件中的映射,以及文件的相关信息。
  • 消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。
  • 消息的删除只是从ETC表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。

2.3.1 生产者消息写入原理

每个队列则看成是一个客户端,当生产者发送的消息达到队列时,向服务端请求写,写入过程如下:

  • 第一步,rabbitmq启动后,针对每个vhost开启两个进程,msg_store_persistent进程和msg_store_transient进程。两进程作为服务端,每个队列作为客户端。
  • 第二步,当生产者发送消息到队列时,每个队列都会向两进程发起写入请求。
  • 第三步,两进程开始往磁盘里写入消息。
    • msg_store_persistent进程将持久化消息写入到服务器的msg_store_persistent目录下,文件名称依次为0.rdq、1.rdq、2.rdq等等。
    • msg_store_transient进程将非持久化消息写入到服务器的msg_store_transient目录下,文件名称依次为0.rdq、1.rdq、2.rdq等等。
      在这里插入图片描述

2.3.2 消费者消息读取原理

  • 第一步,消费者向队列获取消息体。
  • 第二步,队列汇聚消息ID去找落盘文件。
    • 若文件存在,且未被锁住,则直接读取文件内容,返回消息给消费者。
    • 若文件不存在,或已被锁住,则让rabbit_msg_store进程处理。
  • 第三步,队列向两进程发起请求,进程先是通过GC进程去查看文件是否被锁住,同时也会清理垃圾,进行有效数据合并。
    • 若被锁住则解锁,获取消息,返回给消费者。
    • 若清理垃圾后,发现还是没有此消息,则向rabbitmq其他节点发送询问请求。
  • 第四步,其他节点会根据消息ID挨个寻找,直至将rabbitmq集群每个节点找遍,之后返回结果给消费者。

在这里插入图片描述

相关文章:

【RabbitMQ 实战】10 消息持久化和存储原理

一、持久化 1.1 持久化对象 rabbitmq的持久化分为三个部分: 交换器的持久化。队列的持久化。消息的持久化。 1.1.1 交换器持久化 交换器的持久化是通过在声明交换器时, 指定Durability参数为durable实现的。若交换器不设置持久化,在rabb…...

vscode 连接ubuntu git下载缓慢

在ubuntu20.04下载: git clone https://github.com/introlab/rtabmap.git src/rtabmap 挂掉情况 export https_proxyhttp://10.10.10.176:7890export http_proxyhttp://10.10.10.176:7890 其中 10.10.10.176是我本机的ip地址,7890是我的代理后几位 如…...

2731. 移动机器人

2731. 移动机器人有一些机器人分布在一条无限长的数轴上,他们初始坐标用一个下标从 0 开始的整数数组 nums 表示。当你给机器人下达命令时,它们以每秒钟一单位的速度开始移动。 给你一个字符串 s ,每个字符按顺序分别表示每个机器人移动的方…...

小程序实现人脸识别功能

调用api wx.startFacialRecognitionVerify 第一步: // 修改方法expertUpdate() {wx.startFacialRecognitionVerify({name: _this.registerForm.realName, //身份证名称idCardNumber: _this.registerForm.idCard, //身份证号码checkAliveType: 1, //屏幕闪烁(人脸核验的交互…...

【】javax.crypto.IllegalBlockSizeException: Input length not multiple of 8 bytes

问题描述 jdk版本:8 用DES进行加解密,其中转换模式为“DES/CBC/NoPadding”,要加密的明文为 “密码学浅析”,执行加密操作,报如下错误 Exception in thread "main" javax.crypto.IllegalBlockSizeExcepti…...

312.戳气球

将戳气球转换到添加气球&#xff0c;记忆搜索slove(i,j)&#xff1a;在开区间(i,j)全部填满气球得到的最多硬币数&#xff0c;两端val[i]、val[j] class Solution { public:vector<vector<int>> ans;vector<int> val;int slove(int left,int right){if(left&…...

get_trade_detail_data函数使用

查阅股票持仓情况 positions get_trade_detail_data(‘8000000213’, ‘stock’, ‘position’) for dt in positions: print(f’股票代码: {dt.m_strInstrumentID}, 市场类型: {dt.m_strExchangeID}, 证券名称: {dt.m_strInstrumentName}, 持仓量: {dt.m_nVolume}, 可用数量:…...

【融合ChatGPT等AI模型】Python-GEE遥感云大数据分析、管理与可视化及多领域案例实践应用

目录 第一章 理论基础 第二章 开发环境搭建 第三章 遥感大数据处理基础与ChatGPT等AI模型交互 第四章 典型案例操作实践 第五章 输入输出及数据资产高效管理 第六章 云端数据论文出版级可视化 更多应用 随着航空、航天、近地空间等多个遥感平台的不断发展&#xff0c;近…...

LeetCode862 和至少为k的最短子数组

题目&#xff1a; 解析&#xff1a; 1、先构造前缀和数组 2、单调队列存放滑动窗口&#xff0c;目的求Sj-Si >k的情况下&#xff0c;窗口最小。 代码&#xff1a; class Solution {public int shortestSubarray(int[] nums, int k) {int n nums.length;long[] sums new …...

网卡bonding模式 - bond模式配置介绍

网卡bonding简介 网卡绑定就是把多张物理网卡通过软件虚拟成一个虚拟的网卡&#xff0c;配置完毕后&#xff0c;所有的物理网卡的ip和mac将会变成相同的。多网卡同时工作可以提高网络速度&#xff0c;还可以实现网卡的负载均衡、冗余。 bonding模式 1 round-robin(mode0) 轮转…...

做了个 chrome 插件实现 B 站视频截图功能,直接从当前视频帧无损复制

起因是看 B 站视频想截个图很麻烦&#xff0c;右下角暂停按钮无法去除&#xff0c;于是写了一行代码把暂停按钮隐藏。 后经提醒&#xff0c;发现可以通过 canvas 获取视频帧来截取图片&#xff0c;于是写了如下代码完美获取视频帧。 var v document.querySelector(".bpx…...

Docker linux 安装

sudo yum update sudo yum clean all sudo yum makecache#安装依赖 sudo yum install -y yum-utils device-mapper-persistent-data lvm2 #添加官方存储库 sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo#安装-跳过一些异常依赖…...

windows部署django服务器

windows部署django服务器 1、安装IIS1.1 控制面板-----程序----程序和功能----启用或关闭windows功能1.2安装IIS服务器&#xff0c;完成后&#xff0c;重新进入&#xff0c;把CGI安装进系统 2、安装python与虚拟环境2.1 安装python2.2 安装virtualenv虚拟环境2.3 创建一个虚拟环…...

ChatGPT prompt汇总-个人使用-持续更新....

用途 学术写作更新记录 学术写作 中译英(GPT-4) I am a researcher studying deep learning and now trying to revise my manuscript which will be submitted to the Journal of Nature . I want you to act as a scientific English-Chinese translator, I will provide yo…...

Vue实现简单的接口封装

1. 在src中创建一个api文件夹 2. 按功能、模块等新建对应的js文件 3. 在内部写对应的封装接口&#xff0c;并导出 import axios from "axios";/*** 接口名称&#xff1a;* 接收参数&#xff1a;* 返回参数&#xff1a;* */export const miens ()>{return new P…...

软件测试工具有什么作用?有哪些好用的测试工具推荐?

软件测试工具是现代软件测试中不可或缺的重要组成部分&#xff0c;指的是一系列在软件开发过程中使用的工具&#xff0c;用于帮助测试人员进行测试活动&#xff0c;提高测试效率&#xff0c;减少测试成本。选择并使用合适的软件测试工具&#xff0c;可提高软件质量和效率。 一…...

写爬虫?前端er何必用python

前言 说起网络爬虫,很多人第一时间想到python,但爬虫并非只能用python实现,虽然网上大部分爬虫文章都在说python爬虫,但对于前端程序员来说,我觉得js才是最屌的(对于简单爬取任务来说,复杂的我暂时没碰到~),下面说说我的经验(是的,仅限本人经验),希望能给各位前…...

交通物流模型 | 基于交通图卷积长短时记忆网络的网络级交通流预测

交通物流模型 | 基于交通图卷积长短时记忆网络的网络级交通流预测 由于道路网络时变的交通模式和复杂的空间依赖性,交通流预测是一个具有挑战性的时空预测问题。为了克服该挑战,作者将交通网络看为一张图,并提出一个新的深度学习预测模型,交通图卷积长短时记忆网络(TGC-L…...

web 基础和http 协议

一、域名 域名的概念 IP地址不易记忆&#xff0c;域名方便记住&#xff0c;以便于用户进行搜索访问 早期使用Hosts文件解析域名地址 缺点&#xff1a; ① 主机名称重复 ② 主机维护困难 DNS&#xff08;Domain Name System&#xff09;域名系统 ① 分布式 将一个大的数…...

Java常量与变量

Java常量与变量 在程序执行过程中&#xff0c;其值不能被改变的量称为常量&#xff0c;其值能被改变的量称为变量。 Java关键字 Java关键字 int public (公有的,可跨包) new finally throw (抛出一个异常对象) continuefloatlongshort extends (继承,用于类继承类) returnbrea…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目&#xff0c;该项目是一个 Spring AI 快速入门的样例工程项目&#xff0c;旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计&#xff0c;每个模块都专注于特定的功能领域&#xff0c;便于学习和…...

前端高频面试题2:浏览器/计算机网络

本专栏相关链接 前端高频面试题1&#xff1a;HTML/CSS 前端高频面试题2&#xff1a;浏览器/计算机网络 前端高频面试题3&#xff1a;JavaScript 1.什么是强缓存、协商缓存&#xff1f; 强缓存&#xff1a; 当浏览器请求资源时&#xff0c;首先检查本地缓存是否命中。如果命…...

02.运算符

目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&&#xff1a;逻辑与 ||&#xff1a;逻辑或 &#xff01;&#xff1a;逻辑非 短路求值 位运算符 按位与&&#xff1a; 按位或 | 按位取反~ …...

从零开始了解数据采集(二十八)——制造业数字孪生

近年来&#xff0c;我国的工业领域正经历一场前所未有的数字化变革&#xff0c;从“双碳目标”到工业互联网平台的推广&#xff0c;国家政策和市场需求共同推动了制造业的升级。在这场变革中&#xff0c;数字孪生技术成为备受关注的关键工具&#xff0c;它不仅让企业“看见”设…...