RocketMQ源码阅读-Message消息存储
RocketMQ源码阅读-Message消息存储
- 1. CommitLog的作用
- 2. CommitLog 存储消息
- 3. 时序图
- 4. 小结
在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。
1. CommitLog的作用
Store all metadata downtime for recovery, data protection reliability
翻译为:存储元数据,提供在停机时恢复和数据保护的能力
CommitLog是针对 MappedFileQueue 的封装使用。
其中有一个属性:
protected final MappedFileQueue mappedFileQueue;
MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。
他们三个的关系是:
他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N
CommitLog 存储在 MappedFile 有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
2. CommitLog 存储消息
CommitLog存储消息的入口方法为CommitLog#putMessage(…):
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}
可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}
主从数据同步的方法为CommitLog#handleHA:
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}
对于获取文件和文件刷新落盘先不做深究,后面再深入分析。
3. 时序图

4. 小结
本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:
- 获取映射文件
- 获取写锁
- 写文件
- 释放写锁
- 刷新文件到磁盘(分为同步刷新和异步刷新)
- 如果是Master节点,需要同步数据到从节点
Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。
相关文章:
RocketMQ源码阅读-Message消息存储
RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. C…...
《C语言学习》---郝斌版---笔记
简介 学习计算机,离不开C语言的学习,而C语言学习过程中的视频课教程,目前来说,如果郝斌老师的C语言排第二,没有人敢排第一 郝斌老师的C语言教程,通俗易懂,引人发思,特别适合新手入门…...
Python(32):字符串转换成列表或元组,列表转换成字典小例子
1、python 两个列表转换成字典 字符串转换成列表 列表转换成字典 column "ID,aes,sm4,sm4_a,email,phone,ssn,military,passport,intelssn,intelpassport,intelmilitary,intelganghui,inteltaitonei,credit_card_short,credit_card_long,job,sm4_cbc,sm4_a_cbc" …...
CentOS 7 安装私有平台OpenNebula
目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…...
(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
1. 背景介绍 在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动…...
基于STM32微控制器的四轮智能小车控制系统设计
标题:基于STM32微控制器的四轮智能小车控制系统设计与实现 摘要: 本文针对移动机器人领域的应用需求,详细介绍了基于STM32系列单片机(以STM32F103C8T6为例)为核心的四轮小车控制系统的设计和实现过程。该系统集成了电…...
JPA的复杂查询包括一对多多对一和多对多的查询
1. 多表关联查询和排序 假设我们有两个实体类:Customer和Order,它们之间是一对多的关系,即一个客户可以有多个订单。我们想要查询某个客户的所有订单,并按订单金额进行降序排序。 Entity Table(name "customers") pu…...
电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱
mfc100u.dll丢失了要怎么办?其实很多人都遇到过这样的电脑故障吧,说这个mfc100u.dll文件已经不见了,然后一些程序打不开了,那么这种情况我们要怎么解决呢?今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…...
从DETR到Mask2former(2): 损失函数loss function
DETR的损失函数包括几个部分,如果只看论文或者代码,比较难理解,最好是可以打断点调试,对照着论文看。但是现在DETR模型都已经被集成进各种框架中,很难进入内部打断掉调试。与此同时,数据的label的前处理也比…...
Java21 + SpringBoot3集成WebSocket
文章目录 前言相关技术简介什么是WebSocketWebSocket的原理WebSocket与HTTP协议的关系WebSocket优点WebSocket应用场景 实现方式1. 添加maven依赖2. 添加WebSocket配置类,定义ServerEndpointExporter Bean3. 定义WebSocket Endpoint4. 前端创建WebSocket对象 总结 前…...
鲸鱼优化算法WOA改进预告
鲸鱼优化算法(Whale Optimization Algorithm,WOA)是一种基于自然界中鲸鱼群体行为的启发式优化算法。这个算法模拟了鲸鱼的觅食行为和社会行为,通过模拟这些行为来解决优化问题。 以下是鲸鱼优化算法的一些关键特点和步骤&#x…...
Nightingale 夜莺监控系统 - 告警篇(3)
Author:rab 官方文档:https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队(组)1.4 将通知用户添加团队1.…...
【LeetCode2696】删除子串后的字符串最小长度
1、题目描述 【题目链接】 标签:栈 、字符串、模拟 难度:简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作,在每一步操作中,你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…...
VMware安装CentOS7虚拟机
VMware 安装 获取 VMware 安装包 下载地址:链接:https://pan.baidu.com/s/1ELR5NZa7rO6YVplZ1IUigw?pwdplz3 提取码:plz3 包括:当然,也可以自己去别的地方下载,WMware 版本都差不多,现在用的比…...
Linux第22步_安装CH340驱动和串口终端软件MobaXterm
开发板输出信息通常是采用串口,而计算机通常是USB接口,为了让他们之间能够交换数据,我们通常采用USB转串口的转换器来实现。目前市场上的串口转换器大多是采用CH340芯片来实现的,因此我们需要在计算中安装一个CH340驱动程序&#…...
Elasticsearch 地理空间搜索 - 远超 OpenSearch
作者:来自 Elastic Nathan_Reese 2021 年,OpenSearch 和 OpenSearch Dashboards 开始作为 Elasticsearch 和 Kibana 的分支。 尽管 OpenSearch 和 OpenSearch Dashboards 具有相似的血统,但它们不提供相同的功能。 在分叉时,只能克…...
USB micro输入口中三个问题详解——差分信号、自恢复保险丝SMD1210P050TF、电容滤波
前言:本文对USB micro输入口中遇见的三个问题进行详解:差分信号、自恢复保险丝SMD1210P050TF、电容滤波 目录: 差分信号 自恢复保险丝SMD1210P050TF 电容滤波 如下图,USB为U-F-M5DD-Y-1型号(9个引脚,除…...
mysql原理--undo日志1
1.事务回滚的需求 我们说过 事务 需要保证 原子性 ,也就是事务中的操作要么全部完成,要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况,比如: (1). 事务执行过程中可能遇到各种错误,比如服务器本身的错误&…...
Zookeeper系列(一)集群搭建(非容器)
系列文章 Zookeeper系列(一)集群搭建(非容器) 目录 前言 下载 搭建 Data目录 Conf目录 集群复制和修改 启动 配置示例 测试 总结 前言 Zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的且容易出错的分…...
【高等数学之泰勒公式】
一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知,任一无穷小数均可以表示成用一系列数字的求和而得出的结果,我们称之为“无穷算法”。 那么同理我们想对任一曲线来…...
iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向
在人工智能技术呈指数级发展的当下,大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性,吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型,成为释放其巨大潜力的关键所在&…...
goreplay
1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具,可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长,测试它所需的工作量也会呈指数级增长。GoRepl…...
qt+vs Generated File下的moc_和ui_文件丢失导致 error LNK2001
qt 5.9.7 vs2013 qt add-in 2.3.2 起因是添加一个新的控件类,直接把源文件拖进VS的项目里,然后VS卡住十秒,然后编译就报一堆 error LNK2001 一看项目的Generated Files下的moc_和ui_文件丢失了一部分,导致编译的时候找不到了。因…...
python打卡第47天
昨天代码中注意力热图的部分顺移至今天 知识点回顾: 热力图 作业:对比不同卷积层热图可视化的结果 def visualize_attention_map(model, test_loader, device, class_names, num_samples3):"""可视化模型的注意力热力图,展示模…...
__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined.
这个警告表明您在使用Vue的esm-bundler构建版本时,未明确定义编译时特性标志。以下是详细解释和解决方案: 问题原因: 该标志是Vue 3.4引入的编译时特性标志,用于控制生产环境下SSR水合不匹配错误的详细报告1使用esm-bundler…...
构建Docker镜像的Dockerfile文件详解
文章目录 前言Dockerfile 案例docker build1. 基本构建2. 指定 Dockerfile 路径3. 设置构建时变量4. 不使用缓存5. 删除中间容器6. 拉取最新基础镜像7. 静默输出完整示例 docker runDockerFile 入门syntax指定构造器FROM基础镜像RUN命令注释COPY复制ENV设置环境变量EXPOSE暴露端…...
开疆智能Ethernet/IP转Modbus网关连接鸣志步进电机驱动器配置案例
在工业自动化控制系统中,常常会遇到不同品牌和通信协议的设备需要协同工作的情况。本案例中,客户现场采用了 罗克韦尔PLC,但需要控制的变频器仅支持 ModbusRTU 协议。为了实现PLC 对变频器的有效控制与监控,引入了开疆智能Etherne…...
