【RocketMQ】RocketMq之IndexFile深入研究
一:RocketMq 整体文件存储介绍
- CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
- ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
- IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍IndexFile的研究,以rocketmq5.3.0版本作为研究。
二:IndexFile的文件结构
文件整理格式,如下图2-1所示

图2-1 IndexFile 文件结构图
IndexFile 文件格式
-
文件名:以时间戳命名(例如
20240301120000000),表示该文件索引的消息的时间范围。 -
文件大小:默认为
400MB,可通过maxIndexSize配置调整。 -
存储路径:默认在
~/store/index目录下。
每个 IndexFile 文件由三部分组成:
1. 文件头部(Header)
2. 哈希槽(Hash Slot)区域
3. 索引条目(Index Entry)区域
1. 文件头部(Header)
| 字段名 | 长度(字节) | 说明 |
|---|---|---|
| beginTimestamp | 8 | 索引文件覆盖的最小时间戳(消息存储时间) |
| endTimestamp | 8 | 索引文件覆盖的最大时间戳(消息存储时间) |
| beginPhyOffset | 8 | 索引文件对应的最小物理偏移量(CommitLog 中的起始位置) |
| endPhyOffset | 8 | 索引文件对应的最大物理偏移量(CommitLog 中的结束位置) |
| hashSlotCount | 4 | 哈希槽数量(固定为 5,000,000) |
| indexCount | 4 | 当前已写入的索引条目数量 |
2. 哈希槽(Hash Slot)区域
-
哈希槽数量:固定为 500 万个(5,000,000),每个哈希槽占 4 字节。
-
哈希函数:对消息的 Key(如
UNIQ_KEY或KEYS)进行哈希计算,得到槽位索引:
slotPos = abs(hash(key)) % 5000000
每个哈希槽存储的是 索引条目区域 的起始位置(索引条目链表的头节点)。
3. 索引条目(Index Entry)区域
每个索引条目占 20 字节,包含以下字段:
| 字段名 | 长度(字节) | 说明 |
|---|---|---|
| keyHash | 4 | 消息 Key 的哈希值(用于快速比对) |
| phyOffset | 8 | 消息在 CommitLog 中的物理偏移量 |
| timeDiff | 4 | 消息存储时间与文件头部 |
| slotValue | 4 | 下一个索引条目的位置(用于解决哈希冲突的链表结构) |
三:IndexFile 写入和查询流程
IndexFile 写入流程:
+---------------------+
| Producer 发送消息 |
+---------------------+|v
+---------------------+
| 提取消息的 Key | --> 如 UNIQ_KEY 或 KEYS 属性
+---------------------+|v
+---------------------+
| 检查 IndexFile 容量 | --> 是否已满?(indexCount >= indexNum)
+---------------------+| 是v
+---------------------+
| 返回 false,写入失败 |
+---------------------+| 否v
+---------------------+
| 计算 Key 的哈希值 | --> `keyHash = indexKeyHashMethod(key)`
+---------------------+|v
+---------------------+
| 计算哈希槽位置 | --> `slotPos = keyHash % hashSlotNum`
+---------------------+|v
+---------------------+
| 计算哈希槽绝对位置 | --> `absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize`
+---------------------+|v
+---------------------+
| 读取哈希槽的当前值 | --> `slotValue = mappedByteBuffer.getInt(absSlotPos)`
+---------------------+|v
+---------------------+
| 校验 slotValue 有效性 | --> 是否无效?(slotValue <= invalidIndex || slotValue > indexCount)
+---------------------+| 是v
+---------------------+
| 将 slotValue 设为无效 | --> `slotValue = invalidIndex`
+---------------------+| 否v
+---------------------+
| 计算时间差 (timeDiff) | --> `timeDiff = (storeTimestamp - beginTimestamp) / 1000`
+---------------------+|v
+---------------------+
| 处理 timeDiff 边界值 | --> 确保 `0 <= timeDiff <= Integer.MAX_VALUE`
+---------------------+|v
+---------------------+
| 计算索引条目绝对位置 | --> `absIndexPos = IndexHeader.INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize`
+---------------------+|v
+---------------------+
| 写入索引条目内容 |
| - keyHash |
| - phyOffset |
| - timeDiff |
| - slotValue (nextIndex)|
+---------------------+|v
+---------------------+
| 更新哈希槽指向新条目 | --> `mappedByteBuffer.putInt(absSlotPos, indexCount)`
+---------------------+|v
+---------------------+
| 更新 IndexFile 头部信息 |
| - 若 indexCount <= 1,更新 beginPhyOffset 和 beginTimestamp |
| - 若 slotValue 无效,增加 hashSlotCount |
| - 增加 indexCount |
| - 更新 endPhyOffset 和 endTimestamp |
+---------------------+|v
+---------------------+
| 返回 true,写入成功 |
+---------------------+|v
+---------------------+
| IndexFile 是否已满? | -- 是 --> 创建新 IndexFile
| (文件大小 ≥ 400MB) |
+---------------------+
源码入口:org.apache.rocketmq.store.index.IndexFile#putKey
IndexFile 查询流程:
+---------------------+
| Consumer 根据 Key 查询 |
+---------------------+|v
+---------------------+
| 计算 Key 的哈希值 | --> `keyHash = Math.abs(key.hashCode())`
+---------------------+|v
+---------------------+
| 计算哈希槽位置 | --> `slotPos = keyHash % 5,000,000`
+---------------------+|v
+---------------------+
| 读取哈希槽的链表头位置 | --> `slotValue = mappedByteBuffer.getInt(slotPos * 4)`
+---------------------+|v
+---------------------+
| 遍历链表条目 |
| while (slotValue > 0)|
+---------------------+|v
+---------------------+
| 读取索引条目: |
| - keyHashRead |
| - phyOffset |
| - timeDiff |
| - nextIndex |
+---------------------+|v
+---------------------+
| 检查时间范围是否匹配? | --> `storeTime = beginTimestamp + timeDiff * 1000`
| (storeTime ∈ [begin, end]?)|
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 比对 keyHashRead 和 keyHash |
| (是否相等?) |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 从 CommitLog 读取实际 Key |
| (检查 Key 是否一致?) |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 返回 phyOffset | --> 添加到结果列表
+---------------------+|v
+---------------------+
| slotValue = nextIndex| --> 继续遍历下一个条目
+---------------------+|v
+---------------------+
| 遍历结束,返回结果列表 |
+---------------------+
源码入口:org.apache.rocketmq.store.index.IndexService#queryOffset
四:IndexFile解决hash冲突问题思想
RocketMQ 的 IndexFile 通过 链地址法(Chaining) 解决哈希冲突问题,其核心思想是将哈希到同一槽位的多个索引条目组织成链表结构,并通过哈希槽(Hash Slot)与索引条目(Index Entry)的关联实现高效写入和查询。以下是具体实现思想及关键设计:
1. 哈希冲突的背景
-
哈希冲突:不同 Key 经过哈希函数计算后可能得到相同的哈希值,导致被分配到同一个哈希槽。
-
问题:若不处理冲突,后续 Key 的索引会覆盖已有数据,导致查询结果错误。
2. 解决冲突的核心思想:链地址法
RocketMQ 的 IndexFile 采用 单链表 结构管理同一哈希槽下的所有冲突条目,具体流程如下:
(1) 写入时的链表插入
-
新条目插入链表头部:
当新 Key 的哈希值与某槽位已有条目冲突时,新条目会被插入链表头部,并更新哈希槽指针指向新条目。// 新条目的 nextIndex 指向原头节点 this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue); // 更新哈希槽指针为新条目位置 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());-
优势:插入时间复杂度为 O(1),无需遍历链表。
-
(2) 查询时的链表遍历
-
遍历链表比对 Key:
查询时,从哈希槽指向的链表头节点开始,依次遍历所有条目,通过两次比对(哈希值 + 实际 Key)过滤冲突。while (nextIndexToRead > 0) {// 1. 读取条目内容int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);// 2. 比对哈希值if (keyHashRead == keyHash) {// 3. 从 CommitLog 读取实际 Key 比对String keyStored = readKeyFromCommitLog(phyOffsetRead);if (key.equals(keyStored)) {phyOffsets.add(phyOffsetRead);}}// 4. 移动到下一个节点nextIndexToRead = prevIndexRead; }
3. 关键设计优化
(1) 哈希槽数量固定
-
默认 500 万个哈希槽:
private static final int HASH_SLOT_NUM = 5000000; // 默认槽数-
目的:通过大量槽位减少哈希冲突的概率,使冲突链表尽可能短。
-
权衡:槽数过多会占用更多内存,但查询效率更高。
-
(2) 时间范围过滤
-
索引条目存储时间差(timeDiff):
每个索引条目记录消息存储时间与 IndexFile 起始时间的差值(秒级),查询时快速过滤掉不满足时间范围的条目。long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff * 1000L; if (timeRead < begin || timeRead > end) {continue; // 跳过不符合时间条件的条目 }-
优势:减少无效条目的遍历,提升查询性能。
-
(3) 文件滚动(Rolling)
-
按时间或大小滚动:
IndexFile 文件默认大小上限为 400MB,或时间跨度超过阈值时,创建新文件。-
目的:避免单个文件过大导致链表过长,同时支持按时间范围快速定位文件。
-
4. 示例场景
写入冲突场景
-
Key1: Ea#20231001123456 → 哈希值 19583063 → 槽位 18332292
-
Key2: FB#20231001123456 → 哈希值 19583063 → 槽位 18332292(冲突)
-
处理流程:
-
Key1 写入槽位 18332292,链表头指向 Key1。
-
Key2 写入时,插入链表头部,槽位指针更新为 Key2,Key2 的
nextIndex指向 Key1。
-
查询冲突场景
-
查询 Key: Ea#20231001123456
-
哈希计算定位到槽位 18332292。
-
遍历链表:
-
先读取 Key2(哈希值匹配但 Key 不匹配,跳过)。
-
再读取 Key1(哈希值 + Key 均匹配,返回
phyOffset)。
-
-
hash冲突代码调试示例
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message msg = new Message("Ea", "TagA" , ("消息1").getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setKeys("20231001123456");producer.sendOneway(msg);Message msg2 = new Message("FB", "TagA" , ("消息3").getBytes(RemotingHelper.DEFAULT_CHARSET));msg2.setKeys("20231001123456");producer.sendOneway(msg2);producer.shutdown();}
相关文章:
【RocketMQ】RocketMq之IndexFile深入研究
一:RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分: CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。 ConsumerQue…...
小白零基础--CPP多线程
进程 进程就是运行中的程序线程进程中的进程 1、C11 Thread线程库基础 #include <iostream> #include <thread> #include<string>void printthread(std::string msg){std::cout<<msg<<std::endl;for (int i 0; i < 1000; i){std::cout<…...
利用deepseek参与软件测试 基本架构如何 又该在什么环节接入deepseek
利用DeepSeek参与软件测试,可以考虑以下基本架构和接入环节: ### 基本架构 - **数据层** - **测试数据存储**:用于存放各种测试数据,包括正常输入数据、边界值数据、异常数据等,这些数据可以作为DeepSeek的输入&…...
大模型微调技术总结及使用GPU对VisualGLM-6B进行高效微调
1. 概述 在深度学习中,微调(Fine-tuning)是一种重要的技术,用于改进预训练模型的性能。在预训练模型的基础上,针对特定任务(如文本分类、机器翻译、情感分析等),使用相对较小的有监…...
WPF进阶 | WPF 样式与模板:打造个性化用户界面的利器
WPF进阶 | WPF 样式与模板:打造个性化用户界面的利器 一、前言二、WPF 样式基础2.1 什么是样式2.2 样式的定义2.3 样式的应用 三、WPF 模板基础3.1 什么是模板3.2 控件模板3.3 数据模板 四、样式与模板的高级应用4.1 样式继承4.2 模板绑定4.3 资源字典 五、实际应用…...
Java 大视界 -- Java 大数据在自动驾驶中的数据处理与决策支持(68)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
自动化构建-make/Makefile 【Linux基础开发工具】
文章目录 一、背景二、Makefile编译过程三、变量四、变量赋值1、""是最普通的等号2、“:” 表示直接赋值3、“?” 表示如果该变量没有被赋值,4、""和写代码是一样的, 五、预定义变量六、函数**通配符** 七、伪目标 .PHONY八、其他常…...
python学opencv|读取图像(五十二)使用cv.matchTemplate()函数实现最佳图像匹配
【1】引言 前序学习了图像的常规读取和基本按位操作技巧,相关文章包括且不限于: python学opencv|读取图像-CSDN博客 python学opencv|读取图像(四十九)原理探究:使用cv2.bitwise()系列函数实现图像按位运算-CSDN博客…...
通信方式、点对点通信、集合通信
文章目录 从硬件PCIE、NVLINK、RDMA原理到通信NCCL、MPI原理!通信实现方式:机器内通信、机器间通信通信实现方式:通讯协调通信实现方式:机器内通信:PCIe通信实现方式:机器内通信:NVLink通信实现…...
TCP编程
1.socket函数 int socket(int domain, int type, int protocol); 头文件:include<sys/types.h>,include<sys/socket.h> 参数 int domain AF_INET: IPv4 Internet protocols AF_INET6: IPv6 Internet protocols AF_UNIX, AF_LOCAL : Local…...
OpenAI 实战进阶教程 - 第七节: 与数据库集成 - 生成 SQL 查询与优化
内容目标 学习如何使用 OpenAI 辅助生成和优化多表 SQL 查询了解如何获取数据库结构信息并与 OpenAI 结合使用 实操步骤 1. 创建 SQLite 数据库示例 创建数据库及表结构: import sqlite3# 连接 SQLite 数据库(如果不存在则创建) conn sq…...
Apache Iceberg数据湖技术在海量实时数据处理、实时特征工程和模型训练的应用技术方案和具体实施步骤及代码
Apache Iceberg在处理海量实时数据、支持实时特征工程和模型训练方面的强大能力。Iceberg支持实时特征工程和模型训练,特别适用于需要处理海量实时数据的机器学习工作流。 Iceberg作为数据湖,以支持其机器学习平台中的特征存储。Iceberg的分层结构、快照…...
QT交叉编译环境搭建(Cmake和qmake)
介绍一共有两种方法(基于qmake和cmake): 1.直接调用虚拟机中的交叉编译工具编译 2.在QT中新建编译套件kits camke和qmake的区别:CMake 和 qmake 都是自动化构建工具,用于简化构建过程,管理编译设置&…...
Turing Complete-成对的麻烦
这一关是4个输入,当输入中1的个数大于等于2时,输出1。 那么首先用个与门来检测4个输入中,1的个数是否大于等于2,当大于等于2时,至少会有一个与门输出1,所以再用两级或门讲6个与门的输出取或,得…...
寒假刷题Day20
一、80. 删除有序数组中的重复项 II class Solution { public:int removeDuplicates(vector<int>& nums) {int n nums.size();int stackSize 2;for(int i 2; i < n; i){if(nums[i] ! nums[stackSize - 2]){nums[stackSize] nums[i];}}return min(stackSize, …...
deepseek 本地化部署和小模型微调
安装ollama 因为本人gpu卡的机器系统是centos 7, 直接使用ollama会报 所以ollama使用镜像方式进行部署, 拉取镜像ollama/ollama 启动命令 docker run -d --privileged -v ollama:/root/.ollama -p 11434:11434 --name ollama ollama/ollama 查看ollama 是否启动…...
【Java异步编程】基于任务类型创建不同的线程池
文章目录 一. 按照任务类型对线程池进行分类1. IO密集型任务的线程数2. CPU密集型任务的线程数3. 混合型任务的线程数 二. 线程数越多越好吗三. Redis 单线程的高效性 使用线程池的好处主要有以下三点: 降低资源消耗:线程是稀缺资源,如果无限…...
makailio-alias_db模块详解
ALIAS_DB 模块 作者 Daniel-Constantin Mierla micondagmail.com Elena-Ramona Modroiu ramonaasipto.com 编辑 Daniel-Constantin Mierla micondagmail.com 版权 © 2005 Voice Sistem SRL © 2008 asipto.com 目录 管理员指南 概述依赖 2.1 Kamailio 模块 2.2 外…...
文字显示省略号
多行文本溢出显示省略号...
[LeetCode] 字符串完整版 — 双指针法 | KMP
字符串 基础知识双指针法344# 反转字符串541# 反转字符串II54K 替换数字151# 反转字符串中的单词55K 右旋字符串 KMP 字符串匹配算法28# 找出字符串中第一个匹配项的下标#459 重复的子字符串 基础知识 字符串的结尾:空终止字符00 char* name "hello"; …...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
python执行测试用例,allure报乱码且未成功生成报告
allure执行测试用例时显示乱码:‘allure’ �����ڲ����ⲿ���Ҳ���ǿ�&am…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent
安全大模型训练计划:基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标:为安全大模型创建高质量、去偏、符合伦理的训练数据集,涵盖安全相关任务(如有害内容检测、隐私保护、道德推理等)。 1.1 数据收集 描…...
