当前位置: 首页 > news >正文

RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇

RocketMQ 存储基础回顾: 源码分析RocketMQ之CommitLog消息存储机制

本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构,同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储机制是所有的主题消息都存储在 CommitLog 文件中,也就是消息发送是完全的顺序 IO 操作,加上利用内存文件映射机制,极大的提供的 IO 性能。消息的全量信息存放在 commitlog 文件中,并且每条消息的长度是不一样的,消息的具体存储格式如下:

如果消费者直接基于commitlog 进行消费的话,简直就是一个恶梦,因为不同的主题的消息完全顺序的存储在 commitlog 文件中,根据主题去查询消息,不得不遍历整个 commitlog 文件,显然作为一款消息中间件这是绝不允许的。RocketMQ 的ConsumeQueue 文件就是来解决消息消费的。首先我们知道,一个主题,在 broker 上可以分成多个消费对列,默认为4个,也就是消费队列是基于主题+broker。那 ConsumeQueue 中当然不会再存储全量消息了,而是存储为定长(20字节,8字节commitlog 偏移量+4字节消息长度+8字节tag hashcode),消息消费时,首先根据 commitlog offset 去 commitlog 文件组(commitlog每个文件1G,填满了,另外创建一个文件),找到消息的起始位置,然后根据消息长度,读取整条消息。但问题又来了,如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,为了解决这个问题,rocketmq 的 index 文件又派上了用场。

接下来,本文重点关注 ConsumeQueue、Index 文件是如何基于 Commitlog 构建的,并且根据 ConsumeQueue、Index 文件如何查找消息。

根据 commitlog 文件生成 consumequeue、index 文件,主要同运作于两种情况:

1、运行中,发送端发送消息到 commitlog文件,此时如何及时传达到 consume文件、Index文件呢?

2、broker 启动时,检测 commitlog 文件与 consumequeue、index 文件中信息是否一致,如果不一致,需要根据 commitlog 文件重新恢复 consumequeue 文件和 index 文件。

1、commitlog、consumequeue、index 文件同步问题

RocketMQ 采用专门的线程来根据 comitlog offset 来将 commitlog 转发给ConsumeQueue、Index。其线程为DefaultMessageStore$ReputMessageService

1.1 核心属性

  • private volatile long reputFromOffset = 0
    reputFromOffset ,从 commitlog 开始拉取的初始偏移量。

1.2 run方法

每处理一次 doReput 方法,休眠1毫秒,基本上是马不停蹄的在转发 commitlog 中的内容到 consumequeue、index。

接下来重点查看 doReput 方法。

private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);        // @1if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);    // @2 int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);                                                                       // @3 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}

代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。

先浏览一下 SelectMappedBufferResult

代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:

// 消息主题
private final String topic; 
// 消息队列
private final int queueId; 
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp; 
//消息在消费队列的offset
private final long consumeQueueOffset; 
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys; 
// 是否成功
private final boolean success; 
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey; 
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset; 
// 属性
private final Map<String, String> propertiesMap; 

代码@3:转发DistpachRequest。

根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。

相关文章:

RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇

RocketMQ 存储基础回顾&#xff1a; 源码分析RocketMQ之CommitLog消息存储机制 本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构&#xff0c;同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储…...

基于Java的浏览器的设计与实现毕业设计

技术&#xff1a;Java等摘要&#xff1a;当今世界是一个以计算机网络为核心的信息时代&#xff0c;互联网为人们快速获取、发布和传递信息提供了便捷&#xff0c;而浏览器作为互联网上查找信息的重要工具&#xff0c;给人们提供了巨大而又宝贵的信息财富&#xff0c;受到了大家…...

手把手教你使用vite打包自己的js代码包并推送到npm

准备 要有npm账号&#xff0c;没有的铁子去npm官网注册一个&#xff0c;又不要钱。 使用vite创建项目 一行代码搞定 npm create vite viet-demo框架选择Others 模板选择library 选择ts 这样项目就创建完了 这个项目默认有一个函数&#xff0c;用来记录按钮的点击次数并…...

