当前位置: 首页 > news >正文

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。
Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这里在上一章节有过分析),根据消息发送过来的Code,这时会调用NettyRemotingAbstract的processRequestCommand方法,该方法里面会根据消息传输的Code来取出对应的Processor,进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法(前面这一部分之前都有过分析,接下来我们一起看看后面的操作,正好也将之前的知识串在一起更有利于理解和记忆)


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建上下文,并调用处理前钩子函数mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}

首先解析消息头构建上下文,处理消息发送前钩子函数,最后异步处理消息请求,如果是批量消息调用asyncSendBatchMessage方法,如果是单条消息调用asyncSendMessage方法。

处理单条消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息的状态(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

构建MessageExtBrokerInner对象,设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。

创建响应,验证以及自动创建topic


// 准备响应,验证以及自动创建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 获取broker处理请求服务的起始时间final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response;
}

this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic,根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承,后面我们去判断是否可以去继承,如果能继承就说明支持自动创建,这是就会new一个TopicConfig,这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic,同时也会调用registerBrokerAll方法注册到Broker路由信息里面,当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。

存盘 asyncPutMessage方法
根据topic查询对应的路由信息即broker。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

首先它会去处理延时消息这里我不做过细的分析,后面针对各种消息在来具体分析,接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面,最后进行刷盘以及通知主从同步的操作。

相关文章:

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器&#xff0c;接着分析Producer进行消息的发送&#xff0c;当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候&#xff0c;发送的请求code为SEND_MESSAGE(这…...

Clean 架构下的现代 Android 架构指南

Clean 架构下的现代 Android 架构指南 Clean 架构是 Uncle Bob 提出的一种软件架构&#xff0c;Bob 大叔同时也是 SOLID 原则的命名者。 Clean 架构图如下&#xff1a; 这张图描述的是整个软件系统的架构&#xff0c;而不是单体软件&#xff0c;其中至少包括服务端以及客户端…...

代码随想录算法训练营第四十六天| 139 单词拆分

