当前位置: 首页 > news >正文

RocketMQ源码阅读-Message消息存储

RocketMQ源码阅读-Message消息存储

  • 1. CommitLog的作用
  • 2. CommitLog 存储消息
  • 3. 时序图
  • 4. 小结

在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。

1. CommitLog的作用

Store all metadata downtime for recovery, data protection reliability

翻译为:存储元数据,提供在停机时恢复和数据保护的能力

CommitLog是针对 MappedFileQueue 的封装使用。

其中有一个属性:

protected final MappedFileQueue mappedFileQueue;

MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。

他们三个的关系是:

他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N

CommitLog 存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。
  2. BLANK :文件不足以存储消息时的空白占位。

2. CommitLog 存储消息

CommitLog存储消息的入口方法为CommitLog#putMessage(…):

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}

可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}

主从数据同步的方法为CommitLog#handleHA:

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}

对于获取文件和文件刷新落盘先不做深究,后面再深入分析。

3. 时序图

在这里插入图片描述

4. 小结

本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:

  • 获取映射文件
  • 获取写锁
  • 写文件
  • 释放写锁
  • 刷新文件到磁盘(分为同步刷新和异步刷新)
  • 如果是Master节点,需要同步数据到从节点

Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。

相关文章:

RocketMQ源码阅读-Message消息存储

RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中&#xff0c;分析到Broker接收到消息&#xff0c;最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. C…...

《C语言学习》---郝斌版---笔记

简介 学习计算机&#xff0c;离不开C语言的学习&#xff0c;而C语言学习过程中的视频课教程&#xff0c;目前来说&#xff0c;如果郝斌老师的C语言排第二&#xff0c;没有人敢排第一 郝斌老师的C语言教程&#xff0c;通俗易懂&#xff0c;引人发思&#xff0c;特别适合新手入门…...

Python(32):字符串转换成列表或元组,列表转换成字典小例子

1、python 两个列表转换成字典 字符串转换成列表 列表转换成字典 column "ID,aes,sm4,sm4_a,email,phone,ssn,military,passport,intelssn,intelpassport,intelmilitary,intelganghui,inteltaitonei,credit_card_short,credit_card_long,job,sm4_cbc,sm4_a_cbc" …...

CentOS 7 安装私有平台OpenNebula

目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…...

