当前位置: 首页 > news >正文

RocketMq源码分析(八)--消息消费流程

文章目录

  • 一、消息消费实现
  • 二、消息消费过程
    • 1、消息拉取
    • 2、消息消费
      • 1)提交消费请求
      • 2)消费消息

一、消息消费实现

  消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
在这里插入图片描述

二、消息消费过程

1、消息拉取

  1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成

//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

  2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};

2、消息消费

1)提交消费请求

  pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}

  submitConsumeRequest方法中,

  • 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
  • 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
  • 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交

2)消费消息

  ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}

  ConsumeRequest线程中,执行步骤如下

  • 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
  • 恢复重试消息的topic和namespace
  • 如果存在钩子函数,则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
  • 调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)
  • 如果存在后置钩子,则执行后置钩子函数
  • 消息消费结果处理

相关文章:

RocketMq源码分析(八)--消息消费流程

文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1&#xff09;提交消费请求2&#xff09;消费消息 一、消息消费实现 消息消费有2种实现&#xff0c;分别为&#xff1a;并发消费实现&#xff08;ConsumeMessageConcurrentlyService&#xff09;和顺序消费实现…...

sql--索引使用

最左前缀法则&#xff08;联合索引&#xff09; 联合索引 位置不影响&#xff0c;但是所有索引必须连续使用&#xff0c;才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...

alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串

目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...

pycharm 2023.2.3设置conda虚拟环境

分两步&#xff1a; &#xff08;1&#xff09;设置Virtualenv Environment &#xff08;2&#xff09;设值Conda Executable 加载conda环境&#xff0c;然后选择conda环境...

安卓Frida 脱壳

总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...

【C】为什么7.0会被存储为6.99999

在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中&#xff0c;作者说浮点数通常只是实际值的近似值&#xff0c;例如&#xff0c;7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0&#xff0c;那么它的二进制表示将如下&#xff1a; 符号位&…...

Framework -- 系统架构

一、前言 framework的学习&#xff0c;需要掌握到什么程度&#xff1f; App 的启动流程&#xff1a;整体的过程&#xff0c;具体到某些类在整个流程中所起的作用&#xff1b;组件的设计模式&#xff0c;核心设计思想&#xff1b;需要知晓目前已知的问题&#xff0c;以及解决方…...

1.1 计算机安全概念

思维导图&#xff1a; 前言&#xff1a; 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...

react中的函数柯里化

函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中&#xff0c;函数柯里化可以帮助我们更好地组织组件的代码&#xff0c;使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例&#xff1a; function add(a) {return functio…...

Unity点乘的实战案例1

向量的点乘,也叫向量的内积、数量积&#xff0c;对两个向量执行点乘运算&#xff0c;就是对这两个向量对应位一一相乘之后求和的操作&#xff0c;点乘的结果是一个标量。点乘&#xff0c;也叫数量积。结果是一个向量在另一个向量方向上投影的长度&#xff0c;是一个标量。 • …...

Hive数据查询详解

本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...

人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048

自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...

【详细】Java网络通信 TCP、UDP、InetAddress

一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机&#xff08;服务器<-->网络<-->客户机&#xff09;。 服务器是指提供信息的计算机或程序&#xff0c;客户机是指请求信息的计算机或程序。网络用…...

Linux(Centos7)操作记录

1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误&#xff0c;并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...

Vue全局事件总线实现任意组件间通信

一、安装全局事件总线 全局事件总线就像是一个工具&#xff0c;专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线&#xff0c;就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码&#xff1a;…...

linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:

linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码&#xff1a; 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...

hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)

获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...

C#学习系列之继承

C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型&#xff0c;它拥有该基础类型的所有成员字段和函数。A派生于B&#xff0c;继承A的所有东西&#xff0c;同时可以增加自己的东西。 使用 public class parent {p…...

PyTorch入门学习(六):神经网络的基本骨架使用

目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中&#xff0c;可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先&#xff0c…...

“体检报告健康解读技术传承人”授牌仪式圆满结束

2023年10月&#xff0c;全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日&#xff0c;许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析

Linux 内存管理实战精讲&#xff1a;核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用&#xff0c;还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

Docker拉取MySQL后数据库连接失败的解决方案

在使用Docker部署MySQL时&#xff0c;拉取并启动容器后&#xff0c;有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致&#xff0c;包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因&#xff0c;并提供解决方案。 一、确认MySQL容器的运行状态 …...

绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化

iOS 应用的发布流程一直是开发链路中最“苹果味”的环节&#xff1a;强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说&#xff0c;这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发&#xff08;例如 Flutter、React Na…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤&#xff1a; 第一步&#xff1a; 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为&#xff1a; // 改为 v…...

Redis——Cluster配置

目录 分片 一、分片的本质与核心价值 二、分片实现方案对比 三、分片算法详解 1. ‌范围分片&#xff08;顺序分片&#xff09;‌ 2. ‌哈希分片‌ 3. ‌虚拟槽分片&#xff08;Redis Cluster 方案&#xff09;‌ 四、Redis Cluster 分片实践要点 五、经典问题解析 C…...

compose 组件 ---无ui组件

在 Jetpack Compose 中&#xff0c;确实存在不直接参与 UI 渲染的组件&#xff0c;它们主要用于逻辑处理、状态管理或副作用控制。这些组件虽然没有视觉界面&#xff0c;但在架构中扮演重要角色。以下是常见的非 UI 组件及其用途&#xff1a; 1. 无 UI 的 Compose 组件分类 (…...