RocketMq源码分析(八)--消息消费流程
文章目录
- 一、消息消费实现
- 二、消息消费过程
- 1、消息拉取
- 2、消息消费
- 1)提交消费请求
- 2)消费消息
一、消息消费实现
消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。

二、消息消费过程
1、消息拉取
1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};
2、消息消费
1)提交消费请求
pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
submitConsumeRequest方法中,
- 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
- 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
- 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交
2)消费消息
ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
ConsumeRequest线程中,执行步骤如下
- 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
- 恢复重试消息的topic和namespace
- 如果存在钩子函数,则执行前置钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext) - 调用消息监听器消费消息
listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl) - 如果存在后置钩子,则执行后置钩子函数
- 消息消费结果处理
相关文章:
RocketMq源码分析(八)--消息消费流程
文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1)提交消费请求2)消费消息 一、消息消费实现 消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现…...
sql--索引使用
最左前缀法则(联合索引) 联合索引 位置不影响,但是所有索引必须连续使用,才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...
alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串
目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...
pycharm 2023.2.3设置conda虚拟环境
分两步: (1)设置Virtualenv Environment (2)设值Conda Executable 加载conda环境,然后选择conda环境...
安卓Frida 脱壳
总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...
【C】为什么7.0会被存储为6.99999
在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中,作者说浮点数通常只是实际值的近似值,例如,7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0,那么它的二进制表示将如下: 符号位&…...
Framework -- 系统架构
一、前言 framework的学习,需要掌握到什么程度? App 的启动流程:整体的过程,具体到某些类在整个流程中所起的作用;组件的设计模式,核心设计思想;需要知晓目前已知的问题,以及解决方…...
1.1 计算机安全概念
思维导图: 前言: 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...
react中的函数柯里化
函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中,函数柯里化可以帮助我们更好地组织组件的代码,使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例: function add(a) {return functio…...
Unity点乘的实战案例1
向量的点乘,也叫向量的内积、数量积,对两个向量执行点乘运算,就是对这两个向量对应位一一相乘之后求和的操作,点乘的结果是一个标量。点乘,也叫数量积。结果是一个向量在另一个向量方向上投影的长度,是一个标量。 • …...
Hive数据查询详解
本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...
人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048
自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...
【详细】Java网络通信 TCP、UDP、InetAddress
一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信,必须用一个网络线路连接两台计算机(服务器<-->网络<-->客户机)。 服务器是指提供信息的计算机或程序,客户机是指请求信息的计算机或程序。网络用…...
Linux(Centos7)操作记录
1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误,并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...
Vue全局事件总线实现任意组件间通信
一、安装全局事件总线 全局事件总线就像是一个工具,专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线,就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码:…...
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...
hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)
获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...
C#学习系列之继承
C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型,它拥有该基础类型的所有成员字段和函数。A派生于B,继承A的所有东西,同时可以增加自己的东西。 使用 public class parent {p…...
PyTorch入门学习(六):神经网络的基本骨架使用
目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中,可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先,…...
“体检报告健康解读技术传承人”授牌仪式圆满结束
2023年10月,全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日,许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
