当前位置: 首页 > news >正文

RocketMQ Broker消息处理流程及部分源码解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

相关文章:

RocketMQ Broker消息处理流程及部分源码解析

&#x1f34a; Java学习&#xff1a;Java从入门到精通总结 &#x1f34a; 深入浅出RocketMQ设计思想&#xff1a;深入浅出RocketMQ设计思想 &#x1f34a; 绝对不一样的职场干货&#xff1a;大厂最佳实践经验指南 &#x1f4c6; 最近更新&#xff1a;2023年2月10日 &#x…...

Java面试题:Java集合框架

文章目录一、Java集合框架二、Java集合特性三、各集合类的使用ArrayListLinkedListHashSetHashSet源码解析对源码进行总结HashSet可同步HashSet的使用HashMap四、Iterator迭代器五、遍历集合元素的若干方式参考文章&#xff1a;Hash详解参考文章&#xff1a;深入浅出学Java——…...

时间之间的比较与计算相差年、月、日、小时、分钟、毫秒、纳秒以及判断闰年--LocalDateTime

如何把String/Date转成LocalDateTime参考String、Date与LocalDate、LocalTime、LocalDateTime之间互转 String、Date、LocalDateTime、Calendar与时间戳之间互相转化参考String、Date、LocalDateTime、Calendar与时间戳之间互相转化 比较方法介绍 isBefore(ChronoLocalDateT…...

PyTorch学习笔记:nn.L1Loss——L1损失

PyTorch学习笔记&#xff1a;nn.L1Loss——L1损失 torch.nn.L1Loss(size_averageNone, reduceNone, reductionmean)功能&#xff1a;创建一个绝对值误差损失函数&#xff0c;即L1损失&#xff1a; l(x,y)L{l1,…,lN}T,ln∣xn−yn∣l(x,y)L\{l_1,\dots,l_N\}^T,l_n|x_n-y_n| l(…...

Java程序设计-ssm企业财务管理系统设计与实现

摘要系统设计系统实现开发环境&#xff1a;摘要 对于企业集来说,财务管理的地位很重要。随着计算机和网络在企业中的广泛应用&#xff0c;企业发展速度在不断加快&#xff0c;在这种市场竞争冲击下企业财务管理系统必须优先发展&#xff0c;这样才能保证在竞争中处于优势地位。…...

疑难杂症篇(二十一)--Ubuntu18.04安装usb-cam过程出现的问题

对Ubuntu18.04{\rm Ubuntu 18.04}Ubuntu18.04环境下的ROS{\rm ROS}ROS的melodic{\rm melodic}melodic版本安装usb−cam{\rm usb-cam}usb−cam过程出现的两个常见问题提出解决方案。 1.问题1&#xff1a;usb-cam功能包编译时出现"未定义的引用"的问题 问题描述&#…...

npm-npm i XX --save 和--save-dev

之前使用npm i XX --save 和--save-dev 没太在意&#xff0c;就想记录一下&#xff0c;查到一篇比较全的(链接&#xff1a;NPM install -save 和 -save-dev 傻傻分不清)&#xff0c;直接看好了&#xff0c;哈哈~ # 安装模块到项目目录下 npm install moduleName # -g 的意思是…...

可重构或可调谐微波滤波器技术

电子可重构&#xff0c;或者说电调微波滤波器由于其在改善现在及未来微波系统容量中不断提高的重要性而正吸引着人们越来越多的关注来对其进行研究和开发。例如&#xff0c;崭露头脚的超宽带&#xff08;UWB&#xff09;技术要求使用很宽的无线电频谱。然而&#xff0c;作为资源…...

医院智能化解决方案-门(急)诊、医技、智能化项目解决方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a;篇幅有限&#xff0c;无法完全展…...

判断元素是否在可视区域

前言 在日常开发中&#xff0c;我们经常需要判断目标元素是否在视窗之内或者和视窗的距离小于一个值&#xff08;例如 100 px&#xff09;&#xff0c;从而实现一些常用的功能&#xff0c;例如&#xff1a; 图片的懒加载列表的无限滚动计算广告元素的曝光情况可点击链接的预加…...

告别传统繁杂的采购合同管理 打造企业自动化采购管理模式

随着企业竞争日趋激烈&#xff0c;采购成本压力剧增&#xff0c;企业对于采购合同管理更加严格&#xff0c;从而把控物资成本。对于任何一家企业采购来说&#xff0c;规范化合同的全面管理&#xff0c;是采购活动中重要的一个环节。 但在如今&#xff0c;依旧有很多企业采购合…...

【prism】路由事件映射到Command命令

在之前的一篇文章中&#xff0c;我介绍了普通的自定义事件&#xff1a; 【wpf】自定义事件总结&#xff08;Action&#xff0c; EventHandler&#xff09;_code bean的博客-CSDN博客_wpf action可以说通过Action和EventHandle&#xff0c;自定义事件是相当的方便简单了。https…...

面向对象的基本概念和方法

面向对象的开发方法在近几十年见得以广泛应用&#xff0c;我们常见的Java语言就是一种典型的面向对象的开发语言。然而&#xff0c;面向对象的概念较为复杂&#xff0c;知识点也很细碎&#xff0c;本文整理了面向对象的基本概念和方法&#xff0c;供大家参考。为了便于读者理解…...

数据可视化大屏百度地图绘制行政区域标注实战案例解析(个性化地图、标注、视频、控件、定位、检索)

百度地图开发系列目录 数据可视化大屏应急管理综合指挥调度系统完整案例详解&#xff08;PHP-API、Echarts、百度地图&#xff09;数据可视化大屏百度地图API开发&#xff1a;停车场分布标注和检索静态版百度地图高级开发&#xff1a;map.getDistance计算多点之间的距离并输入…...

1.面向对象和类的关系?2.什么是Promise、3.Promise和async、await的关系

面向对象:面向对象是一种编程思想&#xff08;oop&#xff09;。(Js里面所有的东西都可以看做对象&#xff0c;Js它是基于原型的面向对象语言&#xff0c;采用原型的方式来构造对象)很多个具有相同属性和行为的对象就可以抽象为类&#xff0c;对象是类的一个实例。JavaScript在…...

【程序化天空盒】过程记录01:日月 天空渐变 大气散射

1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮&#xff0c;太阳和月亮实现的道理是一样的&#xff0c;只不过是月亮比太阳多了一个需要控制月牙程度&#xff08;or添加贴图&#xff09;的细节~ 1.1 Sun 太阳的话很简单&#xff0c;直接在shader里实现一个太阳跟随平行光旋…...

无线通信中的轨道角动量

目录 一. 前言 二. 如何传输 三. 如何产生 3.1 螺旋结构器件 &#xff08;1&#xff09;螺旋相位板 &#xff08;2&#xff09;螺旋抛物面天线 3.2 超表面 3.3 天线阵列 3.3.1 相控阵 3.3.2 时控阵 四. 如何识别 一. 前言 轨道角动量&#xff1a;Orbital Angular M…...

以后更新功能,再也不用App发版了!智能小程序将为开发者最大化减负

在 IoT 时代&#xff0c;越来越多的企业意识到打造自有 App 对于品牌的重要性。作为智能设备不可或缺的控制终端&#xff0c;App 具备连接用户、完善服务、精细化运营用户的独特优势&#xff0c;可帮助企业大大提升品牌竞争力。 为了帮助品牌企业打造更具个性化、差异化的智能…...

C++之类模板全特化和偏特化

类模板类模板是通用类的描述&#xff0c;使用任意类型&#xff08;泛型&#xff09;来描述类的定义。使用类模板的时候&#xff0c;指定具体的数据类型&#xff0c;让编译器生成该类型的类定义。注意&#xff1a;函数模板中可以不指定具体数据类型&#xff0c;让编译器自动推到…...

Python 手写数字识别 MNIST数据集下载失败

目录 一、MNIST数据集下载失败 1 失败的解决办法&#xff08;经验教训&#xff09;&#xff1a; 2 亲测有效的解决方法&#xff1a; 一、MNIST数据集下载失败 场景复现&#xff1a;想要pytorchMINIST数据集来实现手写数字识别&#xff0c;首先就是进行MNIST数据集的下载&am…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

【决胜公务员考试】求职OMG——见面课测验1

2025最新版&#xff01;&#xff01;&#xff01;6.8截至答题&#xff0c;大家注意呀&#xff01; 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:&#xff08; B &#xff09; A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Webpack性能优化:构建速度与体积优化策略

一、构建速度优化 1、​​升级Webpack和Node.js​​ ​​优化效果​​&#xff1a;Webpack 4比Webpack 3构建时间降低60%-98%。​​原因​​&#xff1a; V8引擎优化&#xff08;for of替代forEach、Map/Set替代Object&#xff09;。默认使用更快的md4哈希算法。AST直接从Loa…...

Python Einops库:深度学习中的张量操作革命

Einops&#xff08;爱因斯坦操作库&#xff09;就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库&#xff0c;用类似自然语言的表达式替代了晦涩的API调用&#xff0c;彻底改变了深度学习工程…...