RocketMQ Broker消息处理流程及部分源码解析
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年2月10日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
- 消息处理流程
- 消息存储目录结构
- `SendMessage`源码
- `processRequest`
- `sendMessage`
消息处理流程
SendMessageProcessor处理类接收到消息DefaultMessageStore实例将消息变成IndexFile、ConsumeQueue和CommitLog对象- 上述对象转成内存映射对象后进行落盘

消息存储目录结构
RocketMQ的文件存储在store文件夹里,里面包含commitlog,config,consumerqueue,index文件夹和abort,checkpoint两个文件。
文件夹:
commitlog存储写入到commitLog的消息内容config存储配置信息consumerqueue存储消费者队列信息index存储消息队列的索引文件
文件:
abort标记RocketMQ是否正常退出checkpoint存储commitlog,config,consumerqueue,index文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│ ├── 00000000000000000000
│ ├── 00000000001073741824
├── config
│ ├── consumerFilter.json
│ ├── consumerOffset.json
│ ├── delayOffset.json
│ ├── subscriptionGroup.json
│ ├── topics.json
├── consumequeue
│ ├── TopicA
│ ├── TopicB
│ ├── TopicC
├── index
│ ├── 00000000000000000000
│ ├── 00000000001073741824
RocketMQ内有专门对应磁盘上存储文件的封装类:
CommitLog:对应commitlog文件ConsumeQueue:对应consumerqueue文件IndexFile:对应index文件MappedFile:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘MappedFileQueue:连续物理存储的封装类,可以通过offset快速定位消息所在的MappedFileMappedFileBuff:堆外内存
SendMessage源码
SendMessageProcessor是接收消息的一个钩子函数,该类的对象将会处理发送到Broker的消息
processRequest
主要流程已在代码片注释中给出:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}
根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader,然后调用asyncSend(Batch)Message方法进行消息的发送。
该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage方法,如果是接收消息则调用pullMessage拉取消息的方法。
sendMessage
消息发送给Broker服务器时,调用的是sendMessage方法接收并存储消息,主要流程已在代码片注释中给出:
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response = preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
相关文章:
RocketMQ Broker消息处理流程及部分源码解析
🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年2月10日 &#x…...
Java面试题:Java集合框架
文章目录一、Java集合框架二、Java集合特性三、各集合类的使用ArrayListLinkedListHashSetHashSet源码解析对源码进行总结HashSet可同步HashSet的使用HashMap四、Iterator迭代器五、遍历集合元素的若干方式参考文章:Hash详解参考文章:深入浅出学Java——…...
时间之间的比较与计算相差年、月、日、小时、分钟、毫秒、纳秒以及判断闰年--LocalDateTime
如何把String/Date转成LocalDateTime参考String、Date与LocalDate、LocalTime、LocalDateTime之间互转 String、Date、LocalDateTime、Calendar与时间戳之间互相转化参考String、Date、LocalDateTime、Calendar与时间戳之间互相转化 比较方法介绍 isBefore(ChronoLocalDateT…...
PyTorch学习笔记:nn.L1Loss——L1损失
PyTorch学习笔记:nn.L1Loss——L1损失 torch.nn.L1Loss(size_averageNone, reduceNone, reductionmean)功能:创建一个绝对值误差损失函数,即L1损失: l(x,y)L{l1,…,lN}T,ln∣xn−yn∣l(x,y)L\{l_1,\dots,l_N\}^T,l_n|x_n-y_n| l(…...
Java程序设计-ssm企业财务管理系统设计与实现
摘要系统设计系统实现开发环境:摘要 对于企业集来说,财务管理的地位很重要。随着计算机和网络在企业中的广泛应用,企业发展速度在不断加快,在这种市场竞争冲击下企业财务管理系统必须优先发展,这样才能保证在竞争中处于优势地位。…...
疑难杂症篇(二十一)--Ubuntu18.04安装usb-cam过程出现的问题
对Ubuntu18.04{\rm Ubuntu 18.04}Ubuntu18.04环境下的ROS{\rm ROS}ROS的melodic{\rm melodic}melodic版本安装usb−cam{\rm usb-cam}usb−cam过程出现的两个常见问题提出解决方案。 1.问题1:usb-cam功能包编译时出现"未定义的引用"的问题 问题描述&#…...
npm-npm i XX --save 和--save-dev
之前使用npm i XX --save 和--save-dev 没太在意,就想记录一下,查到一篇比较全的(链接:NPM install -save 和 -save-dev 傻傻分不清),直接看好了,哈哈~ # 安装模块到项目目录下 npm install moduleName # -g 的意思是…...
可重构或可调谐微波滤波器技术
电子可重构,或者说电调微波滤波器由于其在改善现在及未来微波系统容量中不断提高的重要性而正吸引着人们越来越多的关注来对其进行研究和开发。例如,崭露头脚的超宽带(UWB)技术要求使用很宽的无线电频谱。然而,作为资源…...
医院智能化解决方案-门(急)诊、医技、智能化项目解决方案
【版权声明】本资料来源网络,知识分享,仅供个人学习,请勿商用。【侵删致歉】如有侵权请联系小编,将在收到信息后第一时间删除!完整资料领取见文末,部分资料内容:篇幅有限,无法完全展…...
判断元素是否在可视区域
前言 在日常开发中,我们经常需要判断目标元素是否在视窗之内或者和视窗的距离小于一个值(例如 100 px),从而实现一些常用的功能,例如: 图片的懒加载列表的无限滚动计算广告元素的曝光情况可点击链接的预加…...
告别传统繁杂的采购合同管理 打造企业自动化采购管理模式
随着企业竞争日趋激烈,采购成本压力剧增,企业对于采购合同管理更加严格,从而把控物资成本。对于任何一家企业采购来说,规范化合同的全面管理,是采购活动中重要的一个环节。 但在如今,依旧有很多企业采购合…...
【prism】路由事件映射到Command命令
在之前的一篇文章中,我介绍了普通的自定义事件: 【wpf】自定义事件总结(Action, EventHandler)_code bean的博客-CSDN博客_wpf action可以说通过Action和EventHandle,自定义事件是相当的方便简单了。https…...
面向对象的基本概念和方法
面向对象的开发方法在近几十年见得以广泛应用,我们常见的Java语言就是一种典型的面向对象的开发语言。然而,面向对象的概念较为复杂,知识点也很细碎,本文整理了面向对象的基本概念和方法,供大家参考。为了便于读者理解…...
数据可视化大屏百度地图绘制行政区域标注实战案例解析(个性化地图、标注、视频、控件、定位、检索)
百度地图开发系列目录 数据可视化大屏应急管理综合指挥调度系统完整案例详解(PHP-API、Echarts、百度地图)数据可视化大屏百度地图API开发:停车场分布标注和检索静态版百度地图高级开发:map.getDistance计算多点之间的距离并输入…...
1.面向对象和类的关系?2.什么是Promise、3.Promise和async、await的关系
面向对象:面向对象是一种编程思想(oop)。(Js里面所有的东西都可以看做对象,Js它是基于原型的面向对象语言,采用原型的方式来构造对象)很多个具有相同属性和行为的对象就可以抽象为类,对象是类的一个实例。JavaScript在…...
【程序化天空盒】过程记录01:日月 天空渐变 大气散射
1 日月 SunAndMoon 昼夜的话肯定少不了太阳和月亮,太阳和月亮实现的道理是一样的,只不过是月亮比太阳多了一个需要控制月牙程度(or添加贴图)的细节~ 1.1 Sun 太阳的话很简单,直接在shader里实现一个太阳跟随平行光旋…...
无线通信中的轨道角动量
目录 一. 前言 二. 如何传输 三. 如何产生 3.1 螺旋结构器件 (1)螺旋相位板 (2)螺旋抛物面天线 3.2 超表面 3.3 天线阵列 3.3.1 相控阵 3.3.2 时控阵 四. 如何识别 一. 前言 轨道角动量:Orbital Angular M…...
以后更新功能,再也不用App发版了!智能小程序将为开发者最大化减负
在 IoT 时代,越来越多的企业意识到打造自有 App 对于品牌的重要性。作为智能设备不可或缺的控制终端,App 具备连接用户、完善服务、精细化运营用户的独特优势,可帮助企业大大提升品牌竞争力。 为了帮助品牌企业打造更具个性化、差异化的智能…...
C++之类模板全特化和偏特化
类模板类模板是通用类的描述,使用任意类型(泛型)来描述类的定义。使用类模板的时候,指定具体的数据类型,让编译器生成该类型的类定义。注意:函数模板中可以不指定具体数据类型,让编译器自动推到…...
Python 手写数字识别 MNIST数据集下载失败
目录 一、MNIST数据集下载失败 1 失败的解决办法(经验教训): 2 亲测有效的解决方法: 一、MNIST数据集下载失败 场景复现:想要pytorchMINIST数据集来实现手写数字识别,首先就是进行MNIST数据集的下载&am…...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
DAY 26 函数专题1
函数定义与参数知识点回顾:1. 函数的定义2. 变量作用域:局部变量和全局变量3. 函数的参数类型:位置参数、默认参数、不定参数4. 传递参数的手段:关键词参数5 题目1:计算圆的面积 任务: 编写一…...