Tomcat源码分析-关于tomcat热加载的一些思考

在前面的文章中&#xff0c;我们分析了 tomcat 类加载器的相关源码&#xff0c;也了解了 tomcat 支持类的热加载&#xff0c;意味着 tomcat 要涉及类的重复卸装/装载过程&#xff0c;这个过程是很敏感的&#xff0c;一旦处理不当&#xff0c;可能会引起内存泄露 卸载类 我们知…...

DataWhale 大数据处理技术组队学习task4

五、分布式并行编程模型MapReduce 1. 概述 1.1 分布式并行编程 背景&#xff1a;摩尔定律已经开始逐渐失效&#xff0c;提升数据处理计算能力刻不容缓。传统的程序开发与分布式并行编程 传统的程序开发&#xff1a;以单指令、单数据流的方式顺序执行&#xff0c;虽然这种方式…...

Oracle 12C以上统计信息收集CDB、PDB执行时间不一致问题

文章目录前言一、统计信息窗口期调查二、时区调查三、查询alert记录四、why Database Statistic Collection Job is running two times inside a Maintenance Window?五、Default Scheduler Timezone Value In PDB$SEED Different Than CDB六、总结前言 在实际工作中发现一个…...

用Python获取弹幕的两种方式(一种简单但量少,另一量大管饱)

前言 弹幕可以给观众一种“实时互动”的错觉&#xff0c;虽然不同弹幕的发送时间有所区别&#xff0c;但是其只会在视频中特定的一个时间点出现&#xff0c;因此在相同时刻发送的弹幕基本上也具有相同的主题&#xff0c;在参与评论时就会有与其他观众同时评论的错觉。 在国内…...

算法训练营 day55 动态规划 买卖股票问题系列3

算法训练营 day55 动态规划 买卖股票问题系列3 最佳买卖股票时机含冷冻期 309. 最佳买卖股票时机含冷冻期 - 力扣&#xff08;LeetCode&#xff09; 给定一个整数数组prices&#xff0c;其中第 prices[i] 表示第 i 天的股票价格 。 设计一个算法计算出最大利润。在满足以下…...

电商共享购模式,消费增值返利,app开发

在当今以市场需求为主导的数字经济时代&#xff0c;消费者需求呈现出精细化管理和多元化的特性&#xff0c;目标市场日渐完善&#xff0c;另外在大数据技术迅速进步和运用的驱动下&#xff0c;总体行业的发展节奏感也在不断加速。因而&#xff0c;企业需要建立一套灵活多变的经…...

机房信息牌系统

产品特色&#xff1a; 无线低功耗安装简单&#xff0c;快速布置易于维护墨水屏显示&#xff0c;清晰&#xff0c;更环保信息后台推送&#xff0c;远程管理多模版样式随意制作多尺寸&#xff1a;4.2寸&#xff0c;7.5寸&#xff0c;10.2寸4.2寸7.5寸10.2寸标签特性&#xff1a;…...

金测评 手感更细腻的游戏手柄,双模加持兼容更出色,雷柏V600S上手

很多朋友周末都喜欢玩玩游戏放松一下&#xff0c;在家玩游戏的时候&#xff0c;PC是大家常用的平台&#xff0c;当然了&#xff0c;玩游戏的时候用键鼠的话&#xff0c;手感难免差点意思&#xff0c;还是要手柄才能获得更好的体验。我现在用的是雷柏V600S&#xff0c;这是一款支…...

Windows10 下测试 Intel SGX 功能

文章目录参考文献系统要求一、安装Open Enclave SDK 环境&#xff08;一&#xff09;什么是Open Enclave SDK&#xff08;二&#xff09;启动SGX功能方法一&#xff1a; BIOS启动方法二&#xff1a;软件方式启动&#xff08;三&#xff09;安装必要环境&#xff08;1&#xff0…...

Tina_Linux_功耗管理_开发指南

Tina Linux 功耗管理开发指南 1 概述 1.1 编写目的 简要介绍tina 平台功耗管理机制&#xff0c;为关注功耗的开发者&#xff0c;维护者和测试者提供使用和配置参考。 1.2 适用范围 表1-1: 适用产品列表产品名称内核版本休眠类型参与功耗管理的协处理器R328Linux-4.9NormalS…...

