【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。
分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。
来自官方源码example的一段发送代码:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
直接看看send方法,send 方法会设置一个默认的 timeout:3秒。默认使用 SYNC 模式,另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。
DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。
代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ有一个类 MQFaultStrategy,用来处理MQ错误,然后对 MQ Server 进行服务降级。
如果发送一条消息在550ms以内,那么就不用降级,如果550毫秒以外,就进行容错降级(熔断)30 秒,以此类推。
再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。
先找到broker的地址。尝试压缩大于4M 的消息(批量消息不压缩),然后执行各种钩子。
- Request对象(存放数据)
- Context 上下文对象(存放调用上下文)。
这里会设置一个消息生成时间,即bornTimestamp,后面使用消息轨迹的时候,可以查看。
默认情况下:如果采用SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。
MQClientAPIImpl的sendMessage这一层,会给命令对象设置一个CmdCode,叫SEND_MESSAGE,这个东西就是一个和Broker的契约,Broker会根据这个Code进行不同的策略。
Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()).handler(new ChannelInitializer() {public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});
使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。
XXCoder就是对Cmd对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为120s,超过这个,就会触发空闲事件。
- RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。
- 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。
看了RocketMQ中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}
其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque = cmd.getOpaque();final ResponseFuture responseFuture = responseTable.get(opaque);if (responseFuture != null) {responseFuture.setResponseCommand(cmd);responseTable.remove(opaque);if (responseFuture.getInvokeCallback() != null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());}}
通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。
这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。
到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。
看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。
在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。
NettyRemotingServer是处理Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器,这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。
从processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor
一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO线程。
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
前后都是执行一些钩子,例如 ACL
RocketMQ会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。
中间是处理 Request请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor的sendMessage 是处理消息的主要逻辑。
消息存储引擎,这里我们看DefaultMessageStore的putMessage 实现。
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
由于RocketMQ写数据是PageCache里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。
if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。
result = mappedFile.appendMessage(msg, this.appendMessageCallback)
写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。
处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。
而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。
我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;
}
代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。
- 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
- 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。
如果没有新数据,则为 500ms 执行一次刷盘策略。
简单说下异步刷盘:
默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。
如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);
分享资源
获取以上资源请访问开源项目 点击跳转
相关文章:

【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体…...

关于vue首屏加载loading问题
注意:网上搜索出来的都是教你在index.html里面<div id"app"><div class"loading"></div>或者在app.vue Mounte生命周期函数控制app和loading的显示和隐藏,这里会有一个问题,就是js渲染页面需要时间,一…...

数据库性能测试实践:慢查询统计分析
01、慢查询 查看是否开启慢查询 mysql> show variables like %slow%’; 如图所示: 系统变量log_slow_admin_statements 表示是否将慢管理语句例如ANALYZE TABLE和ALTER TABLE等记入慢查询日志启用log_slow_extra系统变量 (从MySQL 8.0.14开始提供&a…...

windows wsl ssh 配置流程 Permission denied (publickey)
wsl ssh连接失败配置流程 1、wsl2 ifconfig的网络ip是虚拟的ip,所以采用wsl1 2、wsl1的安装教程。 3、openssh-server重装 sudo apt-get update sudo apt-get remove openssh-server sudo apt-get install openssh-server4、修改ssh配置文件 sudo vim /etc/ss…...

OpenCV(五):图像颜色空间转换
目录 1.图像颜色空间介绍 RGB 颜色空间 2.HSV 颜色空间 3.RGBA 颜色空间 2.图像数据类型间的互相转换convertTo() 3.不同颜色空间互相转换cvtColor() 4.Android JNI demo 1.图像颜色空间介绍 RGB 颜色空间 RGB 颜色空间是最常见的颜色表示方式之一,其中 R、…...

一图胜千言!数据可视化多维讲解(Python)
数据聚合、汇总和可视化是支撑数据分析领域的三大支柱。长久以来,数据可视化都是一个强有力的工具,被业界广泛使用,却受限于 2 维。在本文中,作者将探索一些有效的多维数据可视化策略(范围从 1 维到 6 维)。…...

Hbase相关总结
Hbase 1、Hbase的数据写入流程 由客户端发起写入数据的请求, 首先会先连接zookeeper 从zookeeper中获取到当前HMaster的信息,并与HMaster建立连接从HMaster中获取RegionServer列表信息 连接meta表对应的RegionServer地址, 从meta表获取当前要写入的表对应region被那个RegionS…...

C++ Primer Plus第二章编程练习答案
答案仅供参考,实际运行效果取决于运行平台和运行软件 1.编写一个C程序,它显示您的姓名和地址。 #include <iostream> using namespace std;int main() {cout << "My name is sakuraaa0908 C Primer Plus." << endl;cout &…...

Web后端开发(请求响应)上
请求响应的概述 浏览器(请求)<--------------------------(HTTP协议)---------------------->(响应)Web服务器 请求:获取请求数据 响应:设置响应数据 BS架构:浏览器/服务器架构模式。…...

LeetCode 338. Counting Bits【动态规划,位运算】简单
本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…...

解释 Git 的基本概念和使用方式。
Git 是一种分布式版本控制系统,它可以跟踪文件的修改历史、协调多个人员的工作、将分支合并到一起等。下面是 Git 的一些基本概念和使用方式。 - 仓库(Repository):存储代码、版本控制历史记录等的地方。 - 分支(Bran…...

计算机网络初识
目录 1、计算机网络背景 网络发展 认识 "协议" 2、网络协议初识 OSI七层模型 TCP/IP五层(或四层)模型 3、网络传输基本流程 网络传输流程图 数据包封装和分用 4、网络中的地址管理 认识IP地址 认识MAC地址 1、计算机网络背景 网络发展 在之前呢&…...

python 笔记(2)——文件、异常、面向对象、装饰器、json
目录 1、文件操作 1-1)打开文件的两种方式: 1-2)文件操作的简单示例: write方法: read方法: readline方法: readlines方法: 2、异常处理 2-1)不会中断程序的异常捕获和处理…...

