【RocketMQ】RocketMq之ConsumeQueue深入研究
目录
一:RocketMq 整体文件存储介绍
二:ConsumeQueue 的文件结构
三:ConsumeQueue 写入和查询流程
一:RocketMq 整体文件存储介绍
存储⽂件主要分为三个部分:
CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍ConsumeQueue的研究,以rocketmq5.3.0版本作为研究。
二:ConsumeQueue 的文件结构
ConsumeQueue 的文件格式:
每个 ConsumeQueue 条目占用 20 字节,包含以下三个字段:
字段名 | 长度(字节) | 说明 |
---|---|---|
CommitLog Offset | 8 | 消息在 CommitLog 文件中的物理偏移量。 |
Message Length | 4 | 消息的长度。 |
Tag HashCode | 8 | 消息 Tag 的哈希值,用于快速查找具有相同 Tag 的消息。 |
这种固定长度的设计使得 ConsumeQueue 文件可以像数组一样随机访问,极大地提高了读取性能
三:ConsumeQueue 写入和查询流程
1. ConsumeQueue 写入流程图
+---------------------+
| 消息写入 CommitLog |
+---------------------+|v
+---------------------+
| 触发 Reput 操作 |
| - ReputMessageService |
+---------------------+|v
+---------------------+
| 获取 ConsumeQueue |
| - 根据 Topic 和 QueueId |
+---------------------+|v
+---------------------+
| 构建索引条目 |
| - CommitLog Offset |
| - Message Length |
| - Tag HashCode |
+---------------------+|v
+---------------------+
| 写入 ConsumeQueue 文件 |
| - 每个条目 20 字节 |
| - 文件大小 30 万条目 |
+---------------------+|v
+---------------------+
| 刷盘操作 |
| - 定期刷盘到磁盘 |
+---------------------+
写入流程
-
消息写入 CommitLog:
-
Broker 接收到消息后,将其顺序写入 CommitLog 文件
-
-
触发 Reput 操作:
-
Broker 中的
ReputMessageService
线程异步将 CommitLog 中的消息重新构建到 ConsumeQueue 文件
-
-
获取 ConsumeQueue:
-
根据消息的 Topic 和 QueueId,从
consumeQueueTable
中获取对应的 ConsumeQueue。如果不存在,则创建一个新的 ConsumeQueue
-
-
构建索引条目:
-
为每条消息构建一个索引条目,包含以下信息:
-
CommitLog Offset:消息在 CommitLog 中的物理偏移量。
-
Message Length:消息的长度。
-
Tag HashCode:消息 Tag 的哈希值
-
-
-
写入 ConsumeQueue 文件:
-
将索引条目写入 ConsumeQueue 文件。每个条目占用 20 字节,文件大小固定为 30 万个条目
-
-
刷盘操作:
-
定期将 ConsumeQueue 文件中的数据刷盘,确保数据持久化
-
consumequeue写入代码:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
2. ConsumeQueue 查询流程图
+---------------------+
| 消费者拉取消息 |
| - Topic, QueueId |
| - 起始逻辑偏移量 |
+---------------------+|v
+---------------------+
| 查询 ConsumeQueue |
| - 根据逻辑偏移量计算 |
| ConsumeQueue 文件位置 |
+---------------------+|v
+---------------------+
| 读取索引条目 |
| - 获取 CommitLog Offset |
| - 获取 Message Length |
| - 获取 Tag HashCode |
+---------------------+|v
+---------------------+
| 定位 CommitLog 文件 |
| - 根据物理偏移量计算 |
| CommitLog 文件位置 |
+---------------------+|v
+---------------------+
| 从 CommitLog 读取消息 |
| - 根据物理偏移量读取 |
| - 校验消息完整性 |
+---------------------+|v
+---------------------+
| 返回消息给消费者 |
+---------------------+
查询流程
-
消费者拉取消息:
-
消费者指定 Topic、QueueId 和起始逻辑偏移量,向 Broker 发起拉取消息请求
-
-
查询 ConsumeQueue:
-
Broker 根据逻辑偏移量计算 ConsumeQueue 文件的位置,读取对应的索引条目
-
-
读取索引条目:
-
从 ConsumeQueue 文件中读取索引条目,获取消息在 CommitLog 中的物理偏移量、消息长度和 Tag 哈希值
-
-
定位 CommitLog 文件:
-
根据物理偏移量计算 CommitLog 文件的位置,读取对应的消息内容
-
-
从 CommitLog 读取消息:
-
从 CommitLog 文件中读取消息内容,并校验消息的完整性
-
-
返回消息给消费者:
-
Broker 将读取到的消息内容返回给消费者
-
查询代码入口: org.apache.rocketmq.store.DefaultMessageStore#getMessage(java.lang.String, java.lang.String, int, long, int, int, org.apache.rocketmq.store.MessageFilter)
相关文章:

