当前位置: 首页 > news >正文

RocketMQ系统性学习-RocketMQ原理分析之消费者的接收消息流程

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

消费者的接收消息流程

还是先把消费者接收消息的流程图贴出来,再细说代码流程:
在这里插入图片描述

首先先从消费者的业务调用出发

// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// ...
// 注册监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
// 启动消费者
consumer.start();

那么我们就从 consumer.start() 进入,看一下消费者的启动逻辑,该方法的核心代码也就是:

this.defaultMQPushConsumerImpl.start();

那么进入到这个 start 方法,这里进行了一些配置以及客户端的启动:

  1. 通过 checkConfig() 检查消费组的一些配置:名称是否符合规范、消费者的线程数、消费者的监听等等
  2. 之后再设置一些属性
  3. 通过 mQClientFactory.start() 启动客户端

那么我们进入到启动客户端这个逻辑,我们猜测这里 start 之后,可能就可以进行消息的拉取了,那么在 start 这个方法中,看到了有下边这一行:

this.pullMessageService.start();

这不正是拉取消息的服务吗?点进去之后,发现就是启动了一个线程,这个线程呢就是 this,那么我们点进去这个 start 方法是定义在 ServiceThread 类中,这个类并没有定义 run 方法,因此呢,这个 run 方法应该是定义在了子类 PullMessageService 类中,点进去找到 run 方法,可以看到在 run 方法中就会不停地去 messageRequestQueue 中拉取数据:

MessageRequest messageRequest = this.messageRequestQueue.take();

既然在这里拉取数据了,那么数据是什么时候放到 messageRequestQueue 中的呢?

只需要搜一下哪里调用到了 this.messageRequestQueue.put 就可以知道了,找到之后呢,我们在这一行打个断点,再去启动生产者,就可以知道整个调用链了,

在这里插入图片描述

那么根据栈调用情况呢,可以发现这一行是通过 RebalanceService 的 run 方法进入的,那么这个 RebalanceService 一定是在哪里作为一个线程被启动了

在这里插入图片描述

那么呢,我们之前说了在启动客户端的时候,调用 this.pullMessageService.start() 启动了这个线程,那么在下一行就启动了 rebalanceService 这个线程:

在这里插入图片描述

因此呢,就通过 debug 的方式找到了向 messageRequestQueue 中存放消息就是在 RebalanceService 这个线程中做的

相关文章:

RocketMQ系统性学习-RocketMQ原理分析之消费者的接收消息流程

&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308; 【11来了】文章导读地址&#xff1a;点击查看文章导读&#xff01; &#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f3…...

butterfly蝴蝶分类

一、分类原因 由于植物分类所使用的数据集存在一定问题&#xff0c;修改起来比较麻烦&#xff0c;本次采用kaggle的ButterflyMothsImageClassification数据集&#xff0c;对100这种蝴蝶进行分类。 二、100中蝴蝶类别 ‘ADONIS’,‘AFRICAN GIANT SWALLOWTAIL’,‘AMERICAN S…...

计算机基础:网络基础

目录 一.网线制作 1.制作所需要工具 网线制作标准 ​编辑 2.水晶头使用 3.网线钳使用 4.视频教学 二.集线器、交换机介绍 1.OSI七层模型 2.TCP/IP四层参考模型 3.集线器、交换机。路由器介绍 集线器 交换机 路由器 区别 三.路由器的配置 1.路由器设置 说明书 设…...

[原创][R语言]股票分析实战[3]:周级别涨幅趋势的相关性

[简介] 常用网名: 猪头三 出生日期: 1981.XX.XX QQ联系: 643439947 个人网站: 80x86汇编小站 https://www.x86asm.org 编程生涯: 2001年~至今[共22年] 职业生涯: 20年 开发语言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 开发工具: Visual Studio、D…...

MSVC编译 openssl windows 库

开发需要在windows下集成 openssl 库&#xff0c;参考官方指导完成了编译&#xff1a;openssl/NOTES-WINDOWS.md at master openssl/openssl 不过&#xff0c;最后还是走了直接下载的捷径。 1. 安装 ActivePerl 需要在 ActiveState 注册账户&#xff0c;之后彼会提供具体的…...

electron兼容统信UOS系统过程中的坑

这里写目录标题 找统信支持人员咨询过&#xff0c;他们说不对electron提供支持&#xff0c;如果需要兼容统信UOS还是建议换个开发技术gbm_bo_map--no-sandboxNo protocol specified任务栏图标总结 找统信支持人员咨询过&#xff0c;他们说不对electron提供支持&#xff0c;如果…...

Flink系列之:Apache Kafka SQL 连接器

Flink系列之&#xff1a;Apache Kafka SQL 连接器 一、Apache Kafka SQL 连接器二、依赖三、创建Kafka 表四、可用的元数据五、连接器参数六、特性七、Topic 和 Partition 的探测八、起始消费位点九、有界结束位置十、CDC 变更日志&#xff08;Changelog&#xff09; Source十一…...

灰盒测试简要学习指南!