目录 139 单词拆分 139 单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {vector<bool>dp(s.size() 1);//长度为i的字符串时能否成功拆分unordered_set<string>set(wordDict.begin(),wordDict.end());dp[0] t…...

IEEE期刊论文模板

一、模板下载 1、登陆IEEE作者中心Author Center 地址&#xff1a;Publish with IEEE Journals - IEEE Author Center Journals 2、点击“Download a template” 3、在弹出的模板下载页面点击IEEE模板选择器“IEEE Template Selector” 4、在弹出的模板选择器页面点击“Tran…...

上位机与PLC:ModbusTCP通讯之数据类型转换

前请提要: 从PLC读取的数值,不管是读正负整数还是正负浮点数,读取过来后都会变成UInt16,也就是Ushort类型 一、ushort(UInt16)转成 Int32 源代码方法: //ushort类型转Int32类型的方法private int ushortToInt32(ushort[] date, int start){//先进行判断,长度是否正确…...

界面控件DevExpress WPF导航组件,助力升级应用程序用户体验!(上)

DevExpress WPF的Side Navigation&#xff08;侧边导航&#xff09;、TreeView、导航面板组件能帮助开发者在WPF项目中添加Windows样式的资源管理器栏或Outlook NavBar&#xff08;导航栏&#xff09;&#xff0c;DevExpress WPF NavBar和Accordion控件包含了许多开发人员友好的…...

联合基于信息论的安全和隐蔽通信的框架

这个标题很帅 abstractintroductionsystem modelPROPOSED JOINT OPTIMIZATION OF ITS AND COVERT TRANSMISSION RATE信息论安全 (ITS)隐蔽通信需要(CC)Joint Information-Theoretic Secrecy and Covert Communication in the Presence of an Untrusted User and Warden 202…...

行业地位失守,业绩持续失速,科沃斯的故事不好讲

特劳特曾在《定位》一书中提到&#xff0c;为了在容量有限的消费者心智中占据品类&#xff0c;品牌最好的差异化就是成为第一&#xff0c;做品类领导者或开创者&#xff0c;销量遥遥领先&#xff1b;其次分化品类&#xff0c;做到细分品类的唯一&#xff0c;即细分品类的第一。…...

蓝桥杯:货物摆放--因数存到数组里的技巧--减少运算量的方法

小蓝有一个超大的仓库&#xff0c;可以摆放很多货物。 现在&#xff0c;小蓝有 n 箱货物要摆放在仓库&#xff0c;每箱货物都是规则的正方体。小蓝规定了长、宽、高三个互相垂直的方向&#xff0c;每箱货物的边都必须严格平行于长、宽、高。 小蓝希望所有的货物最终摆成一个大…...

我的创作纪念日——一年

机缘 初心始于对技术的热爱和分享知识的渴望。最初&#xff0c;我在一次练习中遇到了一些问题&#xff0c;通过解决这些问题并将解决方案记录下来&#xff0c;我意识到分享经验对自己和他人都非常有价值。于是&#xff0c;我开始在博客和社交平台上记录日常学习过程、撰写技术…...

Windows server 部署iSCSI共享磁盘搭建故障转移群集

在域环境下&#xff0c;在域控制器中配置iSCSI服务&#xff0c;配置共享网络磁盘&#xff0c;在节点服务器使用共享磁盘&#xff0c;并在节点服务器中搭建故障转移群集&#xff0c;实现故障转移 环境准备 准备3台服务器&#xff0c;配置都是8g2核&#xff0c;50g硬盘&#xf…...

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段样题

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段 样题 第二阶段 模块二 网络安全事件响应、数字取证调查、应用程序安全 一、竞赛内容 Geek极安云科专注技能竞赛技术提升&#xff0c;基于各大赛项提供全面的系统性培训&#xff0c;拥有完整的培训体系。团队拥有曾…...

数据结构——栈与栈排序

栈的特性 栈是一种遵循后进先出&#xff08;LIFO&#xff09;原则的数据结构。其基本操作包括&#xff1a; push&#xff1a;将元素添加到栈顶。pop&#xff1a;移除栈顶元素。peek&#xff1a;查看栈顶元素&#xff0c;但不移除。 栈排序的原理 栈排序的核心是使用两个栈&…...

Java Web应用小案例 - 实现用户登录功能

文章目录 一、使用纯JSP方式实现用户登录功能&#xff08;一&#xff09;项目概述&#xff08;二&#xff09;实现步骤1、创建Web项目2、创建登录页面 二、使用JSPServlet方式实现用户登录功能三、使用JSPServletDB方式实现用户登录功能 一、使用纯JSP方式实现用户登录功能 &a…...

Excel——多列合并成一列的4种方法

Excel怎么将多列内容合并成一列&#xff1f; 怎么将多个单元格的内容连接起来放在一个单元格里&#xff1f; 比如下图&#xff0c;要将B、C、D列的内容&#xff0c;合并成E列那样&#xff0c;该怎么做呢&#xff1f; △图1 本文中&#xff0c;高潜老师将给大家介绍 4种 将多…...

Vue笔记(四)路由

路由&#xff08;Vue Router&#xff09; 用Vue Vue Router创建单页面应用非常简单。当加入Vue Router时&#xff0c;需要将组件映射到路由上&#xff0c;让Vue知道在哪里渲染它们。 路由基本例子 <!-- 引入Vue 和 router --><script src"https://unpkg.com/vu…...

Redis部署-哨兵模式

目录 redis sentinel相关名词 redis sentinel架构 故障转移流程 基于docker搭建redis哨兵 准备工作 搭建过程 模拟主节点宕机,观察哨兵节点的工作流程 哨兵重新选取主节点的流程 1.主观下线 2.客观下线 3.哨兵节点推举出一个leader节点 4.leader选举完毕,leader挑选…...

滑动窗口练习(三)— 加油站问题

题目 测试链接 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发&#xff0c;开始时油箱为空。 给定两个整数数组…...

udemy angular decoration 自存

番外 为什么一个ts文件变成了component,因为它使用了components装饰器 components is just a class,you export it so angular know how to use it 举例&#xff1a;组件装饰器 decoration前总是有一个符号 decoration的作用&#xff08;之一&#xff1f;&#xff09; NgModu…...

msvcr90.dll丢失的解决方法分享,5个快速修复dll文件丢失教程

在今天的电脑使用过程中&#xff0c;我们可能会遇到各种各样的问题。其中之一就是msvcr90.dll丢失的问题。那么&#xff0c;msvcr90.dll是什么&#xff1f;msvcr90.dll丢失对电脑有什么影响&#xff1f;又该如何解决这个问题呢&#xff1f;接下来&#xff0c;我将为大家详细介绍…...

长期项目使用Taotoken感受到的API服务稳定性与可靠性

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 长期项目使用Taotoken感受到的API服务稳定性与可靠性 在持续数月的项目开发与线上服务运行中&#xff0c;我们团队将核心的AI能力构…...

为什么93%的Gemini集成应用在48小时内必须升级?权威发布:3个高危CVE编号+官方回滚方案

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Gemini Bug修复公告 近日&#xff0c;我们在 Gemini 模型推理服务的 v2.4.1 版本中发现一个影响高并发场景下响应一致性的关键缺陷&#xff1a;当连续提交含嵌套 JSON Schema 的结构化请求时&#xff0…...

Android系统级证书安装:解决HTTPS抓包的SSLHandshakeException

1. 为什么系统级证书安装成了Android抓包的“最后一道墙”做Android逆向分析的朋友&#xff0c;大概率都经历过这个场景&#xff1a;Charles或Fiddler配好代理、手机连上Wi-Fi、HTTP流量哗哗地跑&#xff0c;可一打开目标App——空白页、网络错误、甚至直接闪退。点开App的设置…...

机器学习模型自洽性:方差、公平性与弃权机制

1. 项目概述&#xff1a;当机器学习模型“拿不准”时&#xff0c;我们该让它闭嘴吗&#xff1f;在机器学习&#xff0c;尤其是涉及公平性决策的场景里&#xff0c;我们常常面临一个两难困境&#xff1a;模型必须给出一个明确的“是”或“否”的答案&#xff0c;但有时它自己内部…...

终极解决方案:3步恢复Calibre-Web豆瓣元数据获取功能

终极解决方案&#xff1a;3步恢复Calibre-Web豆瓣元数据获取功能 【免费下载链接】calibre-web-douban-api 新版calibre-web已经移除douban-api了&#xff0c;添加一个豆瓣api实现 项目地址: https://gitcode.com/gh_mirrors/ca/calibre-web-douban-api 还在为Calibre-W…...

基于RCT数据评估新模型因果效应:边界估计方法与实践

1. 项目概述与核心挑战 在医疗、金融、教育等高风险领域部署机器学习或人工智能模型时&#xff0c;我们面临一个根本性的难题&#xff1a;如何可靠地评估一个 从未在真实世界随机对照试验中测试过的新模型 的因果效应&#xff1f;随机对照试验是评估干预措施因果效应的黄金标…...

机器学习如何提升GNSS定位精度:从信号分类到多传感器融合

1. 项目概述&#xff1a;当GNSS遇见机器学习全球导航卫星系统&#xff08;GNSS&#xff09;早已融入现代社会的毛细血管&#xff0c;从我们手机上的地图导航&#xff0c;到港口集装箱的自动化调度&#xff0c;再到无人机的精准喷洒&#xff0c;其身影无处不在。其核心原理并不复…...

免费快速搞定CTF MISC难题:5个PuzzleSolver实战技巧让你秒变大神

免费快速搞定CTF MISC难题&#xff1a;5个PuzzleSolver实战技巧让你秒变大神 【免费下载链接】PuzzleSolver 一款针对CTF竞赛MISC的工具~ 项目地址: https://gitcode.com/gh_mirrors/pu/PuzzleSolver 你是不是每次参加CTF比赛&#xff0c;看到MISC题目就头疼&#xff1f…...

机器学习加速电子-声子耦合计算:对称性描述符与蒙特卡洛采样实践

1. 项目概述&#xff1a;当机器学习遇见电子-声子耦合在计算材料科学领域&#xff0c;有一个长期存在的“效率瓶颈”&#xff1a;如何精确且高效地计算材料性质随温度的变化。比如&#xff0c;为什么半导体的带隙会随着温度升高而变窄&#xff1f;这背后是电子与晶格振动&#…...

MAA明日方舟助手:3步实现每日游戏时间从45分钟到5分钟的智能革命

MAA明日方舟助手&#xff1a;3步实现每日游戏时间从45分钟到5分钟的智能革命 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手&#xff0c;全日常一键长草&#xff01;| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址: h…...