【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。
分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。
来自官方源码example的一段发送代码:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
直接看看send方法,send 方法会设置一个默认的 timeout:3秒。默认使用 SYNC 模式,另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。
DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。
代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ有一个类 MQFaultStrategy,用来处理MQ错误,然后对 MQ Server 进行服务降级。
如果发送一条消息在550ms以内,那么就不用降级,如果550毫秒以外,就进行容错降级(熔断)30 秒,以此类推。
再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。
先找到broker的地址。尝试压缩大于4M 的消息(批量消息不压缩),然后执行各种钩子。
- Request对象(存放数据)
- Context 上下文对象(存放调用上下文)。
这里会设置一个消息生成时间,即bornTimestamp,后面使用消息轨迹的时候,可以查看。
默认情况下:如果采用SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。
MQClientAPIImpl的sendMessage这一层,会给命令对象设置一个CmdCode,叫SEND_MESSAGE,这个东西就是一个和Broker的契约,Broker会根据这个Code进行不同的策略。
Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()).handler(new ChannelInitializer() {public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});
使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。
XXCoder就是对Cmd对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为120s,超过这个,就会触发空闲事件。
- RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。
- 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。
看了RocketMQ中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}
其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}}
通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。
这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。
到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。
看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。
在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。
NettyRemotingServer是处理Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器,这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。
从processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor
一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO线程。
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
前后都是执行一些钩子,例如 ACL
RocketMQ会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。
中间是处理 Request请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor的sendMessage 是处理消息的主要逻辑。
消息存储引擎,这里我们看DefaultMessageStore的putMessage 实现。
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
由于RocketMQ写数据是PageCache里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。
if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。
result = mappedFile.appendMessage(msg, this.appendMessageCallback)
写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。
处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。
而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。
我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;
}
代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。
- 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
- 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。
如果没有新数据,则为 500ms 执行一次刷盘策略。
简单说下异步刷盘:
默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。
如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);
分享资源

