当前位置: 首页 > news >正文

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。
Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这里在上一章节有过分析),根据消息发送过来的Code,这时会调用NettyRemotingAbstract的processRequestCommand方法,该方法里面会根据消息传输的Code来取出对应的Processor,进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法(前面这一部分之前都有过分析,接下来我们一起看看后面的操作,正好也将之前的知识串在一起更有利于理解和记忆)


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建上下文,并调用处理前钩子函数mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}

首先解析消息头构建上下文,处理消息发送前钩子函数,最后异步处理消息请求,如果是批量消息调用asyncSendBatchMessage方法,如果是单条消息调用asyncSendMessage方法。

处理单条消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息的状态(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

构建MessageExtBrokerInner对象,设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。

创建响应,验证以及自动创建topic


// 准备响应,验证以及自动创建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 获取broker处理请求服务的起始时间final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response;
}

this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic,根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承,后面我们去判断是否可以去继承,如果能继承就说明支持自动创建,这是就会new一个TopicConfig,这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic,同时也会调用registerBrokerAll方法注册到Broker路由信息里面,当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。

存盘 asyncPutMessage方法
根据topic查询对应的路由信息即broker。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

首先它会去处理延时消息这里我不做过细的分析,后面针对各种消息在来具体分析,接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面,最后进行刷盘以及通知主从同步的操作。

相关文章:

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器&#xff0c;接着分析Producer进行消息的发送&#xff0c;当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候&#xff0c;发送的请求code为SEND_MESSAGE(这…...

Clean 架构下的现代 Android 架构指南

Clean 架构下的现代 Android 架构指南 Clean 架构是 Uncle Bob 提出的一种软件架构&#xff0c;Bob 大叔同时也是 SOLID 原则的命名者。 Clean 架构图如下&#xff1a; 这张图描述的是整个软件系统的架构&#xff0c;而不是单体软件&#xff0c;其中至少包括服务端以及客户端…...

代码随想录算法训练营第四十六天| 139 单词拆分