golang编译dll失败问题解决

执行go build -buildmodec-shared -o exportgo.dll exportgo.go报类似如下错误/usr/lib/gcc/x86_64-pc-msys/9.1.0/../../../../x86_64-pc-msys/bin/ld: 找不到 -lmingwex/usr/lib/gcc/x86_64-pc-msys/9.1.0/../../../../x86_64-pc-msys/bin/ld: 找不到 -lmingw32安装tdm gcc m…...

Convolutional Neural Networks for Sentence Classification

摘要 We report on a series of experiments with convolutional neural networks (CNN) trained on top of pre-trained word vectors for sentence-level classification tasks. We show that a simple CNN with little hyperparameter tuning and static vectors achieves e…...

基于SpringBoot的共享汽车管理系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…...

TCP三次握手

参考&#xff1a;4.1 TCP 三次握手与四次挥手面试题 | 小林coding TCP 头格式 我们先来看看 TCP 头的格式&#xff0c;标注颜色的表示与本文关联比较大的字段&#xff0c;其他字段不做详细阐述。 序列号&#xff1a;在建立连接时由计算机生成的随机数作为其初始值&#xff0c…...

未来土地利用模拟FLUS模型

未来土地利用模拟&#xff08;FutureLand-Use Simulation, FLUS&#xff09;模型1 模型简介1.1 基于ANN 的适宜性概率计算1.2 基于自适应惯性机制的元胞自动机1.3 模拟精度评价参考流域 径流变化是 自然因素和 人为因素共同作用的结果&#xff0c;其中人为因素最为直接的方式就…...

压力传感器MPX5700D/MPX5700GP/MPX5700AP产品概述、特征

MPX5700系列压阻式换能器是最先进的单片硅压力传感器&#xff0c;可广泛用于各种应用&#xff0c;特别是采用A/D输入微控制器或微处理器的应用。这一获得专利的单元件传感器集合了高级微加工技术、薄膜金属化、双极工艺&#xff0c;能够提供精确的、与所施加压力成正比的高电平…...

taobao.trades.sold.query( 根据收件人信息查询交易单号 )

&#xffe5;开放平台免费API必须用户授权聚石塔内调用 根据收件人信息查询交易单号。 公共参数 请求地址: HTTP地址 公共请求参数: 公共响应参数: 请求参数 请求示例 TaobaoClient client new DefaultTaobaoClient(url, appkey, secret); TradesSoldQueryRequest req new…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)

题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...

UE5 音效系统

一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类&#xff0c;将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix&#xff0c;将上述三个类翻入其中&#xff0c;通过它管理每个音乐…...

2.2.2 ASPICE的需求分析

ASPICE的需求分析是汽车软件开发过程中至关重要的一环&#xff0c;它涉及到对需求进行详细分析、验证和确认&#xff0c;以确保软件产品能够满足客户和用户的需求。在ASPICE中&#xff0c;需求分析的关键步骤包括&#xff1a; 需求细化&#xff1a;将从需求收集阶段获得的高层需…...

初探用uniapp写微信小程序遇到的问题及解决(vue3+ts)

零、关于开发思路 (一)拿到工作任务,先理清楚需求 1.逻辑部分 不放过原型里说的每一句话,有疑惑的部分该问产品/测试/之前的开发就问 2.页面部分(含国际化) 整体看过需要开发页面的原型后,分类一下哪些组件/样式可以复用,直接提取出来使用 (时间充分的前提下,不…...

篇章一 论坛系统——前置知识

目录 1.软件开发 1.1 软件的生命周期 1.2 面向对象 1.3 CS、BS架构 1.CS架构​编辑 2.BS架构 1.4 软件需求 1.需求分类 2.需求获取 1.5 需求分析 1. 工作内容 1.6 面向对象分析 1.OOA的任务 2.统一建模语言UML 3. 用例模型 3.1 用例图的元素 3.2 建立用例模型 …...