【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。
分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。
来自官方源码example的一段发送代码:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
直接看看send方法,send 方法会设置一个默认的 timeout:3秒。默认使用 SYNC 模式,另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。
DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。
代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ有一个类 MQFaultStrategy,用来处理MQ错误,然后对 MQ Server 进行服务降级。
如果发送一条消息在550ms以内,那么就不用降级,如果550毫秒以外,就进行容错降级(熔断)30 秒,以此类推。
再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。
先找到broker的地址。尝试压缩大于4M 的消息(批量消息不压缩),然后执行各种钩子。
- Request对象(存放数据)
- Context 上下文对象(存放调用上下文)。
这里会设置一个消息生成时间,即bornTimestamp,后面使用消息轨迹的时候,可以查看。
默认情况下:如果采用SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。
MQClientAPIImpl的sendMessage这一层,会给命令对象设置一个CmdCode,叫SEND_MESSAGE,这个东西就是一个和Broker的契约,Broker会根据这个Code进行不同的策略。
Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()).handler(new ChannelInitializer() {public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});
使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。
XXCoder就是对Cmd对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为120s,超过这个,就会触发空闲事件。
- RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。
- 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。
看了RocketMQ中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}
其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}}
通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。
这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。
到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。
看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。
在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。
NettyRemotingServer是处理Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器,这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。
从processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor
一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO线程。
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
前后都是执行一些钩子,例如 ACL
RocketMQ会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。
中间是处理 Request请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor的sendMessage 是处理消息的主要逻辑。
消息存储引擎,这里我们看DefaultMessageStore的putMessage 实现。
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
由于RocketMQ写数据是PageCache里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。
if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。
result = mappedFile.appendMessage(msg, this.appendMessageCallback)
写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。
处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。
而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。
我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;
}
代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。
- 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
- 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。
如果没有新数据,则为 500ms 执行一次刷盘策略。
简单说下异步刷盘:
默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。
如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);
分享资源

