当前位置: 首页 > news >正文

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器,接着分析Producer进行消息的发送,当Producer发送完消息后就得到Broker去接收Producer发送的消息了。
Producer发送给Broker消息时候,发送的请求code为SEND_MESSAGE(这里在上一章节有过分析),根据消息发送过来的Code,这时会调用NettyRemotingAbstract的processRequestCommand方法,该方法里面会根据消息传输的Code来取出对应的Processor,进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法(前面这一部分之前都有过分析,接下来我们一起看看后面的操作,正好也将之前的知识串在一起更有利于理解和记忆)


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建上下文,并调用处理前钩子函数mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}

首先解析消息头构建上下文,处理消息发送前钩子函数,最后异步处理消息请求,如果是批量消息调用asyncSendBatchMessage方法,如果是单条消息调用asyncSendMessage方法。

处理单条消息 private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息的状态(后面再分析)putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

构建MessageExtBrokerInner对象,设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。

创建响应,验证以及自动创建topic


// 准备响应,验证以及自动创建topic
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);// 获取broker处理请求服务的起始时间final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response;
}

this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic,根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承,后面我们去判断是否可以去继承,如果能继承就说明支持自动创建,这是就会new一个TopicConfig,这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic,同时也会调用registerBrokerAll方法注册到Broker路由信息里面,当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。

存盘 asyncPutMessage方法
根据topic查询对应的路由信息即broker。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() > 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog,在后面追加result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下,只保存了一部分case END_OF_FILE:unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}

首先它会去处理延时消息这里我不做过细的分析,后面针对各种消息在来具体分析,接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面,最后进行刷盘以及通知主从同步的操作。

相关文章:

6、Broker消息处理流程(六)

前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器&#xff0c;接着分析Producer进行消息的发送&#xff0c;当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候&#xff0c;发送的请求code为SEND_MESSAGE(这…...

Clean 架构下的现代 Android 架构指南

Clean 架构下的现代 Android 架构指南 Clean 架构是 Uncle Bob 提出的一种软件架构&#xff0c;Bob 大叔同时也是 SOLID 原则的命名者。 Clean 架构图如下&#xff1a; 这张图描述的是整个软件系统的架构&#xff0c;而不是单体软件&#xff0c;其中至少包括服务端以及客户端…...

代码随想录算法训练营第四十六天| 139 单词拆分