Meta AI的Nougat能够将数学表达式从PDF文件转换为机器可读文本
大多数科学知识通常以可移植文档格式(PDF)的形式存储,这也是互联网上第二突出的数据格式。然而,从这种格式中提取信息或将其转换为机器可读的文本具有挑战性,尤其是在涉及数学表达式时。 为了解决这个问题,…...

【Python爬虫笔记】爬虫代理IP与访问控制
一、前言 在进行网络爬虫的开发过程中,有许多限制因素阻碍着爬虫程序的正常运行,其中最主要的一点就是反爬虫机制。为了防止爬虫程序在短时间内大量地请求同一个网站,网站管理者会使用一些方式进行限制。这时候,代理IP就是解决方…...

50、Spring WebFlux 的 自动配置 的一些介绍,与 Spring MVC 的一些对比
Spring WebFlux Spring WebFlux 简称 WebFlux ,是 spring5.0 新引入的一个框架。 SpringBoot 同样为 WebFlux 提供了自动配置。 Spring WebFlux 和 Spring MVC 是属于竞争关系,都是框架。在一个项目中两个也可以同时存在。 SpringMVC 是基于 Servlet A…...

【算法专题突破】双指针 - 和为s的两个数字(6)
目录 1. 题目解析 2. 算法原理 3. 代码编写 写在最后: 1. 题目解析 题目链接:剑指 Offer 57. 和为s的两个数字 - 力扣(Leetcode) 这道题题目就一句话但是也是有信息可以提取的, 最重要的就是开始的那句话&#…...

Redis7入门概述
✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏…...

SQL sever命名规范
目录 一、标识符 二、表名(Table): 三、字段名(fields): 四、约束(Constraint): 五、索引(Index): 六、存储过程(Stored Proced…...

BCSP-玄子Share-Java框基础_工厂模式/代理模式
三、设计模式 3.1 设计模式简介 软件设计中的三十六计是人们在长期的软件开发中的经验总结是对某些特定问题的经过实践检验的特定解决方法被广泛运用在 Java 框架技术中 3.1.1 设计模式的优点 设计模式是可复用的面向对象软件的基础可以更加简单方便地复用成功的设计和体系…...

【数据结构】2015统考真题 6
题目描述 【2015统考真题】求下面的带权图的最小(代价)生成树时,可能是Kruskal算法第2次选中但不是Prim算法(从v4开始)第2次选中的边是(C) A. (V1, V3) B. (V1, V4) C. (V2, V3) D. (V3, V4) …...

HTML <track> 标签
实例 播放带有字幕的视频: <video width="320" height="240" controls="controls"><source src="forrest_gump.mp4" type="video/mp4" /><source src="forrest_gump.ogg" type="video/ogg…...

php中识别url被篡改并阻止访问的实现方式是什么
在 PHP 中,可以通过多种方式来识别并阻止 URL 被篡改的访问。以下是一些常见的方法: 基本身份验证:使用 PHP 的 $_SERVER[PHP_AUTH_USER] 和 $_SERVER[PHP_AUTH_PW] 变量可以实施基本的 HTTP 身份验证。在访问受保护的页面之前,可…...

c++ 学习 之 const,constexpr,volatile
前言 const、constexpr 和 volatile 是 C 中用于修饰变量和类型的关键字 正文 它们分别用于不同的用途: const(常量): const 用于声明常量,表示变量的值不能被修改。 它可以应用于变量、指针、引用、成员函数以及类…...

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决
问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下: Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this…...

C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)
using System; using System.Threading; using System.Windows.Forms; using UtilForm.Util;namespace UtilForm {// 线程同步,事件触发,信号量,互斥锁,共享内存,消息队列public partial class frmUIThread : Form{ Sy…...

OpenWrt系统开发笔记
openWrt英文官网: https://openwrt.org/ 中文官网: http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多 一个是lede,地址为:https://github.com/coolsnowwolf/lede 另一个为OpenWrt的官方源码&#…...

实战 - Restful APi 格式规范
文章目录 1. 特征2. 优点3. 动作1. GET 获取资源2. POST 创建资源3. PUT 整体替换4. PATCH 部分替换5. DELETE 删除资源 4. 示例 RESTful是一种API的设计风格,他和GraphQL ,JSON-RPC,WebService类似,用于定义在CS、BS架构下暴露服…...

《Linux从练气到飞升》No.21 Linux简单实现一个shell
🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的…...

【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路
简介: 随着数字化转型的飞速发展,企业和组织对快速开发和交付高质量应用的需求越来越迫切。低代码开发平台作为一种创新的解决方案,极大地简化了应用程序的开发过程。在这一领域,iVX低代码平台作为领先的创业公司,正在…...