当前位置: 首页 > news >正文

RocketMQ Broker消息处理流程及部分源码解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年2月10日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 消息处理流程
  • 消息存储目录结构
  • `SendMessage`源码
    • `processRequest`
    • `sendMessage`

消息处理流程

  1. SendMessageProcessor处理类接收到消息
  2. DefaultMessageStore实例将消息变成IndexFileConsumeQueueCommitLog对象
  3. 上述对象转成内存映射对象后进行落盘
    在这里插入图片描述

消息存储目录结构

RocketMQ的文件存储在store文件夹里,里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。

文件夹:

  • commitlog存储写入到commitLog的消息内容
  • config存储配置信息
  • consumerqueue存储消费者队列信息
  • index存储消息队列的索引文件

文件:

  • abort标记RocketMQ是否正常退出
  • checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerOffset.json
│   ├── delayOffset.json
│   ├── subscriptionGroup.json
│   ├── topics.json
├── consumequeue
│   ├── TopicA
│   ├── TopicB
│   ├── TopicC
├── index
│   ├── 00000000000000000000
│   ├── 00000000001073741824

RocketMQ内有专门对应磁盘上存储文件的封装类:

  1. CommitLog:对应commitlog文件
  2. ConsumeQueue:对应consumerqueue文件
  3. IndexFile:对应index文件
  4. MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘
  5. MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFile
  6. MappedFileBuff:堆外内存

SendMessage源码

SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息

processRequest

主要流程已在代码片注释中给出:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}

根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。

该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。


sendMessage

消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

相关文章:

RocketMQ Broker消息处理流程及部分源码解析

&#x1f34a; Java学习&#xff1a;Java从入门到精通总结 &#x1f34a; 深入浅出RocketMQ设计思想&#xff1a;深入浅出RocketMQ设计思想 &#x1f34a; 绝对不一样的职场干货&#xff1a;大厂最佳实践经验指南 &#x1f4c6; 最近更新&#xff1a;2023年2月10日 &#x…...

Java面试题:Java集合框架

文章目录一、Java集合框架二、Java集合特性三、各集合类的使用ArrayListLinkedListHashSetHashSet源码解析对源码进行总结HashSet可同步HashSet的使用HashMap四、Iterator迭代器五、遍历集合元素的若干方式参考文章&#xff1a;Hash详解参考文章&#xff1a;深入浅出学Java——…...

时间之间的比较与计算相差年、月、日、小时、分钟、毫秒、纳秒以及判断闰年--LocalDateTime

