当前位置: 首页 > news >正文

【RocketMQ】RocketMq之ConsumeQueue深入研究

目录

一:RocketMq 整体文件存储介绍

二:ConsumeQueue 的文件结构

三:ConsumeQueue 写入和查询流程


一:RocketMq 整体文件存储介绍

存储⽂件主要分为三个部分:

    CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
    ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
    IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。

这篇文章主要介绍ConsumeQueue的研究,以rocketmq5.3.0版本作为研究。


二:ConsumeQueue 的文件结构

ConsumeQueue 的文件格式:

每个 ConsumeQueue 条目占用 20 字节,包含以下三个字段:

字段名长度(字节)说明
CommitLog Offset8消息在 CommitLog 文件中的物理偏移量。
Message Length4消息的长度。
Tag HashCode8消息 Tag 的哈希值,用于快速查找具有相同 Tag 的消息。

这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问,极大地提高了读取性能


三:ConsumeQueue 写入和查询流程

1. ConsumeQueue 写入流程图

+---------------------+
| 消息写入 CommitLog   |
+---------------------+|v
+---------------------+
| 触发 Reput 操作       |
| - ReputMessageService |
+---------------------+|v
+---------------------+
| 获取 ConsumeQueue    |
| - 根据 Topic 和 QueueId  |
+---------------------+|v
+---------------------+
| 构建索引条目         |
| - CommitLog Offset   |
| - Message Length     |
| - Tag HashCode       |
+---------------------+|v
+---------------------+
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节     |
| - 文件大小 30 万条目   |
+---------------------+|v
+---------------------+
| 刷盘操作             |
| - 定期刷盘到磁盘     |
+---------------------+

写入流程

  1. 消息写入 CommitLog

    • Broker 接收到消息后,将其顺序写入 CommitLog 文件

  • 触发 Reput 操作

    • Broker 中的 ReputMessageService 线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件

  • 获取 ConsumeQueue

    • 根据消息的 Topic 和 QueueId,从 consumeQueueTable 中获取对应的 ConsumeQueue。如果不存在,则创建一个新的 ConsumeQueue

  • 构建索引条目

    • 为每条消息构建一个索引条目,包含以下信息:

      • CommitLog Offset:消息在 CommitLog 中的物理偏移量。

      • Message Length:消息的长度。

      • Tag HashCode:消息 Tag 的哈希值

  • 写入 ConsumeQueue 文件

    • 将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节,文件大小固定为 30 万个条目

  • 刷盘操作

    • 定期将 ConsumeQueue 文件中的数据刷盘,确保数据持久化

consumequeue写入代码:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo


2. ConsumeQueue 查询流程图

+---------------------+
| 消费者拉取消息       |
| - Topic, QueueId     |
| - 起始逻辑偏移量     |
+---------------------+|v
+---------------------+
| 查询 ConsumeQueue    |
| - 根据逻辑偏移量计算 |
|   ConsumeQueue 文件位置 |
+---------------------+|v
+---------------------+
| 读取索引条目         |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode   |
+---------------------+|v
+---------------------+
| 定位 CommitLog 文件   |
| - 根据物理偏移量计算  |
|   CommitLog 文件位置   |
+---------------------+|v
+---------------------+
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取  |
| - 校验消息完整性      |
+---------------------+|v
+---------------------+
| 返回消息给消费者     |
+---------------------+

查询流程

  1. 消费者拉取消息

    • 消费者指定 Topic、QueueId 和起始逻辑偏移量,向 Broker 发起拉取消息请求

  • 查询 ConsumeQueue

    • Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置,读取对应的索引条目

  • 读取索引条目

    • 从 ConsumeQueue 文件中读取索引条目,获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值

  • 定位 CommitLog 文件

    • 根据物理偏移量计算 CommitLog 文件的位置,读取对应的消息内容

  • 从 CommitLog 读取消息

    • 从 CommitLog 文件中读取消息内容,并校验消息的完整性

  • 返回消息给消费者

    • Broker 将读取到的消息内容返回给消费者

查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)

相关文章:

【RocketMQ】RocketMq之ConsumeQueue深入研究

目录 一:RocketMq 整体文件存储介绍 二:ConsumeQueue 的文件结构 三:ConsumeQueue 写入和查询流程 一:RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分: CommitLog:存储消息的元数据。所有消息都会…...

如今物联网的快速发展对hmi的更新有哪些积极影响

一、功能更加丰富 物联网的快速发展使得 HMI(人机界面)能够连接更多的设备和系统,从而实现更加丰富的功能。例如,通过与传感器网络的连接,HMI 可以实时显示设备的运行状态、环境参数等信息,为用户提供更加…...

linux 性能60秒分析

linux 60秒分析 需要运行的工具是 1、uptime 2、dmesg | tail 3、vmstat 1 4、mpstat -P ALL 1 5、pidstat 1 6、iostat -xz 1 7、free -m 8、sar -n DEV 1 9、sar -n TCP,ETCP 1 10、topuptime 快速检查平均负载 [rootaaaaaa ~]# uptime15:17:20 up 3 days, 14 min, 7 us…...

Redisson全面解析:从使用方法到工作原理的深度探索

文章目录 写在文章开头详解Redisson基本数据类型基础配置字符串操作列表操作映射集阻塞队列延迟队列更多关于Redisson详解Redisson 中的原子类详解redisson中的发布订阅模型小结参考写在文章开头 Redisson是基于原生redis操作指令上进一步的封装,屏蔽了redis数据结构的实现细…...

neo4j-解决导入数据后出现:Database ‘xxxx‘ is unavailable. Run :sysinfo for more info.