【RocketMQ】RocketMq之ConsumeQueue深入研究
目录 一:RocketMq 整体文件存储介绍 二:ConsumeQueue 的文件结构 三:ConsumeQueue 写入和查询流程 一:RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分: CommitLog:存储消息的元数据。所有消息都会…...

如今物联网的快速发展对hmi的更新有哪些积极影响
一、功能更加丰富 物联网的快速发展使得 HMI(人机界面)能够连接更多的设备和系统,从而实现更加丰富的功能。例如,通过与传感器网络的连接,HMI 可以实时显示设备的运行状态、环境参数等信息,为用户提供更加…...

linux 性能60秒分析
linux 60秒分析 需要运行的工具是 1、uptime 2、dmesg | tail 3、vmstat 1 4、mpstat -P ALL 1 5、pidstat 1 6、iostat -xz 1 7、free -m 8、sar -n DEV 1 9、sar -n TCP,ETCP 1 10、topuptime 快速检查平均负载 [rootaaaaaa ~]# uptime15:17:20 up 3 days, 14 min, 7 us…...

Redisson全面解析:从使用方法到工作原理的深度探索
文章目录 写在文章开头详解Redisson基本数据类型基础配置字符串操作列表操作映射集阻塞队列延迟队列更多关于Redisson详解Redisson 中的原子类详解redisson中的发布订阅模型小结参考写在文章开头 Redisson是基于原生redis操作指令上进一步的封装,屏蔽了redis数据结构的实现细…...

neo4j-解决导入数据后出现:Database ‘xxxx‘ is unavailable. Run :sysinfo for more info.
目录 问题描述 解决方法 重新导入 问题描述 最近在linux上部署了neo4j,参照之前写的博客:neo4j-数据的导出和导入_neo4j数据导入导出-CSDN博客 进行了数据导出、导入操作。但是在进行导入后,重新登录网页版neo4j,发现对应的数据库状态变…...