如何把String/Date转成LocalDateTime参考String、Date与LocalDate、LocalTime、LocalDateTime之间互转 String、Date、LocalDateTime、Calendar与时间戳之间互相转化参考String、Date、LocalDateTime、Calendar与时间戳之间互相转化 比较方法介绍 isBefore(ChronoLocalDateT…...

PyTorch学习笔记:nn.L1Loss——L1损失

PyTorch学习笔记&#xff1a;nn.L1Loss——L1损失 torch.nn.L1Loss(size_averageNone, reduceNone, reductionmean)功能&#xff1a;创建一个绝对值误差损失函数&#xff0c;即L1损失&#xff1a; l(x,y)L{l1,…,lN}T,ln∣xn−yn∣l(x,y)L\{l_1,\dots,l_N\}^T,l_n|x_n-y_n| l(…...

Java程序设计-ssm企业财务管理系统设计与实现

摘要系统设计系统实现开发环境&#xff1a;摘要 对于企业集来说,财务管理的地位很重要。随着计算机和网络在企业中的广泛应用&#xff0c;企业发展速度在不断加快&#xff0c;在这种市场竞争冲击下企业财务管理系统必须优先发展&#xff0c;这样才能保证在竞争中处于优势地位。…...

疑难杂症篇(二十一)--Ubuntu18.04安装usb-cam过程出现的问题

对Ubuntu18.04{\rm Ubuntu 18.04}Ubuntu18.04环境下的ROS{\rm ROS}ROS的melodic{\rm melodic}melodic版本安装usb−cam{\rm usb-cam}usb−cam过程出现的两个常见问题提出解决方案。 1.问题1&#xff1a;usb-cam功能包编译时出现"未定义的引用"的问题 问题描述&#…...

npm-npm i XX --save 和--save-dev

之前使用npm i XX --save 和--save-dev 没太在意&#xff0c;就想记录一下&#xff0c;查到一篇比较全的(链接&#xff1a;NPM install -save 和 -save-dev 傻傻分不清)&#xff0c;直接看好了&#xff0c;哈哈~ # 安装模块到项目目录下 npm install moduleName # -g 的意思是…...

可重构或可调谐微波滤波器技术

电子可重构&#xff0c;或者说电调微波滤波器由于其在改善现在及未来微波系统容量中不断提高的重要性而正吸引着人们越来越多的关注来对其进行研究和开发。例如&#xff0c;崭露头脚的超宽带&#xff08;UWB&#xff09;技术要求使用很宽的无线电频谱。然而&#xff0c;作为资源…...

医院智能化解决方案-门(急)诊、医技、智能化项目解决方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a;篇幅有限&#xff0c;无法完全展…...

判断元素是否在可视区域

前言 在日常开发中&#xff0c;我们经常需要判断目标元素是否在视窗之内或者和视窗的距离小于一个值&#xff08;例如 100 px&#xff09;&#xff0c;从而实现一些常用的功能&#xff0c;例如&#xff1a; 图片的懒加载列表的无限滚动计算广告元素的曝光情况可点击链接的预加…...

告别传统繁杂的采购合同管理 打造企业自动化采购管理模式

随着企业竞争日趋激烈&#xff0c;采购成本压力剧增&#xff0c;企业对于采购合同管理更加严格&#xff0c;从而把控物资成本。对于任何一家企业采购来说&#xff0c;规范化合同的全面管理&#xff0c;是采购活动中重要的一个环节。 但在如今&#xff0c;依旧有很多企业采购合…...

【prism】路由事件映射到Command命令

在之前的一篇文章中&#xff0c;我介绍了普通的自定义事件&#xff1a; 【wpf】自定义事件总结&#xff08;Action&#xff0c; EventHandler&#xff09;_code bean的博客-CSDN博客_wpf action可以说通过Action和EventHandle&#xff0c;自定义事件是相当的方便简单了。https…...

面向对象的基本概念和方法

面向对象的开发方法在近几十年见得以广泛应用&#xff0c;我们常见的Java语言就是一种典型的面向对象的开发语言。然而&#xff0c;面向对象的概念较为复杂&#xff0c;知识点也很细碎&#xff0c;本文整理了面向对象的基本概念和方法&#xff0c;供大家参考。为了便于读者理解…...

数据可视化大屏百度地图绘制行政区域标注实战案例解析(个性化地图、标注、视频、控件、定位、检索)

百度地图开发系列目录 数据可视化大屏应急管理综合指挥调度系统完整案例详解&#xff08;PHP-API、Echarts、百度地图&#xff09;数据可视化大屏百度地图API开发&#xff1a;停车场分布标注和检索静态版百度地图高级开发&#xff1a;map.getDistance计算多点之间的距离并输入…...

1.面向对象和类的关系?2.什么是Promise、3.Promise和async、await的关系

面向对象:面向对象是一种编程思想&#xff08;oop&#xff09;。(Js里面所有的东西都可以看做对象&#xff0c;Js它是基于原型的面向对象语言&#xff0c;采用原型的方式来构造对象)很多个具有相同属性和行为的对象就可以抽象为类&#xff0c;对象是类的一个实例。JavaScript在…...

【程序化天空盒】过程记录01:日月 天空渐变 大气散射

1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮&#xff0c;太阳和月亮实现的道理是一样的&#xff0c;只不过是月亮比太阳多了一个需要控制月牙程度&#xff08;or添加贴图&#xff09;的细节~ 1.1 Sun 太阳的话很简单&#xff0c;直接在shader里实现一个太阳跟随平行光旋…...

无线通信中的轨道角动量

目录 一. 前言 二. 如何传输 三. 如何产生 3.1 螺旋结构器件 &#xff08;1&#xff09;螺旋相位板 &#xff08;2&#xff09;螺旋抛物面天线 3.2 超表面 3.3 天线阵列 3.3.1 相控阵 3.3.2 时控阵 四. 如何识别 一. 前言 轨道角动量&#xff1a;Orbital Angular M…...

以后更新功能,再也不用App发版了!智能小程序将为开发者最大化减负

在 IoT 时代&#xff0c;越来越多的企业意识到打造自有 App 对于品牌的重要性。作为智能设备不可或缺的控制终端&#xff0c;App 具备连接用户、完善服务、精细化运营用户的独特优势&#xff0c;可帮助企业大大提升品牌竞争力。 为了帮助品牌企业打造更具个性化、差异化的智能…...

C++之类模板全特化和偏特化

类模板类模板是通用类的描述&#xff0c;使用任意类型&#xff08;泛型&#xff09;来描述类的定义。使用类模板的时候&#xff0c;指定具体的数据类型&#xff0c;让编译器生成该类型的类定义。注意&#xff1a;函数模板中可以不指定具体数据类型&#xff0c;让编译器自动推到…...

Python 手写数字识别 MNIST数据集下载失败

目录 一、MNIST数据集下载失败 1 失败的解决办法&#xff08;经验教训&#xff09;&#xff1a; 2 亲测有效的解决方法&#xff1a; 一、MNIST数据集下载失败 场景复现&#xff1a;想要pytorchMINIST数据集来实现手写数字识别&#xff0c;首先就是进行MNIST数据集的下载&am…...

如何用Sunshine搭建家庭游戏串流服务器:跨设备游戏共享终极指南

如何用Sunshine搭建家庭游戏串流服务器&#xff1a;跨设备游戏共享终极指南 【免费下载链接】Sunshine Self-hosted game stream host for Moonlight. 项目地址: https://gitcode.com/GitHub_Trending/su/Sunshine Sunshine是一款开源的自托管游戏串流服务器&#xff0c…...

3步搞定微信聊天记录导出:Mac用户必备的数据备份指南

3步搞定微信聊天记录导出&#xff1a;Mac用户必备的数据备份指南 【免费下载链接】WeChatExporter 一个可以快速导出、查看你的微信聊天记录的工具 项目地址: https://gitcode.com/gh_mirrors/wec/WeChatExporter 你是否担心珍贵的微信聊天记录因为手机丢失或系统升级而…...

CodeGPT:基于AI的Git提交信息自动生成工具实战指南

1. 项目概述&#xff1a;CodeGPT&#xff0c;一个用Go写的AI驱动Git工具 如果你和我一样&#xff0c;每天都要在终端里敲无数次 git commit -m "..." &#xff0c;并且为写一个清晰、规范的提交信息而绞尽脑汁&#xff0c;那今天分享的这个工具绝对能让你眼前一亮…...

从遥感图像到文字识别:手把手教你用旋转目标检测搞定那些‘歪着’的目标

旋转目标检测实战&#xff1a;从遥感图像到倾斜文本的高效解决方案 在计算机视觉领域&#xff0c;目标检测技术已经取得了长足进步&#xff0c;但传统水平边界框检测方法在面对旋转目标时往往表现不佳。无论是遥感图像中的飞机、船舶&#xff0c;还是自然场景中倾斜的文本&…...

MIKE IO 终极指南:Python高效处理MIKE水文数据的完整教程

MIKE IO 终极指南&#xff1a;Python高效处理MIKE水文数据的完整教程 【免费下载链接】mikeio Read, write and manipulate dfs0, dfs1, dfs2, dfs3, dfsu and mesh files. 项目地址: https://gitcode.com/gh_mirrors/mi/mikeio MIKE IO 是DHI集团推出的专业Python开源库…...

Linux内核开发避坑:你的kmalloc申请到底浪费了多少内存?(附slab/slub实战分析)

Linux内核内存优化实战&#xff1a;kmalloc申请背后的隐藏成本与调优策略 在性能敏感的内核模块开发中&#xff0c;每个字节的内存使用都可能成为系统瓶颈的导火索。我曾亲眼见证过一个网络驱动模块因为不当的kmalloc调用模式&#xff0c;导致系统在高压下额外消耗了12%的内存—…...

2026 年行业真相:履职规范背后的管理秘密

现场冲突&#xff1a;安全与进度的激烈碰撞在工程建设领域&#xff0c;安全与进度的冲突一直是个老大难问题。就拿上海中心的建设来说&#xff0c;如此庞大复杂的项目&#xff0c;施工过程中安全管理难度极大。在某些施工阶段&#xff0c;为了赶进度&#xff0c;部分施工人员可…...

NotebookLM笔记生产力跃迁(仅限前500名早鸟用户的动态模板库已开放)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;NotebookLM笔记生产力跃迁&#xff08;仅限前500名早鸟用户的动态模板库已开放&#xff09; NotebookLM 正式引入基于语义理解的「上下文感知模板引擎」&#xff0c;早鸟用户可通过专属入口启用动态模板…...

从DICOM到NIfTI:3D Slicer中医学图像坐标转换的完整避坑指南(附Python代码片段)

从DICOM到NIfTI&#xff1a;3D Slicer中医学图像坐标转换的完整避坑指南&#xff08;附Python代码片段&#xff09; 医学影像处理中&#xff0c;数据格式和坐标系的差异常常成为工程师和研究员们的"隐形杀手"。想象一下&#xff0c;你花了三天三夜训练的深度学习模型…...

Gemini3.1Pro:商业分析框架搭建神器

做咨询的人都知道&#xff0c;真正难的从来不是“找到信息”&#xff0c;而是“把信息组织成框架”。 很多商业分析问题看起来复杂&#xff0c;本质上都是在有限时间内&#xff0c;把零散材料整理成一个能支撑判断的逻辑结构。如果你经常做行业分析、竞品分析、市场调研、经营诊…...