获取以上资源请访问开源项目 点击跳转
相关文章:
【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体…...
关于vue首屏加载loading问题
注意:网上搜索出来的都是教你在index.html里面<div id"app"><div class"loading"></div>或者在app.vue Mounte生命周期函数控制app和loading的显示和隐藏,这里会有一个问题,就是js渲染页面需要时间,一…...
数据库性能测试实践:慢查询统计分析
01、慢查询 查看是否开启慢查询 mysql> show variables like %slow%’; 如图所示: 系统变量log_slow_admin_statements 表示是否将慢管理语句例如ANALYZE TABLE和ALTER TABLE等记入慢查询日志启用log_slow_extra系统变量 (从MySQL 8.0.14开始提供&a…...
windows wsl ssh 配置流程 Permission denied (publickey)
wsl ssh连接失败配置流程 1、wsl2 ifconfig的网络ip是虚拟的ip,所以采用wsl1 2、wsl1的安装教程。 3、openssh-server重装 sudo apt-get update sudo apt-get remove openssh-server sudo apt-get install openssh-server4、修改ssh配置文件 sudo vim /etc/ss…...
OpenCV(五):图像颜色空间转换
目录 1.图像颜色空间介绍 RGB 颜色空间 2.HSV 颜色空间 3.RGBA 颜色空间 2.图像数据类型间的互相转换convertTo() 3.不同颜色空间互相转换cvtColor() 4.Android JNI demo 1.图像颜色空间介绍 RGB 颜色空间 RGB 颜色空间是最常见的颜色表示方式之一,其中 R、…...
一图胜千言!数据可视化多维讲解(Python)
数据聚合、汇总和可视化是支撑数据分析领域的三大支柱。长久以来,数据可视化都是一个强有力的工具,被业界广泛使用,却受限于 2 维。在本文中,作者将探索一些有效的多维数据可视化策略(范围从 1 维到 6 维)。…...
Hbase相关总结
Hbase 1、Hbase的数据写入流程 由客户端发起写入数据的请求, 首先会先连接zookeeper 从zookeeper中获取到当前HMaster的信息,并与HMaster建立连接从HMaster中获取RegionServer列表信息 连接meta表对应的RegionServer地址, 从meta表获取当前要写入的表对应region被那个RegionS…...
C++ Primer Plus第二章编程练习答案
答案仅供参考,实际运行效果取决于运行平台和运行软件 1.编写一个C程序,它显示您的姓名和地址。 #include <iostream> using namespace std;int main() {cout << "My name is sakuraaa0908 C Primer Plus." << endl;cout &…...
Web后端开发(请求响应)上
请求响应的概述 浏览器(请求)<--------------------------(HTTP协议)---------------------->(响应)Web服务器 请求:获取请求数据 响应:设置响应数据 BS架构:浏览器/服务器架构模式。…...
LeetCode 338. Counting Bits【动态规划,位运算】简单
本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…...
解释 Git 的基本概念和使用方式。
Git 是一种分布式版本控制系统,它可以跟踪文件的修改历史、协调多个人员的工作、将分支合并到一起等。下面是 Git 的一些基本概念和使用方式。 - 仓库(Repository):存储代码、版本控制历史记录等的地方。 - 分支(Bran…...
计算机网络初识
目录 1、计算机网络背景 网络发展 认识 "协议" 2、网络协议初识 OSI七层模型 TCP/IP五层(或四层)模型 3、网络传输基本流程 网络传输流程图 数据包封装和分用 4、网络中的地址管理 认识IP地址 认识MAC地址 1、计算机网络背景 网络发展 在之前呢&…...
python 笔记(2)——文件、异常、面向对象、装饰器、json
目录 1、文件操作 1-1)打开文件的两种方式: 1-2)文件操作的简单示例: write方法: read方法: readline方法: readlines方法: 2、异常处理 2-1)不会中断程序的异常捕获和处理…...
Meta AI的Nougat能够将数学表达式从PDF文件转换为机器可读文本
大多数科学知识通常以可移植文档格式(PDF)的形式存储,这也是互联网上第二突出的数据格式。然而,从这种格式中提取信息或将其转换为机器可读的文本具有挑战性,尤其是在涉及数学表达式时。 为了解决这个问题,…...
【Python爬虫笔记】爬虫代理IP与访问控制
一、前言 在进行网络爬虫的开发过程中,有许多限制因素阻碍着爬虫程序的正常运行,其中最主要的一点就是反爬虫机制。为了防止爬虫程序在短时间内大量地请求同一个网站,网站管理者会使用一些方式进行限制。这时候,代理IP就是解决方…...
50、Spring WebFlux 的 自动配置 的一些介绍,与 Spring MVC 的一些对比
Spring WebFlux Spring WebFlux 简称 WebFlux ,是 spring5.0 新引入的一个框架。 SpringBoot 同样为 WebFlux 提供了自动配置。 Spring WebFlux 和 Spring MVC 是属于竞争关系,都是框架。在一个项目中两个也可以同时存在。 SpringMVC 是基于 Servlet A…...
【算法专题突破】双指针 - 和为s的两个数字(6)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:剑指 Offer 57. 和为s的两个数字 - 力扣(Leetcode) 这道题题目就一句话但是也是有信息可以提取的, 最重要的就是开始的那句话&#…...
Redis7入门概述
✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏…...
SQL sever命名规范
目录 一、标识符 二、表名(Table): 三、字段名(fields): 四、约束(Constraint): 五、索引(Index): 六、存储过程(Stored Proced…...
BCSP-玄子Share-Java框基础_工厂模式/代理模式
三、设计模式 3.1 设计模式简介 软件设计中的三十六计是人们在长期的软件开发中的经验总结是对某些特定问题的经过实践检验的特定解决方法被广泛运用在 Java 框架技术中 3.1.1 设计模式的优点 设计模式是可复用的面向对象软件的基础可以更加简单方便地复用成功的设计和体系…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
