当前位置: 首页 > news >正文

RocketMQ源码阅读-Message消息存储

RocketMQ源码阅读-Message消息存储

  • 1. CommitLog的作用
  • 2. CommitLog 存储消息
  • 3. 时序图
  • 4. 小结

在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。

1. CommitLog的作用

Store all metadata downtime for recovery, data protection reliability

翻译为:存储元数据,提供在停机时恢复和数据保护的能力

CommitLog是针对 MappedFileQueue 的封装使用。

其中有一个属性:

protected final MappedFileQueue mappedFileQueue;

MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。

他们三个的关系是:

他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N

CommitLog 存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。
  2. BLANK :文件不足以存储消息时的空白占位。

2. CommitLog 存储消息

CommitLog存储消息的入口方法为CommitLog#putMessage(…):

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}

可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}

主从数据同步的方法为CommitLog#handleHA:

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}

对于获取文件和文件刷新落盘先不做深究,后面再深入分析。

3. 时序图

在这里插入图片描述

4. 小结

本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:

  • 获取映射文件
  • 获取写锁
  • 写文件
  • 释放写锁
  • 刷新文件到磁盘(分为同步刷新和异步刷新)
  • 如果是Master节点,需要同步数据到从节点

Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。

相关文章:

RocketMQ源码阅读-Message消息存储

RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中&#xff0c;分析到Broker接收到消息&#xff0c;最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. C…...

《C语言学习》---郝斌版---笔记

简介 学习计算机&#xff0c;离不开C语言的学习&#xff0c;而C语言学习过程中的视频课教程&#xff0c;目前来说&#xff0c;如果郝斌老师的C语言排第二&#xff0c;没有人敢排第一 郝斌老师的C语言教程&#xff0c;通俗易懂&#xff0c;引人发思&#xff0c;特别适合新手入门…...

Python(32):字符串转换成列表或元组,列表转换成字典小例子

1、python 两个列表转换成字典 字符串转换成列表 列表转换成字典 column "ID,aes,sm4,sm4_a,email,phone,ssn,military,passport,intelssn,intelpassport,intelmilitary,intelganghui,inteltaitonei,credit_card_short,credit_card_long,job,sm4_cbc,sm4_a_cbc" …...

CentOS 7 安装私有平台OpenNebula

目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…...

