RocketMq源码分析(八)--消息消费流程
文章目录
- 一、消息消费实现
- 二、消息消费过程
- 1、消息拉取
- 2、消息消费
- 1)提交消费请求
- 2)消费消息
一、消息消费实现
消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。

二、消息消费过程
1、消息拉取
1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};
2、消息消费
1)提交消费请求
pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
submitConsumeRequest方法中,
- 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
- 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
- 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交
2)消费消息
ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
ConsumeRequest线程中,执行步骤如下
- 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
- 恢复重试消息的topic和namespace
- 如果存在钩子函数,则执行前置钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext) - 调用消息监听器消费消息
listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl) - 如果存在后置钩子,则执行后置钩子函数
- 消息消费结果处理
相关文章:
RocketMq源码分析(八)--消息消费流程
文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1)提交消费请求2)消费消息 一、消息消费实现 消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现…...
sql--索引使用
最左前缀法则(联合索引) 联合索引 位置不影响,但是所有索引必须连续使用,才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...
alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串
目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...
pycharm 2023.2.3设置conda虚拟环境
分两步: (1)设置Virtualenv Environment (2)设值Conda Executable 加载conda环境,然后选择conda环境...
安卓Frida 脱壳
总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...
【C】为什么7.0会被存储为6.99999
在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中,作者说浮点数通常只是实际值的近似值,例如,7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0,那么它的二进制表示将如下: 符号位&…...
Framework -- 系统架构
一、前言 framework的学习,需要掌握到什么程度? App 的启动流程:整体的过程,具体到某些类在整个流程中所起的作用;组件的设计模式,核心设计思想;需要知晓目前已知的问题,以及解决方…...
1.1 计算机安全概念
思维导图: 前言: 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...
react中的函数柯里化
函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中,函数柯里化可以帮助我们更好地组织组件的代码,使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例: function add(a) {return functio…...
Unity点乘的实战案例1
向量的点乘,也叫向量的内积、数量积,对两个向量执行点乘运算,就是对这两个向量对应位一一相乘之后求和的操作,点乘的结果是一个标量。点乘,也叫数量积。结果是一个向量在另一个向量方向上投影的长度,是一个标量。 • …...
Hive数据查询详解
本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...
人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048
自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...
【详细】Java网络通信 TCP、UDP、InetAddress
一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信,必须用一个网络线路连接两台计算机(服务器<-->网络<-->客户机)。 服务器是指提供信息的计算机或程序,客户机是指请求信息的计算机或程序。网络用…...
Linux(Centos7)操作记录
1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误,并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...
Vue全局事件总线实现任意组件间通信
一、安装全局事件总线 全局事件总线就像是一个工具,专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线,就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码:…...
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...
hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)
获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...
C#学习系列之继承
C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型,它拥有该基础类型的所有成员字段和函数。A派生于B,继承A的所有东西,同时可以增加自己的东西。 使用 public class parent {p…...
PyTorch入门学习(六):神经网络的基本骨架使用
目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中,可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先,…...
“体检报告健康解读技术传承人”授牌仪式圆满结束
2023年10月,全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日,许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...
接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
认识CMake并使用CMake构建自己的第一个项目
1.CMake的作用和优势 跨平台支持:CMake支持多种操作系统和编译器,使用同一份构建配置可以在不同的环境中使用 简化配置:通过CMakeLists.txt文件,用户可以定义项目结构、依赖项、编译选项等,无需手动编写复杂的构建脚本…...
redis和redission的区别
Redis 和 Redisson 是两个密切相关但又本质不同的技术,它们扮演着完全不同的角色: Redis: 内存数据库/数据结构存储 本质: 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能: 提供丰…...
