6、Broker消息处理流程(六)
前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。
Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这里在上一章节有过分析),根据消息发送过来的Code,这时会调用NettyRemotingAbstract的processRequestCommand方法,该方法里面会根据消息传输的Code来取出对应的Processor,进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法(前面这一部分之前都有过分析,接下来我们一起看看后面的操作,正好也将之前的知识串在一起更有利于理解和记忆)
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建上下文,并调用处理前钩子函数mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}
首先解析消息头构建上下文,处理消息发送前钩子函数,最后异步处理消息请求,如果是批量消息调用asyncSendBatchMessage方法,如果是单条消息调用asyncSendMessage方法。
处理单条消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息的状态(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
构建MessageExtBrokerInner对象,设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。
创建响应,验证以及自动创建topic
// 准备响应,验证以及自动创建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 获取broker处理请求服务的起始时间final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response;
}
this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic,根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承,后面我们去判断是否可以去继承,如果能继承就说明支持自动创建,这是就会new一个TopicConfig,这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic,同时也会调用registerBrokerAll方法注册到Broker路由信息里面,当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。
存盘 asyncPutMessage方法
根据topic查询对应的路由信息即broker。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}
首先它会去处理延时消息这里我不做过细的分析,后面针对各种消息在来具体分析,接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面,最后进行刷盘以及通知主从同步的操作。
相关文章:
6、Broker消息处理流程(六)
前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这…...
Clean 架构下的现代 Android 架构指南
Clean 架构下的现代 Android 架构指南 Clean 架构是 Uncle Bob 提出的一种软件架构,Bob 大叔同时也是 SOLID 原则的命名者。 Clean 架构图如下: 这张图描述的是整个软件系统的架构,而不是单体软件,其中至少包括服务端以及客户端…...
代码随想录算法训练营第四十六天| 139 单词拆分
目录 139 单词拆分 139 单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {vector<bool>dp(s.size() 1);//长度为i的字符串时能否成功拆分unordered_set<string>set(wordDict.begin(),wordDict.end());dp[0] t…...
IEEE期刊论文模板
一、模板下载 1、登陆IEEE作者中心Author Center 地址:Publish with IEEE Journals - IEEE Author Center Journals 2、点击“Download a template” 3、在弹出的模板下载页面点击IEEE模板选择器“IEEE Template Selector” 4、在弹出的模板选择器页面点击“Tran…...
上位机与PLC:ModbusTCP通讯之数据类型转换
前请提要: 从PLC读取的数值,不管是读正负整数还是正负浮点数,读取过来后都会变成UInt16,也就是Ushort类型 一、ushort(UInt16)转成 Int32 源代码方法: //ushort类型转Int32类型的方法private int ushortToInt32(ushort[] date, int start){//先进行判断,长度是否正确…...
界面控件DevExpress WPF导航组件,助力升级应用程序用户体验!(上)
DevExpress WPF的Side Navigation(侧边导航)、TreeView、导航面板组件能帮助开发者在WPF项目中添加Windows样式的资源管理器栏或Outlook NavBar(导航栏),DevExpress WPF NavBar和Accordion控件包含了许多开发人员友好的…...
联合基于信息论的安全和隐蔽通信的框架
这个标题很帅 abstractintroductionsystem modelPROPOSED JOINT OPTIMIZATION OF ITS AND COVERT TRANSMISSION RATE信息论安全 (ITS)隐蔽通信需要(CC)Joint Information-Theoretic Secrecy and Covert Communication in the Presence of an Untrusted User and Warden 202…...
行业地位失守,业绩持续失速,科沃斯的故事不好讲
特劳特曾在《定位》一书中提到,为了在容量有限的消费者心智中占据品类,品牌最好的差异化就是成为第一,做品类领导者或开创者,销量遥遥领先;其次分化品类,做到细分品类的唯一,即细分品类的第一。…...
蓝桥杯:货物摆放--因数存到数组里的技巧--减少运算量的方法
小蓝有一个超大的仓库,可以摆放很多货物。 现在,小蓝有 n 箱货物要摆放在仓库,每箱货物都是规则的正方体。小蓝规定了长、宽、高三个互相垂直的方向,每箱货物的边都必须严格平行于长、宽、高。 小蓝希望所有的货物最终摆成一个大…...
我的创作纪念日——一年
机缘 初心始于对技术的热爱和分享知识的渴望。最初,我在一次练习中遇到了一些问题,通过解决这些问题并将解决方案记录下来,我意识到分享经验对自己和他人都非常有价值。于是,我开始在博客和社交平台上记录日常学习过程、撰写技术…...
Windows server 部署iSCSI共享磁盘搭建故障转移群集
在域环境下,在域控制器中配置iSCSI服务,配置共享网络磁盘,在节点服务器使用共享磁盘,并在节点服务器中搭建故障转移群集,实现故障转移 环境准备 准备3台服务器,配置都是8g2核,50g硬盘…...
2023年山东省职业院校技能大赛信息安全管理与评估二三阶段样题
2023年山东省职业院校技能大赛信息安全管理与评估二三阶段 样题 第二阶段 模块二 网络安全事件响应、数字取证调查、应用程序安全 一、竞赛内容 Geek极安云科专注技能竞赛技术提升,基于各大赛项提供全面的系统性培训,拥有完整的培训体系。团队拥有曾…...
数据结构——栈与栈排序
栈的特性 栈是一种遵循后进先出(LIFO)原则的数据结构。其基本操作包括: push:将元素添加到栈顶。pop:移除栈顶元素。peek:查看栈顶元素,但不移除。 栈排序的原理 栈排序的核心是使用两个栈&…...
Java Web应用小案例 - 实现用户登录功能
文章目录 一、使用纯JSP方式实现用户登录功能(一)项目概述(二)实现步骤1、创建Web项目2、创建登录页面 二、使用JSPServlet方式实现用户登录功能三、使用JSPServletDB方式实现用户登录功能 一、使用纯JSP方式实现用户登录功能 &a…...
Excel——多列合并成一列的4种方法
Excel怎么将多列内容合并成一列? 怎么将多个单元格的内容连接起来放在一个单元格里? 比如下图,要将B、C、D列的内容,合并成E列那样,该怎么做呢? △图1 本文中,高潜老师将给大家介绍 4种 将多…...
Vue笔记(四)路由
路由(Vue Router) 用Vue Vue Router创建单页面应用非常简单。当加入Vue Router时,需要将组件映射到路由上,让Vue知道在哪里渲染它们。 路由基本例子 <!-- 引入Vue 和 router --><script src"https://unpkg.com/vu…...
Redis部署-哨兵模式
目录 redis sentinel相关名词 redis sentinel架构 故障转移流程 基于docker搭建redis哨兵 准备工作 搭建过程 模拟主节点宕机,观察哨兵节点的工作流程 哨兵重新选取主节点的流程 1.主观下线 2.客观下线 3.哨兵节点推举出一个leader节点 4.leader选举完毕,leader挑选…...
滑动窗口练习(三)— 加油站问题
题目 测试链接 在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。 给定两个整数数组…...
udemy angular decoration 自存
番外 为什么一个ts文件变成了component,因为它使用了components装饰器 components is just a class,you export it so angular know how to use it 举例:组件装饰器 decoration前总是有一个符号 decoration的作用(之一?) NgModu…...
msvcr90.dll丢失的解决方法分享,5个快速修复dll文件丢失教程
在今天的电脑使用过程中,我们可能会遇到各种各样的问题。其中之一就是msvcr90.dll丢失的问题。那么,msvcr90.dll是什么?msvcr90.dll丢失对电脑有什么影响?又该如何解决这个问题呢?接下来,我将为大家详细介绍…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?
在建筑行业,项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升,传统的管理模式已经难以满足现代工程的需求。过去,许多企业依赖手工记录、口头沟通和分散的信息管理,导致效率低下、成本失控、风险频发。例如&#…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
