【RocketMQ】RocketMq之ConsumeQueue深入研究
目录
一:RocketMq 整体文件存储介绍
二:ConsumeQueue 的文件结构
三:ConsumeQueue 写入和查询流程
一:RocketMq 整体文件存储介绍
存储⽂件主要分为三个部分:
CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍ConsumeQueue的研究,以rocketmq5.3.0版本作为研究。
二:ConsumeQueue 的文件结构
ConsumeQueue 的文件格式:

每个 ConsumeQueue 条目占用 20 字节,包含以下三个字段:
| 字段名 | 长度(字节) | 说明 |
|---|---|---|
| CommitLog Offset | 8 | 消息在 CommitLog 文件中的物理偏移量。 |
| Message Length | 4 | 消息的长度。 |
| Tag HashCode | 8 | 消息 Tag 的哈希值,用于快速查找具有相同 Tag 的消息。 |
这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问,极大地提高了读取性能
三:ConsumeQueue 写入和查询流程
1. ConsumeQueue 写入流程图
+---------------------+
| 消息写入 CommitLog |
+---------------------+|v
+---------------------+
| 触发 Reput 操作 |
| - ReputMessageService |
+---------------------+|v
+---------------------+
| 获取 ConsumeQueue |
| - 根据 Topic 和 QueueId |
+---------------------+|v
+---------------------+
| 构建索引条目 |
| - CommitLog Offset |
| - Message Length |
| - Tag HashCode |
+---------------------+|v
+---------------------+
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节 |
| - 文件大小 30 万条目 |
+---------------------+|v
+---------------------+
| 刷盘操作 |
| - 定期刷盘到磁盘 |
+---------------------+
写入流程
-
消息写入 CommitLog:
-
Broker 接收到消息后,将其顺序写入 CommitLog 文件
-
-
触发 Reput 操作:
-
Broker 中的
ReputMessageService线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件
-
-
获取 ConsumeQueue:
-
根据消息的 Topic 和 QueueId,从
consumeQueueTable中获取对应的 ConsumeQueue。如果不存在,则创建一个新的 ConsumeQueue
-
-
构建索引条目:
-
为每条消息构建一个索引条目,包含以下信息:
-
CommitLog Offset:消息在 CommitLog 中的物理偏移量。
-
Message Length:消息的长度。
-
Tag HashCode:消息 Tag 的哈希值
-
-
-
写入 ConsumeQueue 文件:
-
将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节,文件大小固定为 30 万个条目
-
-
刷盘操作:
-
定期将 ConsumeQueue 文件中的数据刷盘,确保数据持久化
-
consumequeue写入代码:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
2. ConsumeQueue 查询流程图
+---------------------+
| 消费者拉取消息 |
| - Topic, QueueId |
| - 起始逻辑偏移量 |
+---------------------+|v
+---------------------+
| 查询 ConsumeQueue |
| - 根据逻辑偏移量计算 |
| ConsumeQueue 文件位置 |
+---------------------+|v
+---------------------+
| 读取索引条目 |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode |
+---------------------+|v
+---------------------+
| 定位 CommitLog 文件 |
| - 根据物理偏移量计算 |
| CommitLog 文件位置 |
+---------------------+|v
+---------------------+
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取 |
| - 校验消息完整性 |
+---------------------+|v
+---------------------+
| 返回消息给消费者 |
+---------------------+
查询流程
-
消费者拉取消息:
-
消费者指定 Topic、QueueId 和起始逻辑偏移量,向 Broker 发起拉取消息请求
-
-
查询 ConsumeQueue:
-
Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置,读取对应的索引条目
-
-
读取索引条目:
-
从 ConsumeQueue 文件中读取索引条目,获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值
-
-
定位 CommitLog 文件:
-
根据物理偏移量计算 CommitLog 文件的位置,读取对应的消息内容
-
-
从 CommitLog 读取消息:
-
从 CommitLog 文件中读取消息内容,并校验消息的完整性
-
-
返回消息给消费者:
-
Broker 将读取到的消息内容返回给消费者
-
查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)
相关文章:
【RocketMQ】RocketMq之ConsumeQueue深入研究
目录 一:RocketMq 整体文件存储介绍 二:ConsumeQueue 的文件结构 三:ConsumeQueue 写入和查询流程 一:RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分: CommitLog:存储消息的元数据。所有消息都会…...
如今物联网的快速发展对hmi的更新有哪些积极影响
一、功能更加丰富 物联网的快速发展使得 HMI(人机界面)能够连接更多的设备和系统,从而实现更加丰富的功能。例如,通过与传感器网络的连接,HMI 可以实时显示设备的运行状态、环境参数等信息,为用户提供更加…...
linux 性能60秒分析
linux 60秒分析 需要运行的工具是 1、uptime 2、dmesg | tail 3、vmstat 1 4、mpstat -P ALL 1 5、pidstat 1 6、iostat -xz 1 7、free -m 8、sar -n DEV 1 9、sar -n TCP,ETCP 1 10、topuptime 快速检查平均负载 [rootaaaaaa ~]# uptime15:17:20 up 3 days, 14 min, 7 us…...
Redisson全面解析:从使用方法到工作原理的深度探索
文章目录 写在文章开头详解Redisson基本数据类型基础配置字符串操作列表操作映射集阻塞队列延迟队列更多关于Redisson详解Redisson 中的原子类详解redisson中的发布订阅模型小结参考写在文章开头 Redisson是基于原生redis操作指令上进一步的封装,屏蔽了redis数据结构的实现细…...
neo4j-解决导入数据后出现:Database ‘xxxx‘ is unavailable. Run :sysinfo for more info.
目录 问题描述 解决方法 重新导入 问题描述 最近在linux上部署了neo4j,参照之前写的博客:neo4j-数据的导出和导入_neo4j数据导入导出-CSDN博客 进行了数据导出、导入操作。但是在进行导入后,重新登录网页版neo4j,发现对应的数据库状态变…...
51单片机之引脚图(详解)
8051单片机引脚分类与功能笔记 1. 电源引脚 VCC(第40脚):接入5V电源,为单片机提供工作电压。GND(第20脚):接地端,确保电路的电位参考点。 2.时钟引脚 XTAL1(第19脚&a…...
Hangfire.NET:.NET任务调度
引言:为何选择 Hangfire? 在开发.NET 应用程序时,我们常常会遇到这样的场景:应用程序需要定期发送报告,像财务报表,每日业务数据汇总报告等,这些报告需要定时生成并发送给相关人员;…...
深入解析:React 事件处理的秘密与高效实践
在 React 中,事件处理是构建交互式应用的核心。本文将带你深入探索 React 事件处理的机制、最佳实践以及如何避免常见陷阱,助你写出更高效、更健壮的代码。 1. React 事件处理的独特之处 合成事件(SyntheticEvent) React 使用合…...
开源像素字体,可用于独立游戏开发
方舟像素字体 / Ark Pixel Font 开源的泛中日韩像素字体,使用 SIL 开放字体许可证 第1.1版(SIL Open Font License 1.1)授权。 支持 10、12 和 16 像素尺寸。 支持语言特殊字形:中文-中国大陆、中文-香港特别行政区、中文-台湾…...
【论文阅读】Comment on the Security of “VOSA“
Comment on the Security of Verifiable and Oblivious Secure Aggregation for Privacy-Preserving Federated Learning -- 关于隐私保护联邦中可验证与遗忘的安全聚合的安全性 论文来源摘要Introduction回顾 VOSA 方案对VOSA不可伪造性的攻击对于类型 I 的攻击对于类型 II 的…...
了解传输层TCP协议
目录 一、TCP协议段格式 二、TCP原理 1.确认应答 2.超时重传 3.连接管理 建立连接 断开连接 4.滑动窗口 5.流量控制 6.拥塞控制 7.延时应答 8.捎带应答 9.面向字节流 10.TCP异常情况 TCP,即Transmission Control Protocol,传输控制协议。人如…...
flask实现用户名查重,重复的用户名阻止注册,以及如何优化
在 Flask 中实现用户名查重,并阻止重复的用户名进行注册,可以使用数据库(如 SQLite、MySQL、PostgreSQL)存储用户信息,并在注册时检查用户名是否已存在。以下是实现步骤: 1. 安装 Flask 及 SQLAlchemy 确保…...
ASP.NET Core对JWT的封装
目录 JWT封装 [Authorize]的注意事项 JWT封装 NuGet 库 |Microsoft.AspNetCore.Authentication.JwtBearer 9.0.1https://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBearerhttps://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBea…...
wordpressAI工具,已接入Deepseek 支持自动生成文章、生成图片、生成长尾关键词、前端AI窗口互动、批量采集等
基于关键词或现有内容生成SEO优化的文章,支持多种AI服务(如OpenAI、百度文心一言、智谱AI等),并提供定时任务、内容采集、关键词生成等功能。 核心功能 文章生成 关键词生成:根据输入的关键词生成高质量文章。 内容…...
Ollama部署 DeepSeek-R1:70B 模型的详细步骤
1. 确认环境准备 (1) 硬件要求 显存需求:70B 参数的模型需要大量显存。若使用 NVIDIA T4(16GB 显存),需多卡并行(如 8 卡)或开启量化(如 q4_0、q8_0)。内存需求:建议至…...
PAT乙级( 1009 说反话 1010 一元多项式求导)C语言版本超详细解析
1009 说反话 给定一句英语,要求你编写程序,将句中所有单词的顺序颠倒输出。 输入格式: 测试输入包含一个测试用例,在一行内给出总长度不超过 80的字符串。字符串由若干单词和若干空格组成,其中单词是由英文字母&#x…...
学习笔记十九:K8S生成pod过程
K8S生成pod过程 流程图具体生成过程用户提交 Pod 定义API Server 处理请求调度器分配节点(Scheduling)目标节点上的 Pod 创建网络配置状态上报与监控控制器管理(Controller Manager)就绪与服务发现 关键错误场景高级特性 流程图 具…...
Qwen2-VL:增强视觉语言模型对世界任意分辨率的感知能力
1、摘要 Qwen2-VL系列是Qwen-VL模型的高级升级版本,它重新定义了传统视觉处理中预设分辨率的方法。Qwen2-VL引入了Naive Dynamic Resolution机制,使模型能够动态处理不同分辨率的图像,并将其转换为不同数量的视觉标记。这种机制使模型能够生…...
原神新版本角色牌上新 七圣召唤增添新玩法
在原神这款游戏中,5.4版本更新后七圣召唤玩法将新增2张角色牌和对应天赋牌、3张行动牌,并进行部分卡牌平衡调整,今天就给大家介绍一下。 一、角色牌【基尼奇】 1.元素战技:选一个我方角色,自身附属钩索链接并进入夜魂…...
Spring 中的 事务 隔离级别以及传播行为
1. 事务隔离级别(Isolation Level) 事务隔离级别定义了事务在并发环境下的行为,主要解决以下问题: 脏读(Dirty Read):一个事务读取了另一个未提交事务的数据。 不可重复读(Non-Re…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
DAY 45 超大力王爱学Python
来自超大力王的友情提示:在用tensordoard的时候一定一定要用绝对位置,例如:tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾: tensorboard的发展历史和原理tens…...
