RocketMq源码分析(八)--消息消费流程
文章目录
- 一、消息消费实现
- 二、消息消费过程
- 1、消息拉取
- 2、消息消费
- 1)提交消费请求
- 2)消费消息
一、消息消费实现
消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。

二、消息消费过程
1、消息拉取
1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};
2、消息消费
1)提交消费请求
pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
submitConsumeRequest方法中,
- 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
- 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
- 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交
2)消费消息
ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
ConsumeRequest线程中,执行步骤如下
- 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
- 恢复重试消息的topic和namespace
- 如果存在钩子函数,则执行前置钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext) - 调用消息监听器消费消息
listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl) - 如果存在后置钩子,则执行后置钩子函数
- 消息消费结果处理
相关文章:
RocketMq源码分析(八)--消息消费流程
文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1)提交消费请求2)消费消息 一、消息消费实现 消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现…...
sql--索引使用
最左前缀法则(联合索引) 联合索引 位置不影响,但是所有索引必须连续使用,才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...
alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串
目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...
pycharm 2023.2.3设置conda虚拟环境
分两步: (1)设置Virtualenv Environment (2)设值Conda Executable 加载conda环境,然后选择conda环境...
安卓Frida 脱壳
总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...
【C】为什么7.0会被存储为6.99999
在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中,作者说浮点数通常只是实际值的近似值,例如,7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0,那么它的二进制表示将如下: 符号位&…...
Framework -- 系统架构
一、前言 framework的学习,需要掌握到什么程度? App 的启动流程:整体的过程,具体到某些类在整个流程中所起的作用;组件的设计模式,核心设计思想;需要知晓目前已知的问题,以及解决方…...
1.1 计算机安全概念
思维导图: 前言: 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...
react中的函数柯里化
函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中,函数柯里化可以帮助我们更好地组织组件的代码,使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例: function add(a) {return functio…...
Unity点乘的实战案例1
向量的点乘,也叫向量的内积、数量积,对两个向量执行点乘运算,就是对这两个向量对应位一一相乘之后求和的操作,点乘的结果是一个标量。点乘,也叫数量积。结果是一个向量在另一个向量方向上投影的长度,是一个标量。 • …...
Hive数据查询详解
本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...
人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048
自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...
【详细】Java网络通信 TCP、UDP、InetAddress
一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信,必须用一个网络线路连接两台计算机(服务器<-->网络<-->客户机)。 服务器是指提供信息的计算机或程序,客户机是指请求信息的计算机或程序。网络用…...
Linux(Centos7)操作记录
1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误,并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...
Vue全局事件总线实现任意组件间通信
一、安装全局事件总线 全局事件总线就像是一个工具,专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线,就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码:…...
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...
hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)
获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...
C#学习系列之继承
C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型,它拥有该基础类型的所有成员字段和函数。A派生于B,继承A的所有东西,同时可以增加自己的东西。 使用 public class parent {p…...
PyTorch入门学习(六):神经网络的基本骨架使用
目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中,可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先,…...
“体检报告健康解读技术传承人”授牌仪式圆满结束
2023年10月,全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日,许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...
G-Helper终极指南:告别Armoury Crate臃肿体验的3步高效方案
G-Helper终极指南:告别Armoury Crate臃肿体验的3步高效方案 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops with nearly the same functionality. Works with ROG Zephyrus, Flow, TUF, Strix, Scar, ProArt, Vivobook, Zenboo…...
别再乱关防火墙了!ESXi 7.0/8.0 安全开放自定义端口的保姆级教程(附配置文件详解)
ESXi防火墙精细化管控:安全开放自定义端口的工程实践 在虚拟化环境中,ESXi主机作为承载业务系统的核心基础设施,其网络安全防护的重要性不言而喻。许多管理员在面对需要开放非标准端口的场景时,往往陷入两难:要么粗暴关…...
AI绘画如何听懂草图?文字+手绘混合生成原理与实战
1. 项目概述:当文字描述遇上手绘草图,AI绘画如何真正“听懂”你的想法? 你有没有过这样的经历:脑子里已经浮现出一幅画面——比如“一只戴圆框眼镜的柴犬坐在咖啡馆窗边,阳光斜射在它毛茸茸的耳朵上,背景是…...
Linux服务器网络断了别慌!手把手教你用nmcli命令快速诊断与恢复连接(实战排错指南)
Linux服务器网络故障急救指南:nmcli命令实战排错全解析 凌晨三点,服务器监控突然告警,SSH连接中断,业务系统全面瘫痪——这是每位运维工程师都经历过的噩梦时刻。当远程连接彻底断开,仅剩控制台可用时,掌握…...
从‘理想采样’到‘现实妥协’:聊聊三电阻电流采样方案里那些不得不做的优化(以FOC矢量控制为例)
从‘理想采样’到‘现实妥协’:三电阻电流采样方案的设计哲学与工程智慧 在电机控制领域,电流采样如同一位沉默的指挥家,用精确的数据引导着PWM交响乐章的每个音符。当我们从教科书走向真实工程现场时,会发现那些看似完美的理论方…...
SUMO低秩优化器:LLM训练内存效率提升技术解析
1. 低秩优化技术背景与SUMO核心价值在大型语言模型(LLM)训练领域,内存消耗一直是制约模型规模扩展的关键瓶颈。传统全参数训练需要存储完整的梯度矩阵,对于数十亿参数的模型,仅单次迭代就可能消耗数十GB显存。低秩优化技术通过矩阵分解原理&a…...
模块型OLT跟光模块有什么区别?
模块型OLT跟光模块有什么区别?明明是同一个 SFP 接口,插上去长得也差不多,为什么有的叫“光模块”,有的叫“模块型 OLT”? 它们到底有什么区别?能不能互换?选错了会怎样?同样是 SFP …...
别再乱调了!用Audition参数均衡器拯救你的干音(附实战预设)
别再乱调了!用Audition参数均衡器拯救你的干音(附实战预设) 录制完一段音频后,你是否经常遇到这样的困扰:人声听起来闷闷的像隔了层棉被,或是尖锐刺耳到让人皱眉,又或者整体浑浊不清缺乏层次感&…...
【MLOps】模型部署与监控实战:从训练到生产的完整链路
一、MLOps概述与重要性 在机器学习项目中,模型训练仅仅是第一步。将训练好的模型部署到生产环境并持续监控其性能,是确保业务价值实现的关键环节。MLOps(Machine Learning Operations)正是解决这一问题的方法论和实践体系。 1.1 什…...
XUnity Auto Translator:打破语言壁垒的Unity游戏翻译解决方案
XUnity Auto Translator:打破语言壁垒的Unity游戏翻译解决方案 【免费下载链接】XUnity.AutoTranslator 项目地址: https://gitcode.com/gh_mirrors/xu/XUnity.AutoTranslator 你是否曾经因为语言障碍而错过精彩的Unity游戏?面对日文、韩文或其他…...
