RocketMQ源码阅读-Message消息存储
RocketMQ源码阅读-Message消息存储
- 1. CommitLog的作用
- 2. CommitLog 存储消息
- 3. 时序图
- 4. 小结
在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。
1. CommitLog的作用
Store all metadata downtime for recovery, data protection reliability
翻译为:存储元数据,提供在停机时恢复和数据保护的能力
CommitLog是针对 MappedFileQueue 的封装使用。
其中有一个属性:
protected final MappedFileQueue mappedFileQueue;
MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。
他们三个的关系是:
他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N
CommitLog 存储在 MappedFile 有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
2. CommitLog 存储消息
CommitLog存储消息的入口方法为CommitLog#putMessage(…):
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}
可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}
主从数据同步的方法为CommitLog#handleHA:
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}
对于获取文件和文件刷新落盘先不做深究,后面再深入分析。
3. 时序图
4. 小结
本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:
- 获取映射文件
- 获取写锁
- 写文件
- 释放写锁
- 刷新文件到磁盘(分为同步刷新和异步刷新)
- 如果是Master节点,需要同步数据到从节点
Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。
相关文章:

RocketMQ源码阅读-Message消息存储
RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. C…...

《C语言学习》---郝斌版---笔记
简介 学习计算机,离不开C语言的学习,而C语言学习过程中的视频课教程,目前来说,如果郝斌老师的C语言排第二,没有人敢排第一 郝斌老师的C语言教程,通俗易懂,引人发思,特别适合新手入门…...

Python(32):字符串转换成列表或元组,列表转换成字典小例子
1、python 两个列表转换成字典 字符串转换成列表 列表转换成字典 column "ID,aes,sm4,sm4_a,email,phone,ssn,military,passport,intelssn,intelpassport,intelmilitary,intelganghui,inteltaitonei,credit_card_short,credit_card_long,job,sm4_cbc,sm4_a_cbc" …...

CentOS 7 安装私有平台OpenNebula
目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…...

(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
1. 背景介绍 在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动…...
基于STM32微控制器的四轮智能小车控制系统设计
标题:基于STM32微控制器的四轮智能小车控制系统设计与实现 摘要: 本文针对移动机器人领域的应用需求,详细介绍了基于STM32系列单片机(以STM32F103C8T6为例)为核心的四轮小车控制系统的设计和实现过程。该系统集成了电…...
JPA的复杂查询包括一对多多对一和多对多的查询
1. 多表关联查询和排序 假设我们有两个实体类:Customer和Order,它们之间是一对多的关系,即一个客户可以有多个订单。我们想要查询某个客户的所有订单,并按订单金额进行降序排序。 Entity Table(name "customers") pu…...

电脑文件mfc100u.dll丢失的解决方法分析,怎么修复mfc100u.dll靠谱
mfc100u.dll丢失了要怎么办?其实很多人都遇到过这样的电脑故障吧,说这个mfc100u.dll文件已经不见了,然后一些程序打不开了,那么这种情况我们要怎么解决呢?今天我们就来给大家详细的说说mfc100u.dll丢失的解决方法。 一…...

从DETR到Mask2former(2): 损失函数loss function
DETR的损失函数包括几个部分,如果只看论文或者代码,比较难理解,最好是可以打断点调试,对照着论文看。但是现在DETR模型都已经被集成进各种框架中,很难进入内部打断掉调试。与此同时,数据的label的前处理也比…...

Java21 + SpringBoot3集成WebSocket
文章目录 前言相关技术简介什么是WebSocketWebSocket的原理WebSocket与HTTP协议的关系WebSocket优点WebSocket应用场景 实现方式1. 添加maven依赖2. 添加WebSocket配置类,定义ServerEndpointExporter Bean3. 定义WebSocket Endpoint4. 前端创建WebSocket对象 总结 前…...
鲸鱼优化算法WOA改进预告
鲸鱼优化算法(Whale Optimization Algorithm,WOA)是一种基于自然界中鲸鱼群体行为的启发式优化算法。这个算法模拟了鲸鱼的觅食行为和社会行为,通过模拟这些行为来解决优化问题。 以下是鲸鱼优化算法的一些关键特点和步骤&#x…...

Nightingale 夜莺监控系统 - 告警篇(3)
Author:rab 官方文档:https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v6/usage/alert/alert-rule/ 目录 前言一、配置1.1 创建钉钉机器人1.2 n9e 创建通知用户1.3 n9e 创建团队(组)1.4 将通知用户添加团队1.…...
【LeetCode2696】删除子串后的字符串最小长度
1、题目描述 【题目链接】 标签:栈 、字符串、模拟 难度:简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作,在每一步操作中,你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…...

VMware安装CentOS7虚拟机
VMware 安装 获取 VMware 安装包 下载地址:链接:https://pan.baidu.com/s/1ELR5NZa7rO6YVplZ1IUigw?pwdplz3 提取码:plz3 包括:当然,也可以自己去别的地方下载,WMware 版本都差不多,现在用的比…...

Linux第22步_安装CH340驱动和串口终端软件MobaXterm
开发板输出信息通常是采用串口,而计算机通常是USB接口,为了让他们之间能够交换数据,我们通常采用USB转串口的转换器来实现。目前市场上的串口转换器大多是采用CH340芯片来实现的,因此我们需要在计算中安装一个CH340驱动程序&#…...

Elasticsearch 地理空间搜索 - 远超 OpenSearch
作者:来自 Elastic Nathan_Reese 2021 年,OpenSearch 和 OpenSearch Dashboards 开始作为 Elasticsearch 和 Kibana 的分支。 尽管 OpenSearch 和 OpenSearch Dashboards 具有相似的血统,但它们不提供相同的功能。 在分叉时,只能克…...

USB micro输入口中三个问题详解——差分信号、自恢复保险丝SMD1210P050TF、电容滤波
前言:本文对USB micro输入口中遇见的三个问题进行详解:差分信号、自恢复保险丝SMD1210P050TF、电容滤波 目录: 差分信号 自恢复保险丝SMD1210P050TF 电容滤波 如下图,USB为U-F-M5DD-Y-1型号(9个引脚,除…...

mysql原理--undo日志1
1.事务回滚的需求 我们说过 事务 需要保证 原子性 ,也就是事务中的操作要么全部完成,要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况,比如: (1). 事务执行过程中可能遇到各种错误,比如服务器本身的错误&…...

Zookeeper系列(一)集群搭建(非容器)
系列文章 Zookeeper系列(一)集群搭建(非容器) 目录 前言 下载 搭建 Data目录 Conf目录 集群复制和修改 启动 配置示例 测试 总结 前言 Zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的且容易出错的分…...

【高等数学之泰勒公式】
一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知,任一无穷小数均可以表示成用一系列数字的求和而得出的结果,我们称之为“无穷算法”。 那么同理我们想对任一曲线来…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...

LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...

MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...