在本文中&#xff0c;我们将了解什么是灰盒测试、以及为什么要使用它&#xff0c;以及它的优缺点。 在软件测试中&#xff0c;灰盒测试是一种有用的技术&#xff0c;可以确保发布的软件是高性能的、安全的并满足预期用户的需求。这是一种从外部测试应用程序同时跟踪其内部操作…...

【经典LeetCode算法题目专栏分类】【第7期】快慢指针与链表

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能AI、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 快慢指针 移动零 class…...

springboot解决XSS存储型漏洞

springboot解决XSS存储型漏洞 XSS攻击 XSS 攻击&#xff1a;跨站脚本攻击(Cross Site Scripting)&#xff0c;为不和 前端层叠样式表(Cascading Style Sheets)CSS 混淆&#xff0c;故将跨站脚本攻击缩写为 XSS。 XSS(跨站脚本攻击)&#xff1a;是指恶意攻击者往 Web 页面里插…...

I.MX6ULL_Linux_驱动篇(47)linux RTC驱动

RTC 也就是实时时钟&#xff0c;用于记录当前系统时间&#xff0c;对于 Linux 系统而言时间是非常重要的&#xff0c;就和我们使用 Windows 电脑或手机查看时间一样&#xff0c;我们在使用 Linux 设备的时候也需要查看时间。本章我们就来学习一下如何编写 Linux 下的 RTC 驱动程…...

详解IBM企业架构框架模型CBM

&#xff08;一&#xff09;&#xff1a;什么是CBM IBM的CBM是组件化业务模型&#xff08;Component Business Model&#xff09;&#xff0c;是IBM在2003年提出的一种业务架构方法论。 目的是通过将企业的业务活动划分为一些独立、模块化、可重用的业务组件&#xff0c;来识…...

宝塔面板安装MySQL数据库并通过内网穿透工具实现公网远程访问

文章目录 前言1.Mysql 服务安装2.创建数据库3.安装 cpolar3.2 创建 HTTP 隧道 4.远程连接5.固定 TCP 地址5.1 保留一个固定的公网 TCP 端口地址5.2 配置固定公网 TCP 端口地址 前言 宝塔面板的简易操作性,使得运维难度降低,简化了 Linux 命令行进行繁琐的配置,下面简单几步,通…...

Elasticsearch 性能调优基础知识

Elastic Stack 已成为监控任何环境或应用程序的实际解决方案。 从日志、指标和正常运行时间到性能监控甚至安全&#xff0c;Elastic Stack 已成为满足几乎所有监控需求的一体化解决方案。 Elasticsearch 通过提供强大的分析引擎来处理任何类型的数据&#xff0c;成为这方面的基…...

速盾网络:网络安全守护者

速盾网络作为一家专业的网络安全服务提供商&#xff0c;致力于为企业和个人提供全面、高效、可靠的网络安全解决方案。以下是速盾网络的主要业务介绍&#xff1a; 一、CDN加速 速盾网络拥有全球化的CDN加速网络&#xff0c;通过分布在全球各地的节点&#xff0c;为客户提供快速…...

jmeter如何参数化?Jmeter参数化设置的5种方法

jmeter如何参数化&#xff1f;我们使用jmeter在进行测试的时候&#xff0c;测试数据是一项重要的准备工作&#xff0c;每次迭代的数据当不一样的时候&#xff0c;需要进行参数化&#xff0c;从参数化的文件中来读取测试数据。那么&#xff0c;你知道jmeter如何进行参数化吗&…...

01AVue入门(持续学习中)

1.使用AVue开发简单的前端页面直接简单到起飞,他是Element PlusVueVite开发的,不需要向元素的前端代码一样一个组件要传很多参数,他可以使用Json文本来控制我们要传入的数据结构来决定显示什么 //我使用的比较新,我们也可以使用cdn直接使用script标签直接引入 2.开发中遇到的坑…...

js 深浅拷贝的区别和实现方法

一&#xff1a;什么浅拷贝&#xff1a; 浅拷贝创建一个新对象&#xff0c;然后将原始对象的所有属性值复制到新对象中。这意味着&#xff0c;如果原始对象的属性值是基本类型&#xff08;例如数字、字符串&#xff09;&#xff0c;那么这些值会被直接复制到新对象中。但如果属…...

【jvm从入门到实战】(九) 垃圾回收(2)-垃圾回收器

垃圾回收器是垃圾回收算法的具体实现。 由于垃圾回收器分为年轻代和老年代&#xff0c;除了G1之外其他垃圾回收器必须成对组合进行使用 垃圾回收器的组合使用关系图如下。 常用的组合如下: Serial&#xff08;新生代&#xff09; Serial Old&#xff08;老年代&#xff09; Pa…...

C#基础——匿名函数和参数不固定的函数

匿名函数、参数不固定的函数 匿名函数&#xff1a;没有名字&#xff0c;又叫做lambda表达式&#xff0c;具有简洁&#xff0c;灵活&#xff0c;可读的特性。 具名函数&#xff1a;有名字的函数。 1、简洁性&#xff1a;使用更少的代码实现相同的功能 MyDelegate myDelegate…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

自然语言处理——Transformer

自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效&#xff0c;它能挖掘数据中的时序信息以及语义信息&#xff0c;但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN&#xff0c;但是…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...