获取以上资源请访问开源项目 点击跳转
相关文章:
【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体…...
关于vue首屏加载loading问题
注意:网上搜索出来的都是教你在index.html里面<div id"app"><div class"loading"></div>或者在app.vue Mounte生命周期函数控制app和loading的显示和隐藏,这里会有一个问题,就是js渲染页面需要时间,一…...
数据库性能测试实践:慢查询统计分析
01、慢查询 查看是否开启慢查询 mysql> show variables like %slow%’; 如图所示: 系统变量log_slow_admin_statements 表示是否将慢管理语句例如ANALYZE TABLE和ALTER TABLE等记入慢查询日志启用log_slow_extra系统变量 (从MySQL 8.0.14开始提供&a…...
windows wsl ssh 配置流程 Permission denied (publickey)
wsl ssh连接失败配置流程 1、wsl2 ifconfig的网络ip是虚拟的ip,所以采用wsl1 2、wsl1的安装教程。 3、openssh-server重装 sudo apt-get update sudo apt-get remove openssh-server sudo apt-get install openssh-server4、修改ssh配置文件 sudo vim /etc/ss…...
OpenCV(五):图像颜色空间转换
目录 1.图像颜色空间介绍 RGB 颜色空间 2.HSV 颜色空间 3.RGBA 颜色空间 2.图像数据类型间的互相转换convertTo() 3.不同颜色空间互相转换cvtColor() 4.Android JNI demo 1.图像颜色空间介绍 RGB 颜色空间 RGB 颜色空间是最常见的颜色表示方式之一,其中 R、…...
一图胜千言!数据可视化多维讲解(Python)
数据聚合、汇总和可视化是支撑数据分析领域的三大支柱。长久以来,数据可视化都是一个强有力的工具,被业界广泛使用,却受限于 2 维。在本文中,作者将探索一些有效的多维数据可视化策略(范围从 1 维到 6 维)。…...
Hbase相关总结
Hbase 1、Hbase的数据写入流程 由客户端发起写入数据的请求, 首先会先连接zookeeper 从zookeeper中获取到当前HMaster的信息,并与HMaster建立连接从HMaster中获取RegionServer列表信息 连接meta表对应的RegionServer地址, 从meta表获取当前要写入的表对应region被那个RegionS…...
C++ Primer Plus第二章编程练习答案
答案仅供参考,实际运行效果取决于运行平台和运行软件 1.编写一个C程序,它显示您的姓名和地址。 #include <iostream> using namespace std;int main() {cout << "My name is sakuraaa0908 C Primer Plus." << endl;cout &…...
Web后端开发(请求响应)上
请求响应的概述 浏览器(请求)<--------------------------(HTTP协议)---------------------->(响应)Web服务器 请求:获取请求数据 响应:设置响应数据 BS架构:浏览器/服务器架构模式。…...
LeetCode 338. Counting Bits【动态规划,位运算】简单
本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…...
解释 Git 的基本概念和使用方式。
Git 是一种分布式版本控制系统,它可以跟踪文件的修改历史、协调多个人员的工作、将分支合并到一起等。下面是 Git 的一些基本概念和使用方式。 - 仓库(Repository):存储代码、版本控制历史记录等的地方。 - 分支(Bran…...
计算机网络初识
目录 1、计算机网络背景 网络发展 认识 "协议" 2、网络协议初识 OSI七层模型 TCP/IP五层(或四层)模型 3、网络传输基本流程 网络传输流程图 数据包封装和分用 4、网络中的地址管理 认识IP地址 认识MAC地址 1、计算机网络背景 网络发展 在之前呢&…...
python 笔记(2)——文件、异常、面向对象、装饰器、json
目录 1、文件操作 1-1)打开文件的两种方式: 1-2)文件操作的简单示例: write方法: read方法: readline方法: readlines方法: 2、异常处理 2-1)不会中断程序的异常捕获和处理…...
Meta AI的Nougat能够将数学表达式从PDF文件转换为机器可读文本
大多数科学知识通常以可移植文档格式(PDF)的形式存储,这也是互联网上第二突出的数据格式。然而,从这种格式中提取信息或将其转换为机器可读的文本具有挑战性,尤其是在涉及数学表达式时。 为了解决这个问题,…...
【Python爬虫笔记】爬虫代理IP与访问控制
一、前言 在进行网络爬虫的开发过程中,有许多限制因素阻碍着爬虫程序的正常运行,其中最主要的一点就是反爬虫机制。为了防止爬虫程序在短时间内大量地请求同一个网站,网站管理者会使用一些方式进行限制。这时候,代理IP就是解决方…...
50、Spring WebFlux 的 自动配置 的一些介绍,与 Spring MVC 的一些对比
Spring WebFlux Spring WebFlux 简称 WebFlux ,是 spring5.0 新引入的一个框架。 SpringBoot 同样为 WebFlux 提供了自动配置。 Spring WebFlux 和 Spring MVC 是属于竞争关系,都是框架。在一个项目中两个也可以同时存在。 SpringMVC 是基于 Servlet A…...
【算法专题突破】双指针 - 和为s的两个数字(6)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:剑指 Offer 57. 和为s的两个数字 - 力扣(Leetcode) 这道题题目就一句话但是也是有信息可以提取的, 最重要的就是开始的那句话&#…...
Redis7入门概述
✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏…...
SQL sever命名规范
目录 一、标识符 二、表名(Table): 三、字段名(fields): 四、约束(Constraint): 五、索引(Index): 六、存储过程(Stored Proced…...
BCSP-玄子Share-Java框基础_工厂模式/代理模式
三、设计模式 3.1 设计模式简介 软件设计中的三十六计是人们在长期的软件开发中的经验总结是对某些特定问题的经过实践检验的特定解决方法被广泛运用在 Java 框架技术中 3.1.1 设计模式的优点 设计模式是可复用的面向对象软件的基础可以更加简单方便地复用成功的设计和体系…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
