RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
RocketMQ 存储基础回顾: 源码分析RocketMQ之CommitLog消息存储机制
本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构,同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储机制是所有的主题消息都存储在 CommitLog 文件中,也就是消息发送是完全的顺序 IO 操作,加上利用内存文件映射机制,极大的提供的 IO 性能。消息的全量信息存放在 commitlog 文件中,并且每条消息的长度是不一样的,消息的具体存储格式如下:
如果消费者直接基于commitlog 进行消费的话,简直就是一个恶梦,因为不同的主题的消息完全顺序的存储在 commitlog 文件中,根据主题去查询消息,不得不遍历整个 commitlog 文件,显然作为一款消息中间件这是绝不允许的。RocketMQ 的ConsumeQueue 文件就是来解决消息消费的。首先我们知道,一个主题,在 broker 上可以分成多个消费对列,默认为4个,也就是消费队列是基于主题+broker。那 ConsumeQueue 中当然不会再存储全量消息了,而是存储为定长(20字节,8字节commitlog 偏移量+4字节消息长度+8字节tag hashcode),消息消费时,首先根据 commitlog offset 去 commitlog 文件组(commitlog每个文件1G,填满了,另外创建一个文件),找到消息的起始位置,然后根据消息长度,读取整条消息。但问题又来了,如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,为了解决这个问题,rocketmq 的 index 文件又派上了用场。
接下来,本文重点关注 ConsumeQueue、Index 文件是如何基于 Commitlog 构建的,并且根据 ConsumeQueue、Index 文件如何查找消息。
根据 commitlog 文件生成 consumequeue、index 文件,主要同运作于两种情况:
1、运行中,发送端发送消息到 commitlog文件,此时如何及时传达到 consume文件、Index文件呢?
2、broker 启动时,检测 commitlog 文件与 consumequeue、index 文件中信息是否一致,如果不一致,需要根据 commitlog 文件重新恢复 consumequeue 文件和 index 文件。
1、commitlog、consumequeue、index 文件同步问题
RocketMQ 采用专门的线程来根据 comitlog offset 来将 commitlog 转发给ConsumeQueue、Index。其线程为DefaultMessageStore$ReputMessageService
1.1 核心属性
- private volatile long reputFromOffset = 0
reputFromOffset ,从 commitlog 开始拉取的初始偏移量。
1.2 run方法
每处理一次 doReput 方法,休眠1毫秒,基本上是马不停蹄的在转发 commitlog 中的内容到 consumequeue、index。
接下来重点查看 doReput 方法。
private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); // @1if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // @2 int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest); // @3 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}
代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。
先浏览一下 SelectMappedBufferResult
代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:
// 消息主题
private final String topic;
// 消息队列
private final int queueId;
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp;
//消息在消费队列的offset
private final long consumeQueueOffset;
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys;
// 是否成功
private final boolean success;
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey;
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;
代码@3:转发DistpachRequest。
根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。
相关文章:
RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
RocketMQ 存储基础回顾: 源码分析RocketMQ之CommitLog消息存储机制 本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构,同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储…...
基于Java的浏览器的设计与实现毕业设计
技术:Java等摘要:当今世界是一个以计算机网络为核心的信息时代,互联网为人们快速获取、发布和传递信息提供了便捷,而浏览器作为互联网上查找信息的重要工具,给人们提供了巨大而又宝贵的信息财富,受到了大家…...
手把手教你使用vite打包自己的js代码包并推送到npm
准备 要有npm账号,没有的铁子去npm官网注册一个,又不要钱。 使用vite创建项目 一行代码搞定 npm create vite viet-demo框架选择Others 模板选择library 选择ts 这样项目就创建完了 这个项目默认有一个函数,用来记录按钮的点击次数并…...
Tomcat源码分析-关于tomcat热加载的一些思考
在前面的文章中,我们分析了 tomcat 类加载器的相关源码,也了解了 tomcat 支持类的热加载,意味着 tomcat 要涉及类的重复卸装/装载过程,这个过程是很敏感的,一旦处理不当,可能会引起内存泄露 卸载类 我们知…...
DataWhale 大数据处理技术组队学习task4
五、分布式并行编程模型MapReduce 1. 概述 1.1 分布式并行编程 背景:摩尔定律已经开始逐渐失效,提升数据处理计算能力刻不容缓。传统的程序开发与分布式并行编程 传统的程序开发:以单指令、单数据流的方式顺序执行,虽然这种方式…...
Oracle 12C以上统计信息收集CDB、PDB执行时间不一致问题
文章目录前言一、统计信息窗口期调查二、时区调查三、查询alert记录四、why Database Statistic Collection Job is running two times inside a Maintenance Window?五、Default Scheduler Timezone Value In PDB$SEED Different Than CDB六、总结前言 在实际工作中发现一个…...
用Python获取弹幕的两种方式(一种简单但量少,另一量大管饱)
前言 弹幕可以给观众一种“实时互动”的错觉,虽然不同弹幕的发送时间有所区别,但是其只会在视频中特定的一个时间点出现,因此在相同时刻发送的弹幕基本上也具有相同的主题,在参与评论时就会有与其他观众同时评论的错觉。 在国内…...
算法训练营 day55 动态规划 买卖股票问题系列3
算法训练营 day55 动态规划 买卖股票问题系列3 最佳买卖股票时机含冷冻期 309. 最佳买卖股票时机含冷冻期 - 力扣(LeetCode) 给定一个整数数组prices,其中第 prices[i] 表示第 i 天的股票价格 。 设计一个算法计算出最大利润。在满足以下…...
电商共享购模式,消费增值返利,app开发
在当今以市场需求为主导的数字经济时代,消费者需求呈现出精细化管理和多元化的特性,目标市场日渐完善,另外在大数据技术迅速进步和运用的驱动下,总体行业的发展节奏感也在不断加速。因而,企业需要建立一套灵活多变的经…...
机房信息牌系统
产品特色: 无线低功耗安装简单,快速布置易于维护墨水屏显示,清晰,更环保信息后台推送,远程管理多模版样式随意制作多尺寸:4.2寸,7.5寸,10.2寸4.2寸7.5寸10.2寸标签特性:…...
金测评 手感更细腻的游戏手柄,双模加持兼容更出色,雷柏V600S上手
很多朋友周末都喜欢玩玩游戏放松一下,在家玩游戏的时候,PC是大家常用的平台,当然了,玩游戏的时候用键鼠的话,手感难免差点意思,还是要手柄才能获得更好的体验。我现在用的是雷柏V600S,这是一款支…...
Windows10 下测试 Intel SGX 功能
文章目录参考文献系统要求一、安装Open Enclave SDK 环境(一)什么是Open Enclave SDK(二)启动SGX功能方法一: BIOS启动方法二:软件方式启动(三)安装必要环境(1࿰…...
Tina_Linux_功耗管理_开发指南
Tina Linux 功耗管理开发指南 1 概述 1.1 编写目的 简要介绍tina 平台功耗管理机制,为关注功耗的开发者,维护者和测试者提供使用和配置参考。 1.2 适用范围 表1-1: 适用产品列表产品名称内核版本休眠类型参与功耗管理的协处理器R328Linux-4.9NormalS…...
golang编译dll失败问题解决
执行go build -buildmodec-shared -o exportgo.dll exportgo.go报类似如下错误/usr/lib/gcc/x86_64-pc-msys/9.1.0/../../../../x86_64-pc-msys/bin/ld: 找不到 -lmingwex/usr/lib/gcc/x86_64-pc-msys/9.1.0/../../../../x86_64-pc-msys/bin/ld: 找不到 -lmingw32安装tdm gcc m…...
Convolutional Neural Networks for Sentence Classification
摘要 We report on a series of experiments with convolutional neural networks (CNN) trained on top of pre-trained word vectors for sentence-level classification tasks. We show that a simple CNN with little hyperparameter tuning and static vectors achieves e…...
基于SpringBoot的共享汽车管理系统
文末获取源码 开发语言:Java 框架:springboot JDK版本:JDK1.8 服务器:tomcat7 数据库:mysql 5.7/8.0 数据库工具:Navicat11 开发软件:eclipse/myeclipse/idea Maven包:Maven3.3.9 浏…...
TCP三次握手
参考:4.1 TCP 三次握手与四次挥手面试题 | 小林coding TCP 头格式 我们先来看看 TCP 头的格式,标注颜色的表示与本文关联比较大的字段,其他字段不做详细阐述。 序列号:在建立连接时由计算机生成的随机数作为其初始值,…...
未来土地利用模拟FLUS模型
未来土地利用模拟(FutureLand-Use Simulation, FLUS)模型1 模型简介1.1 基于ANN 的适宜性概率计算1.2 基于自适应惯性机制的元胞自动机1.3 模拟精度评价参考流域 径流变化是 自然因素和 人为因素共同作用的结果,其中人为因素最为直接的方式就…...
压力传感器MPX5700D/MPX5700GP/MPX5700AP产品概述、特征
MPX5700系列压阻式换能器是最先进的单片硅压力传感器,可广泛用于各种应用,特别是采用A/D输入微控制器或微处理器的应用。这一获得专利的单元件传感器集合了高级微加工技术、薄膜金属化、双极工艺,能够提供精确的、与所施加压力成正比的高电平…...
taobao.trades.sold.query( 根据收件人信息查询交易单号 )
¥开放平台免费API必须用户授权聚石塔内调用 根据收件人信息查询交易单号。 公共参数 请求地址: HTTP地址 公共请求参数: 公共响应参数: 请求参数 请求示例 TaobaoClient client new DefaultTaobaoClient(url, appkey, secret); TradesSoldQueryRequest req new…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)
题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...
UE5 音效系统
一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类,将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix,将上述三个类翻入其中,通过它管理每个音乐…...
2.2.2 ASPICE的需求分析
ASPICE的需求分析是汽车软件开发过程中至关重要的一环,它涉及到对需求进行详细分析、验证和确认,以确保软件产品能够满足客户和用户的需求。在ASPICE中,需求分析的关键步骤包括: 需求细化:将从需求收集阶段获得的高层需…...
初探用uniapp写微信小程序遇到的问题及解决(vue3+ts)
零、关于开发思路 (一)拿到工作任务,先理清楚需求 1.逻辑部分 不放过原型里说的每一句话,有疑惑的部分该问产品/测试/之前的开发就问 2.页面部分(含国际化) 整体看过需要开发页面的原型后,分类一下哪些组件/样式可以复用,直接提取出来使用 (时间充分的前提下,不…...
篇章一 论坛系统——前置知识
目录 1.软件开发 1.1 软件的生命周期 1.2 面向对象 1.3 CS、BS架构 1.CS架构编辑 2.BS架构 1.4 软件需求 1.需求分类 2.需求获取 1.5 需求分析 1. 工作内容 1.6 面向对象分析 1.OOA的任务 2.统一建模语言UML 3. 用例模型 3.1 用例图的元素 3.2 建立用例模型 …...