(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

1. 背景介绍 在先前的博客文章中&#xff0c;我们已经搭建了一个基于SRS的流媒体服务器。现在&#xff0c;我们希望通过Web接口来控制这个服务器的行为&#xff0c;特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下&#xff0c;动态地启动…...

基于STM32微控制器的四轮智能小车控制系统设计

标题&#xff1a;基于STM32微控制器的四轮智能小车控制系统设计与实现 摘要&#xff1a; 本文针对移动机器人领域的应用需求&#xff0c;详细介绍了基于STM32系列单片机&#xff08;以STM32F103C8T6为例&#xff09;为核心的四轮小车控制系统的设计和实现过程。该系统集成了电…...

JPA的复杂查询包括一对多多对一和多对多的查询

1. 多表关联查询和排序 假设我们有两个实体类&#xff1a;Customer和Order&#xff0c;它们之间是一对多的关系&#xff0c;即一个客户可以有多个订单。我们想要查询某个客户的所有订单&#xff0c;并按订单金额进行降序排序。 Entity Table(name "customers") pu…...

电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱

mfc100u.dll丢失了要怎么办&#xff1f;其实很多人都遇到过这样的电脑故障吧&#xff0c;说这个mfc100u.dll文件已经不见了&#xff0c;然后一些程序打不开了&#xff0c;那么这种情况我们要怎么解决呢&#xff1f;今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…...

从DETR到Mask2former(2): 损失函数loss function

DETR的损失函数包括几个部分&#xff0c;如果只看论文或者代码&#xff0c;比较难理解&#xff0c;最好是可以打断点调试&#xff0c;对照着论文看。但是现在DETR模型都已经被集成进各种框架中&#xff0c;很难进入内部打断掉调试。与此同时&#xff0c;数据的label的前处理也比…...

Java21 + SpringBoot3集成WebSocket

文章目录 前言相关技术简介什么是WebSocketWebSocket的原理WebSocket与HTTP协议的关系WebSocket优点WebSocket应用场景 实现方式1. 添加maven依赖2. 添加WebSocket配置类&#xff0c;定义ServerEndpointExporter Bean3. 定义WebSocket Endpoint4. 前端创建WebSocket对象 总结 前…...

鲸鱼优化算法WOA改进预告

鲸鱼优化算法&#xff08;Whale Optimization Algorithm&#xff0c;WOA&#xff09;是一种基于自然界中鲸鱼群体行为的启发式优化算法。这个算法模拟了鲸鱼的觅食行为和社会行为&#xff0c;通过模拟这些行为来解决优化问题。 以下是鲸鱼优化算法的一些关键特点和步骤&#x…...

Nightingale 夜莺监控系统 - 告警篇(3)

Author&#xff1a;rab 官方文档&#xff1a;https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队&#xff08;组&#xff09;1.4 将通知用户添加团队1.…...

【LeetCode2696】删除子串后的字符串最小长度

1、题目描述 【题目链接】 标签&#xff1a;栈 、字符串、模拟 难度&#xff1a;简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作&#xff0c;在每一步操作中&#xff0c;你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…...

VMware安装CentOS7虚拟机

VMware 安装 获取 VMware 安装包 下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1ELR5NZa7rO6YVplZ1IUigw?pwdplz3 提取码&#xff1a;plz3 包括&#xff1a;当然&#xff0c;也可以自己去别的地方下载&#xff0c;WMware 版本都差不多&#xff0c;现在用的比…...

Linux第22步_安装CH340驱动和串口终端软件MobaXterm

开发板输出信息通常是采用串口&#xff0c;而计算机通常是USB接口&#xff0c;为了让他们之间能够交换数据&#xff0c;我们通常采用USB转串口的转换器来实现。目前市场上的串口转换器大多是采用CH340芯片来实现的&#xff0c;因此我们需要在计算中安装一个CH340驱动程序&#…...

Elasticsearch 地理空间搜索 - 远超 OpenSearch

作者&#xff1a;来自 Elastic Nathan_Reese 2021 年&#xff0c;OpenSearch 和 OpenSearch Dashboards 开始作为 Elasticsearch 和 Kibana 的分支。 尽管 OpenSearch 和 OpenSearch Dashboards 具有相似的血统&#xff0c;但它们不提供相同的功能。 在分叉时&#xff0c;只能克…...

USB micro输入口中三个问题详解——差分信号、自恢复保险丝SMD1210P050TF、电容滤波

前言&#xff1a;本文对USB micro输入口中遇见的三个问题进行详解&#xff1a;差分信号、自恢复保险丝SMD1210P050TF、电容滤波 目录&#xff1a; 差分信号 自恢复保险丝SMD1210P050TF 电容滤波 如下图&#xff0c;USB为U-F-M5DD-Y-1型号&#xff08;9个引脚&#xff0c;除…...

mysql原理--undo日志1

1.事务回滚的需求 我们说过 事务 需要保证 原子性 &#xff0c;也就是事务中的操作要么全部完成&#xff0c;要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况&#xff0c;比如&#xff1a; (1). 事务执行过程中可能遇到各种错误&#xff0c;比如服务器本身的错误&…...

Zookeeper系列(一)集群搭建(非容器)

系列文章 Zookeeper系列&#xff08;一&#xff09;集群搭建&#xff08;非容器&#xff09; 目录 前言 下载 搭建 Data目录 Conf目录 集群复制和修改 启动 配置示例 测试 总结 前言 Zookeeper是一个开源的分布式协调服务&#xff0c;其设计目标是将那些复杂的且容易出错的分…...

【高等数学之泰勒公式】

一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知&#xff0c;任一无穷小数均可以表示成用一系列数字的求和而得出的结果&#xff0c;我们称之为“无穷算法”。 那么同理我们想对任一曲线来…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)

本期内容并不是很难&#xff0c;相信大家会学的很愉快&#xff0c;当然对于有后端基础的朋友来说&#xff0c;本期内容更加容易了解&#xff0c;当然没有基础的也别担心&#xff0c;本期内容会详细解释有关内容 本期用到的软件&#xff1a;yakit&#xff08;因为经过之前好多期…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...