目录 139 单词拆分 139 单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {vector<bool>dp(s.size() 1);//长度为i的字符串时能否成功拆分unordered_set<string>set(wordDict.begin(),wordDict.end());dp[0] t…...

IEEE期刊论文模板

一、模板下载 1、登陆IEEE作者中心Author Center 地址&#xff1a;Publish with IEEE Journals - IEEE Author Center Journals 2、点击“Download a template” 3、在弹出的模板下载页面点击IEEE模板选择器“IEEE Template Selector” 4、在弹出的模板选择器页面点击“Tran…...

上位机与PLC:ModbusTCP通讯之数据类型转换

前请提要: 从PLC读取的数值,不管是读正负整数还是正负浮点数,读取过来后都会变成UInt16,也就是Ushort类型 一、ushort(UInt16)转成 Int32 源代码方法: //ushort类型转Int32类型的方法private int ushortToInt32(ushort[] date, int start){//先进行判断,长度是否正确…...

界面控件DevExpress WPF导航组件,助力升级应用程序用户体验!(上)

DevExpress WPF的Side Navigation&#xff08;侧边导航&#xff09;、TreeView、导航面板组件能帮助开发者在WPF项目中添加Windows样式的资源管理器栏或Outlook NavBar&#xff08;导航栏&#xff09;&#xff0c;DevExpress WPF NavBar和Accordion控件包含了许多开发人员友好的…...

联合基于信息论的安全和隐蔽通信的框架

这个标题很帅 abstractintroductionsystem modelPROPOSED JOINT OPTIMIZATION OF ITS AND COVERT TRANSMISSION RATE信息论安全 (ITS)隐蔽通信需要(CC)Joint Information-Theoretic Secrecy and Covert Communication in the Presence of an Untrusted User and Warden 202…...

行业地位失守,业绩持续失速,科沃斯的故事不好讲

特劳特曾在《定位》一书中提到&#xff0c;为了在容量有限的消费者心智中占据品类&#xff0c;品牌最好的差异化就是成为第一&#xff0c;做品类领导者或开创者&#xff0c;销量遥遥领先&#xff1b;其次分化品类&#xff0c;做到细分品类的唯一&#xff0c;即细分品类的第一。…...

蓝桥杯:货物摆放--因数存到数组里的技巧--减少运算量的方法

小蓝有一个超大的仓库&#xff0c;可以摆放很多货物。 现在&#xff0c;小蓝有 n 箱货物要摆放在仓库&#xff0c;每箱货物都是规则的正方体。小蓝规定了长、宽、高三个互相垂直的方向&#xff0c;每箱货物的边都必须严格平行于长、宽、高。 小蓝希望所有的货物最终摆成一个大…...

我的创作纪念日——一年

机缘 初心始于对技术的热爱和分享知识的渴望。最初&#xff0c;我在一次练习中遇到了一些问题&#xff0c;通过解决这些问题并将解决方案记录下来&#xff0c;我意识到分享经验对自己和他人都非常有价值。于是&#xff0c;我开始在博客和社交平台上记录日常学习过程、撰写技术…...

Windows server 部署iSCSI共享磁盘搭建故障转移群集

在域环境下&#xff0c;在域控制器中配置iSCSI服务&#xff0c;配置共享网络磁盘&#xff0c;在节点服务器使用共享磁盘&#xff0c;并在节点服务器中搭建故障转移群集&#xff0c;实现故障转移 环境准备 准备3台服务器&#xff0c;配置都是8g2核&#xff0c;50g硬盘&#xf…...

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段样题

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段 样题 第二阶段 模块二 网络安全事件响应、数字取证调查、应用程序安全 一、竞赛内容 Geek极安云科专注技能竞赛技术提升&#xff0c;基于各大赛项提供全面的系统性培训&#xff0c;拥有完整的培训体系。团队拥有曾…...

数据结构——栈与栈排序

栈的特性 栈是一种遵循后进先出&#xff08;LIFO&#xff09;原则的数据结构。其基本操作包括&#xff1a; push&#xff1a;将元素添加到栈顶。pop&#xff1a;移除栈顶元素。peek&#xff1a;查看栈顶元素&#xff0c;但不移除。 栈排序的原理 栈排序的核心是使用两个栈&…...

Java Web应用小案例 - 实现用户登录功能

文章目录 一、使用纯JSP方式实现用户登录功能&#xff08;一&#xff09;项目概述&#xff08;二&#xff09;实现步骤1、创建Web项目2、创建登录页面 二、使用JSPServlet方式实现用户登录功能三、使用JSPServletDB方式实现用户登录功能 一、使用纯JSP方式实现用户登录功能 &a…...

Excel——多列合并成一列的4种方法

Excel怎么将多列内容合并成一列&#xff1f; 怎么将多个单元格的内容连接起来放在一个单元格里&#xff1f; 比如下图&#xff0c;要将B、C、D列的内容&#xff0c;合并成E列那样&#xff0c;该怎么做呢&#xff1f; △图1 本文中&#xff0c;高潜老师将给大家介绍 4种 将多…...

Vue笔记(四)路由

路由&#xff08;Vue Router&#xff09; 用Vue Vue Router创建单页面应用非常简单。当加入Vue Router时&#xff0c;需要将组件映射到路由上&#xff0c;让Vue知道在哪里渲染它们。 路由基本例子 <!-- 引入Vue 和 router --><script src"https://unpkg.com/vu…...

Redis部署-哨兵模式

目录 redis sentinel相关名词 redis sentinel架构 故障转移流程 基于docker搭建redis哨兵 准备工作 搭建过程 模拟主节点宕机,观察哨兵节点的工作流程 哨兵重新选取主节点的流程 1.主观下线 2.客观下线 3.哨兵节点推举出一个leader节点 4.leader选举完毕,leader挑选…...

滑动窗口练习(三)— 加油站问题

题目 测试链接 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发&#xff0c;开始时油箱为空。 给定两个整数数组…...

udemy angular decoration 自存

番外 为什么一个ts文件变成了component,因为它使用了components装饰器 components is just a class,you export it so angular know how to use it 举例&#xff1a;组件装饰器 decoration前总是有一个符号 decoration的作用&#xff08;之一&#xff1f;&#xff09; NgModu…...

msvcr90.dll丢失的解决方法分享,5个快速修复dll文件丢失教程

在今天的电脑使用过程中&#xff0c;我们可能会遇到各种各样的问题。其中之一就是msvcr90.dll丢失的问题。那么&#xff0c;msvcr90.dll是什么&#xff1f;msvcr90.dll丢失对电脑有什么影响&#xff1f;又该如何解决这个问题呢&#xff1f;接下来&#xff0c;我将为大家详细介绍…...

海外媒体发稿:软文发稿推广技巧解析超级实用-华媒舍

随着互联网时代的发展&#xff0c;软文发稿成为推广产品与服务的重要手段之一。本文将向大家介绍软文发稿推广的技巧&#xff0c;帮助您更好地利用软文推广商业活动。无论是拥有自己的品牌还是个人创业者&#xff0c;都可以从中受益。 1. 什么是软文&#xff1f; 软文是指以文…...

vm net 方式 静态ip配置访问主机IP和外网

1、win 11 安装vm&#xff0c;镜像文件 F:\software\VMwork\CentOS-7-x86_64-Everything-1804.iso 2、配置网络 net 方式 3、右击网络--》属性---》更改适配器设置--》vmnet8 属性、这里不做配置会出现主机ping通访问不通的情况&#xff0c;&#xff08;访问不通&#xff0c;…...

Vue笔记(二)基本语法

基本语法 <style> table {border-collapse: collapse;margin:0 auto; } strong {color: rgb(235, 51, 100); }td, th {padding-left: 6px; } table tr td:first-child {width:150px } table tr td:nth-child(2) {width:300px } </style> <template><tabl…...

前端面试提问(4)

1、手撕防抖与节流、树与对象的转换、递归调用&#xff0c;链表头插法 1.1、防抖 防抖函数用于延迟执行某个函数&#xff0c;直到过了一定的间隔时间&#xff08;例如等待用户停止输入&#xff09;后再执行。 即后一次点击事件发生时间距离一次点击事件至少间隔一定时间。 …...

基于BEV+Transformer的地面要素感知+建模技术在高德的应用

导读 本文将主要介绍BEVTransformer端到端感知与建模技术在高德各项业务中的应用&#xff0c;如高精地图中地面要素&#xff08;包含线要素和地面标识&#xff09;自动化上的具体方案及其演化过程。该方案使用BEVTransformer技术来实现采集车上不同传感器&#xff08;包含激光和…...

了解c++11中的新增

一&#xff0c;统一的初始化列表 在引入c11后&#xff0c;我们得出计划都可以用初始化列表进行初始化。 C11 扩大了用大括号括起的列表 ( 初始化列表 ) 的使用范围&#xff0c;使其可用于所有的内置类型和用户自 定义的类型&#xff0c; 使用初始化列表时&#xff0c;可添加等…...

104. 二叉树的最大深度(Java)

目录 解法&#xff1a; 官方解答&#xff1a; 方法一&#xff1a;深度优先搜索 方法二&#xff1a;广度优先搜索 思路与算法 复杂度分析 时间复杂度&#xff1a; 空间复杂度&#xff1a; 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根…...

SpringSecurity6 | 自定义认证规则

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; Java从入门到精通 ✨特色专栏&#xf…...

浅析安科瑞电动机保护器在广州某地铁项目的设计与应用-安科瑞 蒋静

1 摘要 随着城市的发展&#xff0c;较多城市的轨道交通选择修建地下式车辆段&#xff08;或停车场&#xff09;&#xff0c;即车辆段&#xff08;或停车场&#xff09;位于地下或设置有上盖&#xff08;上盖上再做物业开发&#xff09;。为了给工作人员提供良好的工作环境、给…...

LeetCode 2048. 下一个更大的数值平衡数

【LetMeFly】2048.下一个更大的数值平衡数 力扣题目链接&#xff1a;https://leetcode.cn/problems/next-greater-numerically-balanced-number/ 如果整数 x 满足&#xff1a;对于每个数位 d &#xff0c;这个数位 恰好 在 x 中出现 d 次。那么整数 x 就是一个 数值平衡数 。…...