(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

1. 背景介绍 在先前的博客文章中&#xff0c;我们已经搭建了一个基于SRS的流媒体服务器。现在&#xff0c;我们希望通过Web接口来控制这个服务器的行为&#xff0c;特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下&#xff0c;动态地启动…...

基于STM32微控制器的四轮智能小车控制系统设计

标题&#xff1a;基于STM32微控制器的四轮智能小车控制系统设计与实现 摘要&#xff1a; 本文针对移动机器人领域的应用需求&#xff0c;详细介绍了基于STM32系列单片机&#xff08;以STM32F103C8T6为例&#xff09;为核心的四轮小车控制系统的设计和实现过程。该系统集成了电…...

JPA的复杂查询包括一对多多对一和多对多的查询

1. 多表关联查询和排序 假设我们有两个实体类&#xff1a;Customer和Order&#xff0c;它们之间是一对多的关系&#xff0c;即一个客户可以有多个订单。我们想要查询某个客户的所有订单&#xff0c;并按订单金额进行降序排序。 Entity Table(name "customers") pu…...

电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱

mfc100u.dll丢失了要怎么办&#xff1f;其实很多人都遇到过这样的电脑故障吧&#xff0c;说这个mfc100u.dll文件已经不见了&#xff0c;然后一些程序打不开了&#xff0c;那么这种情况我们要怎么解决呢&#xff1f;今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…...

从DETR到Mask2former(2): 损失函数loss function

DETR的损失函数包括几个部分&#xff0c;如果只看论文或者代码&#xff0c;比较难理解&#xff0c;最好是可以打断点调试&#xff0c;对照着论文看。但是现在DETR模型都已经被集成进各种框架中&#xff0c;很难进入内部打断掉调试。与此同时&#xff0c;数据的label的前处理也比…...

Java21 + SpringBoot3集成WebSocket

文章目录 前言相关技术简介什么是WebSocketWebSocket的原理WebSocket与HTTP协议的关系WebSocket优点WebSocket应用场景 实现方式1. 添加maven依赖2. 添加WebSocket配置类&#xff0c;定义ServerEndpointExporter Bean3. 定义WebSocket Endpoint4. 前端创建WebSocket对象 总结 前…...

鲸鱼优化算法WOA改进预告

鲸鱼优化算法&#xff08;Whale Optimization Algorithm&#xff0c;WOA&#xff09;是一种基于自然界中鲸鱼群体行为的启发式优化算法。这个算法模拟了鲸鱼的觅食行为和社会行为&#xff0c;通过模拟这些行为来解决优化问题。 以下是鲸鱼优化算法的一些关键特点和步骤&#x…...

Nightingale 夜莺监控系统 - 告警篇(3)

Author&#xff1a;rab 官方文档&#xff1a;https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队&#xff08;组&#xff09;1.4 将通知用户添加团队1.…...

【LeetCode2696】删除子串后的字符串最小长度

1、题目描述 【题目链接】 标签&#xff1a;栈 、字符串、模拟 难度&#xff1a;简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作&#xff0c;在每一步操作中&#xff0c;你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…...

VMware安装CentOS7虚拟机

VMware 安装 获取 VMware 安装包 下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1ELR5NZa7rO6YVplZ1IUigw?pwdplz3 提取码&#xff1a;plz3 包括&#xff1a;当然&#xff0c;也可以自己去别的地方下载&#xff0c;WMware 版本都差不多&#xff0c;现在用的比…...

Linux第22步_安装CH340驱动和串口终端软件MobaXterm

开发板输出信息通常是采用串口&#xff0c;而计算机通常是USB接口&#xff0c;为了让他们之间能够交换数据&#xff0c;我们通常采用USB转串口的转换器来实现。目前市场上的串口转换器大多是采用CH340芯片来实现的&#xff0c;因此我们需要在计算中安装一个CH340驱动程序&#…...

Elasticsearch 地理空间搜索 - 远超 OpenSearch

作者&#xff1a;来自 Elastic Nathan_Reese 2021 年&#xff0c;OpenSearch 和 OpenSearch Dashboards 开始作为 Elasticsearch 和 Kibana 的分支。 尽管 OpenSearch 和 OpenSearch Dashboards 具有相似的血统&#xff0c;但它们不提供相同的功能。 在分叉时&#xff0c;只能克…...

USB micro输入口中三个问题详解——差分信号、自恢复保险丝SMD1210P050TF、电容滤波

前言&#xff1a;本文对USB micro输入口中遇见的三个问题进行详解&#xff1a;差分信号、自恢复保险丝SMD1210P050TF、电容滤波 目录&#xff1a; 差分信号 自恢复保险丝SMD1210P050TF 电容滤波 如下图&#xff0c;USB为U-F-M5DD-Y-1型号&#xff08;9个引脚&#xff0c;除…...

mysql原理--undo日志1

1.事务回滚的需求 我们说过 事务 需要保证 原子性 &#xff0c;也就是事务中的操作要么全部完成&#xff0c;要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况&#xff0c;比如&#xff1a; (1). 事务执行过程中可能遇到各种错误&#xff0c;比如服务器本身的错误&…...

Zookeeper系列(一)集群搭建(非容器)

系列文章 Zookeeper系列&#xff08;一&#xff09;集群搭建&#xff08;非容器&#xff09; 目录 前言 下载 搭建 Data目录 Conf目录 集群复制和修改 启动 配置示例 测试 总结 前言 Zookeeper是一个开源的分布式协调服务&#xff0c;其设计目标是将那些复杂的且容易出错的分…...

【高等数学之泰勒公式】

一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知&#xff0c;任一无穷小数均可以表示成用一系列数字的求和而得出的结果&#xff0c;我们称之为“无穷算法”。 那么同理我们想对任一曲线来…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向

在人工智能技术呈指数级发展的当下&#xff0c;大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性&#xff0c;吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型&#xff0c;成为释放其巨大潜力的关键所在&…...

goreplay

1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具&#xff0c;可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长&#xff0c;测试它所需的工作量也会呈指数级增长。GoRepl…...

qt+vs Generated File下的moc_和ui_文件丢失导致 error LNK2001

qt 5.9.7 vs2013 qt add-in 2.3.2 起因是添加一个新的控件类&#xff0c;直接把源文件拖进VS的项目里&#xff0c;然后VS卡住十秒&#xff0c;然后编译就报一堆 error LNK2001 一看项目的Generated Files下的moc_和ui_文件丢失了一部分&#xff0c;导致编译的时候找不到了。因…...

python打卡第47天

昨天代码中注意力热图的部分顺移至今天 知识点回顾&#xff1a; 热力图 作业&#xff1a;对比不同卷积层热图可视化的结果 def visualize_attention_map(model, test_loader, device, class_names, num_samples3):"""可视化模型的注意力热力图&#xff0c;展示模…...

__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined.

这个警告表明您在使用Vue的esm-bundler构建版本时&#xff0c;未明确定义编译时特性标志。以下是详细解释和解决方案&#xff1a; ‌问题原因‌&#xff1a; 该标志是Vue 3.4引入的编译时特性标志&#xff0c;用于控制生产环境下SSR水合不匹配错误的详细报告1使用esm-bundler…...

构建Docker镜像的Dockerfile文件详解

文章目录 前言Dockerfile 案例docker build1. 基本构建2. 指定 Dockerfile 路径3. 设置构建时变量4. 不使用缓存5. 删除中间容器6. 拉取最新基础镜像7. 静默输出完整示例 docker runDockerFile 入门syntax指定构造器FROM基础镜像RUN命令注释COPY复制ENV设置环境变量EXPOSE暴露端…...

开疆智能Ethernet/IP转Modbus网关连接鸣志步进电机驱动器配置案例

在工业自动化控制系统中&#xff0c;常常会遇到不同品牌和通信协议的设备需要协同工作的情况。本案例中&#xff0c;客户现场采用了 罗克韦尔PLC&#xff0c;但需要控制的变频器仅支持 ModbusRTU 协议。为了实现PLC 对变频器的有效控制与监控&#xff0c;引入了开疆智能Etherne…...