51单片机之引脚图(详解)
8051单片机引脚分类与功能笔记 1. 电源引脚 VCC(第40脚):接入5V电源,为单片机提供工作电压。GND(第20脚):接地端,确保电路的电位参考点。 2.时钟引脚 XTAL1(第19脚&a…...

Hangfire.NET:.NET任务调度
引言:为何选择 Hangfire? 在开发.NET 应用程序时,我们常常会遇到这样的场景:应用程序需要定期发送报告,像财务报表,每日业务数据汇总报告等,这些报告需要定时生成并发送给相关人员;…...

深入解析:React 事件处理的秘密与高效实践
在 React 中,事件处理是构建交互式应用的核心。本文将带你深入探索 React 事件处理的机制、最佳实践以及如何避免常见陷阱,助你写出更高效、更健壮的代码。 1. React 事件处理的独特之处 合成事件(SyntheticEvent) React 使用合…...

开源像素字体,可用于独立游戏开发
方舟像素字体 / Ark Pixel Font 开源的泛中日韩像素字体,使用 SIL 开放字体许可证 第1.1版(SIL Open Font License 1.1)授权。 支持 10、12 和 16 像素尺寸。 支持语言特殊字形:中文-中国大陆、中文-香港特别行政区、中文-台湾…...

【论文阅读】Comment on the Security of “VOSA“
Comment on the Security of Verifiable and Oblivious Secure Aggregation for Privacy-Preserving Federated Learning -- 关于隐私保护联邦中可验证与遗忘的安全聚合的安全性 论文来源摘要Introduction回顾 VOSA 方案对VOSA不可伪造性的攻击对于类型 I 的攻击对于类型 II 的…...

了解传输层TCP协议
目录 一、TCP协议段格式 二、TCP原理 1.确认应答 2.超时重传 3.连接管理 建立连接 断开连接 4.滑动窗口 5.流量控制 6.拥塞控制 7.延时应答 8.捎带应答 9.面向字节流 10.TCP异常情况 TCP,即Transmission Control Protocol,传输控制协议。人如…...

flask实现用户名查重,重复的用户名阻止注册,以及如何优化
在 Flask 中实现用户名查重,并阻止重复的用户名进行注册,可以使用数据库(如 SQLite、MySQL、PostgreSQL)存储用户信息,并在注册时检查用户名是否已存在。以下是实现步骤: 1. 安装 Flask 及 SQLAlchemy 确保…...

ASP.NET Core对JWT的封装
目录 JWT封装 [Authorize]的注意事项 JWT封装 NuGet 库 |Microsoft.AspNetCore.Authentication.JwtBearer 9.0.1https://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBearerhttps://www.nuget.org/packages/Microsoft.AspNetCore.Authentication.JwtBea…...

wordpressAI工具,已接入Deepseek 支持自动生成文章、生成图片、生成长尾关键词、前端AI窗口互动、批量采集等
基于关键词或现有内容生成SEO优化的文章,支持多种AI服务(如OpenAI、百度文心一言、智谱AI等),并提供定时任务、内容采集、关键词生成等功能。 核心功能 文章生成 关键词生成:根据输入的关键词生成高质量文章。 内容…...

Ollama部署 DeepSeek-R1:70B 模型的详细步骤
1. 确认环境准备 (1) 硬件要求 显存需求:70B 参数的模型需要大量显存。若使用 NVIDIA T4(16GB 显存),需多卡并行(如 8 卡)或开启量化(如 q4_0、q8_0)。内存需求:建议至…...

PAT乙级( 1009 说反话 1010 一元多项式求导)C语言版本超详细解析
1009 说反话 给定一句英语,要求你编写程序,将句中所有单词的顺序颠倒输出。 输入格式: 测试输入包含一个测试用例,在一行内给出总长度不超过 80的字符串。字符串由若干单词和若干空格组成,其中单词是由英文字母&#x…...

学习笔记十九:K8S生成pod过程
K8S生成pod过程 流程图具体生成过程用户提交 Pod 定义API Server 处理请求调度器分配节点(Scheduling)目标节点上的 Pod 创建网络配置状态上报与监控控制器管理(Controller Manager)就绪与服务发现 关键错误场景高级特性 流程图 具…...

Qwen2-VL:增强视觉语言模型对世界任意分辨率的感知能力
1、摘要 Qwen2-VL系列是Qwen-VL模型的高级升级版本,它重新定义了传统视觉处理中预设分辨率的方法。Qwen2-VL引入了Naive Dynamic Resolution机制,使模型能够动态处理不同分辨率的图像,并将其转换为不同数量的视觉标记。这种机制使模型能够生…...

原神新版本角色牌上新 七圣召唤增添新玩法
在原神这款游戏中,5.4版本更新后七圣召唤玩法将新增2张角色牌和对应天赋牌、3张行动牌,并进行部分卡牌平衡调整,今天就给大家介绍一下。 一、角色牌【基尼奇】 1.元素战技:选一个我方角色,自身附属钩索链接并进入夜魂…...

Spring 中的 事务 隔离级别以及传播行为
1. 事务隔离级别(Isolation Level) 事务隔离级别定义了事务在并发环境下的行为,主要解决以下问题: 脏读(Dirty Read):一个事务读取了另一个未提交事务的数据。 不可重复读(Non-Re…...

为多个GitHub账户配置SSH密钥
背景 当需要同时使用多个GitHub账户(例如工作和个人账户)时,默认的SSH配置可能导致冲突。本文介绍如何通过生成不同的SSH密钥对并配置SSH客户端来管理多个账户。 操作步骤 生成SSH密钥对 为每个GitHub账户生成独立的密钥对,并指…...

OSPF基础(3):区域划分
OSPF的区域划分 1、区域产生背景 路由器在同一个区域中泛洪LSA。为了确保每台路由器都拥有对网络拓扑的一致认知,LSDB需要在区域内进行同步。OSPF域如果仅有一个区域,随着网络规模越来越大,OSPF路由器的数量越来越多,这将导致诸…...

android studio无痛入门
在Android Studio中创建和管理项目主要涉及以下几个步骤: 1. 创建新项目 打开Android Studio,点击“Start a new Android Studio project”或者“File” > “New” > “New Project”。 选择一个模板,例如“Empty Activity”࿰…...

免费windows pdf编辑工具Epdf
Epdf(完全免费) 作者:不染心 时间:2025/2/6 Github: https://github.com/dog-tired/Epdf Epdf Epdf 是一款使用 Rust 编写的 PDF 编辑器,目前仍在开发中。它提供了一系列实用的命令行选项,方便用户对 PDF …...

CNN 卷积神经网络处理图片任务 | PyTorch 深度学习实战
前一篇文章,学习率调整策略 | PyTorch 深度学习实战 本系列文章 GitHub Repo: https://github.com/hailiang-wang/pytorch-get-started CNN 卷积神经网络 CNN什么是卷积工作原理深度学习的卷积运算提取特征不同特征核的效果比较卷积核感受野共享权重池化 示例源码 …...

LeetCode 128: 最长连续序列
LeetCode 128: 最长连续序列 题目: 给定一个未排序的整数数组 nums ,找出数字连续的最长序列(不要求序列元素在原数组中连续)的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: 输入:nums […...

大语言模型需要的可观测性数据的关联方式
可观测性数据的关联方式及其优缺点 随着现代分布式架构和微服务的普及,可观测性(Observability)已经成为确保系统健康、排查故障、优化性能的重要组成部分。有效的可观测性数据关联方式不仅能够帮助我们实时监控系统的运行状态,还…...

【韩顺平linux】部分上课笔记整理
整理一下一些韩顺平老师上课时候的笔记 课程:【小白入门 通俗易懂】韩顺平 一周学会Linux linux环境:使用阿里云服务器 笔记参考 : [学习笔记]2021韩顺平一周学会Linux 一、自定义函数 基本语法 应用实例: 计算两个参数的和…...

python调用pc的语音借口
先安装: pip install pyttsx3再运行: import pyttsx3 # 初始化语音引擎 def init_engine():engine pyttsx3.init()# 设置中文语音voices engine.getProperty(voices)for voice in voices:if chinese in voice.name.lower():engine.setProperty(voice…...

【Golang学习之旅】Golang 内存管理与 GC 机制详解
文章目录 前言1. Go 语言的内存管理的简述2. Golang 内存管理机制2.1 Go 语言的内存分配模型2.2 Go 变量分配示例2.3 Go 语言的内存池(sync.Pool) 3. Golang 垃圾回收(GC)机制详解3.1 Go 的 GC 机制概述3.2 GC 触发条件3.3 手动触…...