当前位置: 首页 > news >正文

RocketMQ Broker消息处理流程及部分源码解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

相关文章:

RocketMQ Broker消息处理流程及部分源码解析

&#x1f34a; Java学习&#xff1a;Java从入门到精通总结 &#x1f34a; 深入浅出RocketMQ设计思想&#xff1a;深入浅出RocketMQ设计思想 &#x1f34a; 绝对不一样的职场干货&#xff1a;大厂最佳实践经验指南 &#x1f4c6; 最近更新&#xff1a;2023年2月10日 &#x…...

Java面试题:Java集合框架

文章目录一、Java集合框架二、Java集合特性三、各集合类的使用ArrayListLinkedListHashSetHashSet源码解析对源码进行总结HashSet可同步HashSet的使用HashMap四、Iterator迭代器五、遍历集合元素的若干方式参考文章&#xff1a;Hash详解参考文章&#xff1a;深入浅出学Java——…...

时间之间的比较与计算相差年、月、日、小时、分钟、毫秒、纳秒以及判断闰年--LocalDateTime

如何把String/Date转成LocalDateTime参考String、Date与LocalDate、LocalTime、LocalDateTime之间互转 String、Date、LocalDateTime、Calendar与时间戳之间互相转化参考String、Date、LocalDateTime、Calendar与时间戳之间互相转化 比较方法介绍 isBefore(ChronoLocalDateT…...

PyTorch学习笔记:nn.L1Loss——L1损失

PyTorch学习笔记&#xff1a;nn.L1Loss——L1损失 torch.nn.L1Loss(size_averageNone, reduceNone, reductionmean)功能&#xff1a;创建一个绝对值误差损失函数&#xff0c;即L1损失&#xff1a; l(x,y)L{l1,…,lN}T,ln∣xn−yn∣l(x,y)L\{l_1,\dots,l_N\}^T,l_n|x_n-y_n| l(…...

Java程序设计-ssm企业财务管理系统设计与实现

摘要系统设计系统实现开发环境&#xff1a;摘要 对于企业集来说,财务管理的地位很重要。随着计算机和网络在企业中的广泛应用&#xff0c;企业发展速度在不断加快&#xff0c;在这种市场竞争冲击下企业财务管理系统必须优先发展&#xff0c;这样才能保证在竞争中处于优势地位。…...

疑难杂症篇(二十一)--Ubuntu18.04安装usb-cam过程出现的问题

对Ubuntu18.04{\rm Ubuntu 18.04}Ubuntu18.04环境下的ROS{\rm ROS}ROS的melodic{\rm melodic}melodic版本安装usb−cam{\rm usb-cam}usb−cam过程出现的两个常见问题提出解决方案。 1.问题1&#xff1a;usb-cam功能包编译时出现"未定义的引用"的问题 问题描述&#…...

npm-npm i XX --save 和--save-dev

之前使用npm i XX --save 和--save-dev 没太在意&#xff0c;就想记录一下&#xff0c;查到一篇比较全的(链接&#xff1a;NPM install -save 和 -save-dev 傻傻分不清)&#xff0c;直接看好了&#xff0c;哈哈~ # 安装模块到项目目录下 npm install moduleName # -g 的意思是…...

可重构或可调谐微波滤波器技术

电子可重构&#xff0c;或者说电调微波滤波器由于其在改善现在及未来微波系统容量中不断提高的重要性而正吸引着人们越来越多的关注来对其进行研究和开发。例如&#xff0c;崭露头脚的超宽带&#xff08;UWB&#xff09;技术要求使用很宽的无线电频谱。然而&#xff0c;作为资源…...

医院智能化解决方案-门(急)诊、医技、智能化项目解决方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a;篇幅有限&#xff0c;无法完全展…...

判断元素是否在可视区域

前言 在日常开发中&#xff0c;我们经常需要判断目标元素是否在视窗之内或者和视窗的距离小于一个值&#xff08;例如 100 px&#xff09;&#xff0c;从而实现一些常用的功能&#xff0c;例如&#xff1a; 图片的懒加载列表的无限滚动计算广告元素的曝光情况可点击链接的预加…...

告别传统繁杂的采购合同管理 打造企业自动化采购管理模式

随着企业竞争日趋激烈&#xff0c;采购成本压力剧增&#xff0c;企业对于采购合同管理更加严格&#xff0c;从而把控物资成本。对于任何一家企业采购来说&#xff0c;规范化合同的全面管理&#xff0c;是采购活动中重要的一个环节。 但在如今&#xff0c;依旧有很多企业采购合…...

【prism】路由事件映射到Command命令

在之前的一篇文章中&#xff0c;我介绍了普通的自定义事件&#xff1a; 【wpf】自定义事件总结&#xff08;Action&#xff0c; EventHandler&#xff09;_code bean的博客-CSDN博客_wpf action可以说通过Action和EventHandle&#xff0c;自定义事件是相当的方便简单了。https…...

面向对象的基本概念和方法

面向对象的开发方法在近几十年见得以广泛应用&#xff0c;我们常见的Java语言就是一种典型的面向对象的开发语言。然而&#xff0c;面向对象的概念较为复杂&#xff0c;知识点也很细碎&#xff0c;本文整理了面向对象的基本概念和方法&#xff0c;供大家参考。为了便于读者理解…...

数据可视化大屏百度地图绘制行政区域标注实战案例解析(个性化地图、标注、视频、控件、定位、检索)

百度地图开发系列目录 数据可视化大屏应急管理综合指挥调度系统完整案例详解&#xff08;PHP-API、Echarts、百度地图&#xff09;数据可视化大屏百度地图API开发&#xff1a;停车场分布标注和检索静态版百度地图高级开发&#xff1a;map.getDistance计算多点之间的距离并输入…...

1.面向对象和类的关系?2.什么是Promise、3.Promise和async、await的关系

面向对象:面向对象是一种编程思想&#xff08;oop&#xff09;。(Js里面所有的东西都可以看做对象&#xff0c;Js它是基于原型的面向对象语言&#xff0c;采用原型的方式来构造对象)很多个具有相同属性和行为的对象就可以抽象为类&#xff0c;对象是类的一个实例。JavaScript在…...

【程序化天空盒】过程记录01:日月 天空渐变 大气散射

1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮&#xff0c;太阳和月亮实现的道理是一样的&#xff0c;只不过是月亮比太阳多了一个需要控制月牙程度&#xff08;or添加贴图&#xff09;的细节~ 1.1 Sun 太阳的话很简单&#xff0c;直接在shader里实现一个太阳跟随平行光旋…...

无线通信中的轨道角动量

目录 一. 前言 二. 如何传输 三. 如何产生 3.1 螺旋结构器件 &#xff08;1&#xff09;螺旋相位板 &#xff08;2&#xff09;螺旋抛物面天线 3.2 超表面 3.3 天线阵列 3.3.1 相控阵 3.3.2 时控阵 四. 如何识别 一. 前言 轨道角动量&#xff1a;Orbital Angular M…...

以后更新功能,再也不用App发版了!智能小程序将为开发者最大化减负

在 IoT 时代&#xff0c;越来越多的企业意识到打造自有 App 对于品牌的重要性。作为智能设备不可或缺的控制终端&#xff0c;App 具备连接用户、完善服务、精细化运营用户的独特优势&#xff0c;可帮助企业大大提升品牌竞争力。 为了帮助品牌企业打造更具个性化、差异化的智能…...

C++之类模板全特化和偏特化

类模板类模板是通用类的描述&#xff0c;使用任意类型&#xff08;泛型&#xff09;来描述类的定义。使用类模板的时候&#xff0c;指定具体的数据类型&#xff0c;让编译器生成该类型的类定义。注意&#xff1a;函数模板中可以不指定具体数据类型&#xff0c;让编译器自动推到…...

Python 手写数字识别 MNIST数据集下载失败

目录 一、MNIST数据集下载失败 1 失败的解决办法&#xff08;经验教训&#xff09;&#xff1a; 2 亲测有效的解决方法&#xff1a; 一、MNIST数据集下载失败 场景复现&#xff1a;想要pytorchMINIST数据集来实现手写数字识别&#xff0c;首先就是进行MNIST数据集的下载&am…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展&#xff0c;AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术&#xff0c;在客户服务、营销推广、信息查询等领域发挥着越来越重要…...