RocketMq源码分析(八)--消息消费流程
文章目录
- 一、消息消费实现
- 二、消息消费过程
- 1、消息拉取
- 2、消息消费
- 1)提交消费请求
- 2)消费消息
一、消息消费实现
消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
二、消息消费过程
1、消息拉取
1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
)异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};
2、消息消费
1)提交消费请求
pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
submitConsumeRequest方法中,
- 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
- 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
- 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交
2)消费消息
ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
ConsumeRequest线程中,执行步骤如下
- 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
- 恢复重试消息的topic和namespace
- 如果存在钩子函数,则执行前置钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
- 调用消息监听器消费消息
listener.consumeMessage
(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl) - 如果存在后置钩子,则执行后置钩子函数
- 消息消费结果处理
相关文章:

RocketMq源码分析(八)--消息消费流程
文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1)提交消费请求2)消费消息 一、消息消费实现 消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现…...

sql--索引使用
最左前缀法则(联合索引) 联合索引 位置不影响,但是所有索引必须连续使用,才会走索引 中间跳过则会造成后面索引则会失效 索引失效 规避方法---尽量使用> 或 < Explain需要重点关注的字段 Type key_leng possibl…...

alibaba.fastjson的使用(三)-- Map、List ==》JSON字符串
目录 1.使用到的方法为: 2. Map转JSON字符串 3. List转JSON字符串 1.使用到的方法为: static String toJSONString(Object object) 2. Map转JSON字符串 /**...

pycharm 2023.2.3设置conda虚拟环境
分两步: (1)设置Virtualenv Environment (2)设值Conda Executable 加载conda环境,然后选择conda环境...

安卓Frida 脱壳
总结下现在脱壳的方法,比如寒冰大佬的Fart,买Nexus 手机,然后刷入肉丝老师的镜像就可以。是比较快速的方式。我今天推荐的方式是,使用Frida 来脱壳,基本上满足日常需求。也不用特别准备手机,脱壳镜像等。 Frida 环境电脑端安装比较简单,主要注意和手机版本相同即可。手机…...

【C】为什么7.0会被存储为6.99999
在《C Primer Plus》第 6 版 3.3.3 节 浮点数的介绍中,作者说浮点数通常只是实际值的近似值,例如,7.0可能被储存为浮点值6.99999。 如果采用32位的IEEE 754浮点表示形式来存储7.0,那么它的二进制表示将如下: 符号位&…...

Framework -- 系统架构
一、前言 framework的学习,需要掌握到什么程度? App 的启动流程:整体的过程,具体到某些类在整个流程中所起的作用;组件的设计模式,核心设计思想;需要知晓目前已知的问题,以及解决方…...

1.1 计算机安全概念
思维导图: 前言: 第1章: 计算机与网络安全概念笔记 1. 学习目标 了解保密性、完整性和可用性的关键安全需求。了解OSI的X.800安全架构。识别和举例说明不同的安全威胁和攻击。掌握安全设计的基本准则。熟悉攻击面和攻击树的使用。了解与密码标准相关的…...

react中的函数柯里化
函数柯里化是一种将接受多个参数的函数转化为一系列接受单一参数的函数的技术。在React开发中,函数柯里化可以帮助我们更好地组织组件的代码,使其具有更好的可读性和可复用性。 一个简单的函数柯里化示例: function add(a) {return functio…...

Unity点乘的实战案例1
向量的点乘,也叫向量的内积、数量积,对两个向量执行点乘运算,就是对这两个向量对应位一一相乘之后求和的操作,点乘的结果是一个标量。点乘,也叫数量积。结果是一个向量在另一个向量方向上投影的长度,是一个标量。 • …...

Hive数据查询详解
本专栏案例数据集链接: https://download.csdn.net/download/shangjg03/88478038 1.数据准备 为了演示查询操作,这里需要预先创建三张表,并加载测试数据。 1.1 员工表 -- 建表语句CREATE TABLE emp(empno INT, -- 员工表编号ename STRING, -- 员工姓名...

人工智能基础_机器学习008_使用正规方程_损失函数进行计算_一元一次和二元一次方程演示_sklearn线性回归演示---人工智能工作笔记0048
自然界很多都是正态分布的,身高,年龄,体重...但是财富不是. 然后我们来看一下这个y = wx+b 线性回归方程. 然后我们用上面的代码演示. 可以看到首先import numpy as np 导入numby 数据计算库 import matplotlib.pyplot as plt 然后导入图形画的库 然后: X = np.linspace(0,…...

【详细】Java网络通信 TCP、UDP、InetAddress
一、网络程序设计基础 1.局域网与因特网 为了实现两台计算机的通信,必须用一个网络线路连接两台计算机(服务器<-->网络<-->客户机)。 服务器是指提供信息的计算机或程序,客户机是指请求信息的计算机或程序。网络用…...

Linux(Centos7)操作记录
1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误,并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…...

Vue全局事件总线实现任意组件间通信
一、安装全局事件总线 全局事件总线就像是一个工具,专门用于挂载自定义事件和。 想要所有的组件都能使用这个全局事件总线,就只有在Vue的原型身上添加一个能够绑定自定义事件的属性。 所以我们在创建Vue实例对象的时候就可以添加如下代码:…...

linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装:
linux-tools-$(uname -r) linux-headers-$(uname -r)工具安装: ebpfebpf-virtual-machine:~$ sudo apt-get install linux-tools-$(uname -r) [sudo] ebpf 的密码: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 linux…...

hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)
获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…...

C#学习系列之继承
C#学习系列之继承 啰嗦继承使用特殊基类隐藏方法实际使用总结 啰嗦 基础学习。 继承 一个类派生于另一个基类型,它拥有该基础类型的所有成员字段和函数。A派生于B,继承A的所有东西,同时可以增加自己的东西。 使用 public class parent {p…...

PyTorch入门学习(六):神经网络的基本骨架使用
目录 一、引言 二、创建神经网络骨架 三、执行前向传播 一、引言 神经网络是深度学习的基础。在PyTorch中,可以使用nn.Module类创建自定义神经网络模型。本文将演示如何创建一个简单的神经网络骨架并执行前向传播操作。 二、创建神经网络骨架 首先,…...

“体检报告健康解读技术传承人”授牌仪式圆满结束
2023年10月,全国卫生健康技术推广传承项目办公室将体检报告健康解读技术传承人证书授予中山大学麻醉学硕士、副主任医师、医说友道创始人许才燕医生。 10月13日,许才燕医生团队在广东佛山举行“解读体检报告 重构健康生态”体检报告健康解读技术传承人授…...

查询计算机GUID码
如何查询计算机GUID码(全局唯一标识符) 1.快键键WINR进入注册表 2.找到\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Cryptography路径 3.双击MachineGuid项即可显示计算机GUID码...

MediaPlayer+TextureView实现视频播放功能
前面写一些基础知识的学习,这篇写个小demo,实现视频循环播放功能。 1、xml代码: <TextureViewandroid:id"id/textureView"android:layout_width"600px"android:layout_height"400px"android:focusable&quo…...

webpack 优化
打包优化 webpack 优化1、依赖转化,兼容低版本浏览器2、生产环境关闭sourceMap3、打包输出目录名称修改和静态资源的存放4、修改图标5、修改webpack配置5-1、写在此处的配置可以覆盖掉脚手架本来就预设上有的配置5-2、写在此处的都是预设没有配置的,脚手…...

保障 Golang 项目安全的最佳实践
对任何项目来说,安全都是一个永恒的话题,本文详细讲解一下保障 Golang 项目的安全性需要遵循一些最佳实践。 对源代码和构建出的二进制文件做全面的安全扫描 定期对源代码和二进制文件进行全面的安全扫描,查找漏洞,以便及早识别…...

PG物理备份与恢复之pg_basebackup
PG物理备份与恢复之pg_basebackup 开启WAL日志归档pg_basebackup备份工具全库恢复:recovery.conf 🐘 数据库版本:PostgreSQL 10.4 开启WAL日志归档 通过数据库的全量备份和WAL日志,可以将数据库恢复到任意时间点。每个WAL日志文件…...

npm : 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一次。
1 bug描述 使用vscode执行npm run dev指令时出现 “npm : 无法将“npm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写,如果包括路径,请确保路径正确,然后再试一次 “ 的错误提示,原因是系统里没有安装n…...

Android 13.0 通过驱动实现禁用usb鼠标和usb键盘功能
1.概述 在13.0的系统产品定制化开发中,在进行定制中有关于usb键盘和usb鼠标的需求中,产品要求禁止usb口挂载usb鼠标和usb键盘,所以需要要求在usb挂载类型的时候 判断如果是usb鼠标和usb键盘就不让挂载,这就需要从驱动方面入手来解决这个问题,接下来看下驱动的某些挂载usb…...

Ubuntu 22.04配置/etc/rc.local开机自启文件
1.查看系统版本 rootbogon-virtual-machine:~# lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 22.04 LTS Release: 22.04 Codename: jammy rootbogon-virtual-machine:~ 2. 解决 /etc/rc.local 开机启动问题 看rc-loc…...

python爬虫之正则表达式解析实战
文章目录 1. 图片爬取流程分析2. 实现代码—爬取家常菜图片 1. 图片爬取流程分析 先获取网址,URL:https://www.xiachufang.com/category/40076/ 定位想要爬取的内容使用正则表达式爬取导入模块指定URLUA伪装(模拟浏览器)发起请求…...

什么是虚拟dom?
虚拟DOM是利用js描述元素与元素的关系,用js对象来表示真实的dom树结构,创建一个虚拟的dom对象。 虚拟DOM的原理是根据真实DOM生成一个js对象,里面有元素、属性和文本,这些与真实DOM中的元素、属性、文本一一对应。虚拟DOM可以更好…...