目录 问题描述 解决方法 重新导入 问题描述 最近在linux上部署了neo4j,参照之前写的博客:neo4j-数据的导出和导入_neo4j数据导入导出-CSDN博客 进行了数据导出、导入操作。但是在进行导入后,重新登录网页版neo4j,发现对应的数据库状态变…...

51单片机之引脚图(详解)

8051单片机引脚分类与功能笔记 1. 电源引脚 VCC(第40脚):接入5V电源,为单片机提供工作电压。GND(第20脚):接地端,确保电路的电位参考点。 2.时钟引脚 XTAL1(第19脚&a…...

Hangfire.NET:.NET任务调度

引言:为何选择 Hangfire? 在开发.NET 应用程序时,我们常常会遇到这样的场景:应用程序需要定期发送报告,像财务报表,每日业务数据汇总报告等,这些报告需要定时生成并发送给相关人员;…...

深入解析:React 事件处理的秘密与高效实践

在 React 中,事件处理是构建交互式应用的核心。本文将带你深入探索 React 事件处理的机制、最佳实践以及如何避免常见陷阱,助你写出更高效、更健壮的代码。 1. React 事件处理的独特之处 合成事件(SyntheticEvent) React 使用合…...

开源像素字体,可用于独立游戏开发

方舟像素字体 / Ark Pixel Font 开源的泛中日韩像素字体,使用 SIL 开放字体许可证 第1.1版(SIL Open Font License 1.1)授权。 支持 10、12 和 16 像素尺寸。 支持语言特殊字形:中文-中国大陆、中文-香港特别行政区、中文-台湾…...

【论文阅读】Comment on the Security of “VOSA“

Comment on the Security of Verifiable and Oblivious Secure Aggregation for Privacy-Preserving Federated Learning -- 关于隐私保护联邦中可验证与遗忘的安全聚合的安全性 论文来源摘要Introduction回顾 VOSA 方案对VOSA不可伪造性的攻击对于类型 I 的攻击对于类型 II 的…...

了解传输层TCP协议

目录 一、TCP协议段格式 二、TCP原理 1.确认应答 2.超时重传 3.连接管理 建立连接 断开连接 4.滑动窗口 5.流量控制 6.拥塞控制 7.延时应答 8.捎带应答 9.面向字节流 10.TCP异常情况 TCP,即Transmission Control Protocol,传输控制协议。人如…...

flask实现用户名查重,重复的用户名阻止注册,以及如何优化

在 Flask 中实现用户名查重,并阻止重复的用户名进行注册,可以使用数据库(如 SQLite、MySQL、PostgreSQL)存储用户信息,并在注册时检查用户名是否已存在。以下是实现步骤: 1. 安装 Flask 及 SQLAlchemy 确保…...

ASP.NET Core对JWT的封装

目录 JWT封装 [Authorize]的注意事项 JWT封装 NuGet 库 |Microsoft.AspNetCore.Authentication.JwtBearer 9.0.1https://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBearerhttps://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBea…...

wordpressAI工具,已接入Deepseek 支持自动生成文章、生成图片、生成长尾关键词、前端AI窗口互动、批量采集等

基于关键词或现有内容生成SEO优化的文章,支持多种AI服务(如OpenAI、百度文心一言、智谱AI等),并提供定时任务、内容采集、关键词生成等功能。 核心功能 文章生成 关键词生成:根据输入的关键词生成高质量文章。 内容…...

Ollama部署 DeepSeek-R1:70B 模型的详细步骤

1. 确认环境准备 (1) 硬件要求 显存需求:70B 参数的模型需要大量显存。若使用 NVIDIA T4(16GB 显存),需多卡并行(如 8 卡)或开启量化(如 q4_0、q8_0)。内存需求:建议至…...

PAT乙级( 1009 说反话 1010 一元多项式求导)C语言版本超详细解析

1009 说反话 给定一句英语,要求你编写程序,将句中所有单词的顺序颠倒输出。 输入格式: 测试输入包含一个测试用例,在一行内给出总长度不超过 80的字符串。字符串由若干单词和若干空格组成,其中单词是由英文字母&#x…...

学习笔记十九:K8S生成pod过程

K8S生成pod过程 流程图具体生成过程用户提交 Pod 定义API Server 处理请求调度器分配节点(Scheduling)目标节点上的 Pod 创建网络配置状态上报与监控控制器管理(Controller Manager)就绪与服务发现 关键错误场景高级特性 流程图 具…...

Qwen2-VL:增强视觉语言模型对世界任意分辨率的感知能力

1、摘要 Qwen2-VL系列是Qwen-VL模型的高级升级版本,它重新定义了传统视觉处理中预设分辨率的方法。Qwen2-VL引入了Naive Dynamic Resolution机制,使模型能够动态处理不同分辨率的图像,并将其转换为不同数量的视觉标记。这种机制使模型能够生…...

原神新版本角色牌上新 七圣召唤增添新玩法

在原神这款游戏中,5.4版本更新后七圣召唤玩法将新增2张角色牌和对应天赋牌、3张行动牌,并进行部分卡牌平衡调整,今天就给大家介绍一下。 一、角色牌【基尼奇】 1.元素战技:选一个我方角色,自身附属钩索链接并进入夜魂…...

Spring 中的 事务 隔离级别以及传播行为

1. 事务隔离级别(Isolation Level) 事务隔离级别定义了事务在并发环境下的行为,主要解决以下问题: 脏读(Dirty Read):一个事务读取了另一个未提交事务的数据。 不可重复读(Non-Re…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...

PHP和Node.js哪个更爽?

先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异&#xff…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

SpringCloudGateway 自定义局部过滤器

场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

10-Oracle 23 ai Vector Search 概述和参数

一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...