当前位置: 首页 > news >正文

RocketMq源码分析(八)--消息消费流程

文章目录

  • 一、消息消费实现
  • 二、消息消费过程
    • 1、消息拉取
    • 2、消息消费
      • 1)提交消费请求
      • 2)消费消息

一、消息消费实现

  消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
在这里插入图片描述

二、消息消费过程

1、消息拉取

  1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成

//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

  2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};

2、消息消费

1)提交消费请求

  pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}

  submitConsumeRequest方法中,

  • 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
  • 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
  • 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交

2)消费消息

  ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}

  ConsumeRequest线程中,执行步骤如下

  • 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
  • 恢复重试消息的topic和namespace
  • 如果存在钩子函数,则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
  • 调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)
  • 如果存在后置钩子,则执行后置钩子函数
  • 消息消费结果处理

相关文章:

RocketMq源码分析(八)--消息消费流程

文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1&#xff09;提交消费请求2&#xff09;消费消息 一、消息消费实现 消息消费有2种实现&#xff0c;分别为&#xff1a;并发消费实现&#xff08;ConsumeMessageConcurrentlyService&#xff09;和顺序消费实现…...

sql--索引使用

最左前缀法则&#xff08;联合索引&#xff09; 联合索引 位置不影响&#xff0c;但是所有索引必须连续使用&#xff0c;才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...

alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串

目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...

pycharm 2023.2.3设置conda虚拟环境

分两步&#xff1a; &#xff08;1&#xff09;设置Virtualenv Environment &#xff08;2&#xff09;设值Conda Executable 加载conda环境&#xff0c;然后选择conda环境...

安卓Frida 脱壳

总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...

【C】为什么7.0会被存储为6.99999

在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中&#xff0c;作者说浮点数通常只是实际值的近似值&#xff0c;例如&#xff0c;7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0&#xff0c;那么它的二进制表示将如下&#xff1a; 符号位&…...

Framework -- 系统架构

一、前言 framework的学习&#xff0c;需要掌握到什么程度&#xff1f; App 的启动流程&#xff1a;整体的过程&#xff0c;具体到某些类在整个流程中所起的作用&#xff1b;组件的设计模式&#xff0c;核心设计思想&#xff1b;需要知晓目前已知的问题&#xff0c;以及解决方…...

1.1 计算机安全概念

思维导图&#xff1a; 前言&#xff1a; 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...

react中的函数柯里化

函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中&#xff0c;函数柯里化可以帮助我们更好地组织组件的代码&#xff0c;使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例&#xff1a; function add(a) {return functio…...

Unity点乘的实战案例1

向量的点乘,也叫向量的内积、数量积&#xff0c;对两个向量执行点乘运算&#xff0c;就是对这两个向量对应位一一相乘之后求和的操作&#xff0c;点乘的结果是一个标量。点乘&#xff0c;也叫数量积。结果是一个向量在另一个向量方向上投影的长度&#xff0c;是一个标量。 • …...

Hive数据查询详解

本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...

人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048

自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...

【详细】Java网络通信 TCP、UDP、InetAddress

一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机&#xff08;服务器<-->网络<-->客户机&#xff09;。 服务器是指提供信息的计算机或程序&#xff0c;客户机是指请求信息的计算机或程序。网络用…...

Linux(Centos7)操作记录

1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误&#xff0c;并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...

Vue全局事件总线实现任意组件间通信

一、安装全局事件总线 全局事件总线就像是一个工具&#xff0c;专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线&#xff0c;就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码&#xff1a;…...

linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:

linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码&#xff1a; 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...

hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)

获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...

C#学习系列之继承

C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型&#xff0c;它拥有该基础类型的所有成员字段和函数。A派生于B&#xff0c;继承A的所有东西&#xff0c;同时可以增加自己的东西。 使用 public class parent {p…...

PyTorch入门学习(六):神经网络的基本骨架使用

目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中&#xff0c;可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先&#xff0c…...

“体检报告健康解读技术传承人”授牌仪式圆满结束

2023年10月&#xff0c;全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日&#xff0c;许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

Docker 本地安装 mysql 数据库

Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker &#xff1b;并安装。 基础操作不再赘述。 打开 macOS 终端&#xff0c;开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

【 java 虚拟机知识 第一篇 】

目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...