目录 139 单词拆分 139 单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {vector<bool>dp(s.size() 1);//长度为i的字符串时能否成功拆分unordered_set<string>set(wordDict.begin(),wordDict.end());dp[0] t…...

IEEE期刊论文模板

一、模板下载 1、登陆IEEE作者中心Author Center 地址&#xff1a;Publish with IEEE Journals - IEEE Author Center Journals 2、点击“Download a template” 3、在弹出的模板下载页面点击IEEE模板选择器“IEEE Template Selector” 4、在弹出的模板选择器页面点击“Tran…...

上位机与PLC:ModbusTCP通讯之数据类型转换

前请提要: 从PLC读取的数值,不管是读正负整数还是正负浮点数,读取过来后都会变成UInt16,也就是Ushort类型 一、ushort(UInt16)转成 Int32 源代码方法: //ushort类型转Int32类型的方法private int ushortToInt32(ushort[] date, int start){//先进行判断,长度是否正确…...

界面控件DevExpress WPF导航组件,助力升级应用程序用户体验!(上)

DevExpress WPF的Side Navigation&#xff08;侧边导航&#xff09;、TreeView、导航面板组件能帮助开发者在WPF项目中添加Windows样式的资源管理器栏或Outlook NavBar&#xff08;导航栏&#xff09;&#xff0c;DevExpress WPF NavBar和Accordion控件包含了许多开发人员友好的…...

联合基于信息论的安全和隐蔽通信的框架

这个标题很帅 abstractintroductionsystem modelPROPOSED JOINT OPTIMIZATION OF ITS AND COVERT TRANSMISSION RATE信息论安全 (ITS)隐蔽通信需要(CC)Joint Information-Theoretic Secrecy and Covert Communication in the Presence of an Untrusted User and Warden 202…...

行业地位失守,业绩持续失速,科沃斯的故事不好讲

特劳特曾在《定位》一书中提到&#xff0c;为了在容量有限的消费者心智中占据品类&#xff0c;品牌最好的差异化就是成为第一&#xff0c;做品类领导者或开创者&#xff0c;销量遥遥领先&#xff1b;其次分化品类&#xff0c;做到细分品类的唯一&#xff0c;即细分品类的第一。…...

蓝桥杯:货物摆放--因数存到数组里的技巧--减少运算量的方法

小蓝有一个超大的仓库&#xff0c;可以摆放很多货物。 现在&#xff0c;小蓝有 n 箱货物要摆放在仓库&#xff0c;每箱货物都是规则的正方体。小蓝规定了长、宽、高三个互相垂直的方向&#xff0c;每箱货物的边都必须严格平行于长、宽、高。 小蓝希望所有的货物最终摆成一个大…...

我的创作纪念日——一年

机缘 初心始于对技术的热爱和分享知识的渴望。最初&#xff0c;我在一次练习中遇到了一些问题&#xff0c;通过解决这些问题并将解决方案记录下来&#xff0c;我意识到分享经验对自己和他人都非常有价值。于是&#xff0c;我开始在博客和社交平台上记录日常学习过程、撰写技术…...

Windows server 部署iSCSI共享磁盘搭建故障转移群集

在域环境下&#xff0c;在域控制器中配置iSCSI服务&#xff0c;配置共享网络磁盘&#xff0c;在节点服务器使用共享磁盘&#xff0c;并在节点服务器中搭建故障转移群集&#xff0c;实现故障转移 环境准备 准备3台服务器&#xff0c;配置都是8g2核&#xff0c;50g硬盘&#xf…...

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段样题

2023年山东省职业院校技能大赛信息安全管理与评估二三阶段 样题 第二阶段 模块二 网络安全事件响应、数字取证调查、应用程序安全 一、竞赛内容 Geek极安云科专注技能竞赛技术提升&#xff0c;基于各大赛项提供全面的系统性培训&#xff0c;拥有完整的培训体系。团队拥有曾…...

数据结构——栈与栈排序

栈的特性 栈是一种遵循后进先出&#xff08;LIFO&#xff09;原则的数据结构。其基本操作包括&#xff1a; push&#xff1a;将元素添加到栈顶。pop&#xff1a;移除栈顶元素。peek&#xff1a;查看栈顶元素&#xff0c;但不移除。 栈排序的原理 栈排序的核心是使用两个栈&…...

Java Web应用小案例 - 实现用户登录功能

文章目录 一、使用纯JSP方式实现用户登录功能&#xff08;一&#xff09;项目概述&#xff08;二&#xff09;实现步骤1、创建Web项目2、创建登录页面 二、使用JSPServlet方式实现用户登录功能三、使用JSPServletDB方式实现用户登录功能 一、使用纯JSP方式实现用户登录功能 &a…...

Excel——多列合并成一列的4种方法

Excel怎么将多列内容合并成一列&#xff1f; 怎么将多个单元格的内容连接起来放在一个单元格里&#xff1f; 比如下图&#xff0c;要将B、C、D列的内容&#xff0c;合并成E列那样&#xff0c;该怎么做呢&#xff1f; △图1 本文中&#xff0c;高潜老师将给大家介绍 4种 将多…...

Vue笔记(四)路由

路由&#xff08;Vue Router&#xff09; 用Vue Vue Router创建单页面应用非常简单。当加入Vue Router时&#xff0c;需要将组件映射到路由上&#xff0c;让Vue知道在哪里渲染它们。 路由基本例子 <!-- 引入Vue 和 router --><script src"https://unpkg.com/vu…...

Redis部署-哨兵模式

目录 redis sentinel相关名词 redis sentinel架构 故障转移流程 基于docker搭建redis哨兵 准备工作 搭建过程 模拟主节点宕机,观察哨兵节点的工作流程 哨兵重新选取主节点的流程 1.主观下线 2.客观下线 3.哨兵节点推举出一个leader节点 4.leader选举完毕,leader挑选…...

滑动窗口练习(三)— 加油站问题

题目 测试链接 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发&#xff0c;开始时油箱为空。 给定两个整数数组…...

udemy angular decoration 自存

番外 为什么一个ts文件变成了component,因为它使用了components装饰器 components is just a class,you export it so angular know how to use it 举例&#xff1a;组件装饰器 decoration前总是有一个符号 decoration的作用&#xff08;之一&#xff1f;&#xff09; NgModu…...

msvcr90.dll丢失的解决方法分享,5个快速修复dll文件丢失教程

在今天的电脑使用过程中&#xff0c;我们可能会遇到各种各样的问题。其中之一就是msvcr90.dll丢失的问题。那么&#xff0c;msvcr90.dll是什么&#xff1f;msvcr90.dll丢失对电脑有什么影响&#xff1f;又该如何解决这个问题呢&#xff1f;接下来&#xff0c;我将为大家详细介绍…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...