RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布
1、消息消费需要解决的问题
首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系
主题 —》 消息队列(MessageQueue) 1 对多。
主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。
主题 —》 消息消费者,一般一个主题也会被多个消费者消费。
那消息消费至少需要解决如下问题:
1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进行负载消费的。
2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的消息队列中的消息呢?
3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。
4、RocketMQ 推拉模式实现机制。
再提一个业界关于消费者与消息队列的消费规则。
1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?
本文紧接着上文:消息消费概述 。
继续探讨消息分发与消费端负载均衡。
我们从上文知道,PullMessageService 线程主要是负责 pullRequestQueue 中的 PullResult,那问题来了,pullRequestQueue 中的数据从哪来,在什么时候由谁来填充呢。
那我们就先沿着这条线索分析下去,看一下 PullMessageService 的 pullReqestQueue 添加元素的方法的调用链条如下:
也就是调用链:
RebalanceService. run()MQClientInstance.doRebalance()DefaultMQPulConsumerImpl.doRebalance()RebalanceImpl.doRebalance()RebalanceImpl.rebalanceByTopicRebalanceImpl.updateProcessQueueTableInRebalanceRebalanceImpl.dispatchPullRequestDefaultMQPushConsumerImpl.executePullRequestImmediately
从上面可以直观的看出,向 PullMesssageService 的 LinkedBlockingQueue pullRequestQueue 添加 PullRequest的是 RebalanceService.run 方法,就是向 PullMessageService 中放入 PullRequest,才会驱动 PullMessageSerivce run方法的运行,如果 pullRequestQueue 中没有元素,PullMessageService 线程将被阻塞。
那么RebalanceService是何许人也,让我们一起来揭开其神秘面纱。
2、消息消费负载机制分析
2.1 RebalanceService 线程
从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService 线程的 run 方法比较简单,就是直接调用 mqClientFactory.doRebalance。
下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。
2.1.1 MQClientInstance.doRebalance
循环遍历每个消费组获取 MQConsumeInner 对象(其实就是 DefaultMQPushConsumerImpl 或 DefaultMQPullConsumerImpl 对象),并执行其 doRebalance 方法。
2.1.2 DefaultMQPushConsumerImpl.doRebalance
RebalanceImpl doRebalance
到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl 类,我们应该停下脚步,先重点认识一下RebalanceImpl类。
3、RebalanceImpl 类初探
我们先来看看其核心属性:
- ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
消息处理队列。 - ConcurrentMap<String, Set topicSubscribeInfoTable
topic 的队列信息。 - ConcurrentMap<String, SubscriptionData> subscriptionInner
订阅信息。 - String consumerGroup
消费组名称。 - MessageModel messageModel
消费模式。 - AllocateMessageQueueStrategy allocateMessageQueueStrategy
队列分配算法。 - MQClientInstance mqClientFactory
MQ 客户端实例。
下面还是从doRebalance方法入手:
1、根据 topic 来进行负载。
2、移除 MessageQueue,如果 MesageQueue 的 topic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。
RebalanceImpl rebalanceByTopic详解:
part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。
part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。
part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。
具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy的实现类,具体算法实现就不细化研究了。
在这里举一个最简单的队列分配机制,,比如一个topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者 c1,c2,c3
一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5 给 c3。下文会专题研究一下负载算法。
part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。
遍历消息队列-处理队列缓存,只处理 mq 的主题与该主题相关的 ProcessQueue, 如果 mq 不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃 (dropped 为 voliate 修饰),可以及时的阻止继续向 ProceeQueue 中拉取数据,然后执行removeUnecessaryMessageQueue(mq,pq) 来判断是否需要移除。
既然我们都是从Push进入的,本文以Push模式展开(同时我们也可以先思考思考push,pull差别),移步到RebalancePushImpl。
目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。
接下来处理 MessageQueue 的 ProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。
主要就是在内存中移除 MessageQueue 的 offerset, 然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。
RebalancePushImpl
PullMessageService
往PullServiceMessage中的 pullRequestQueue中放入PullRequest,则PullMessageService线程 的run方法就不会阻塞
part5:如果消息负载发生变化,需处理
主要是调整主题小各个队列的拉取阔值。
这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。
总结:
本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:
1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImpl 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval 为 0;
下文预告:
CommitLog写入与ConsumeQueue队列的持久化机制
消息消费进度存储机制,再谈RocketMQ消息存储
RocketMQ顺序消息
RocketMQ主从机制
相关文章:
RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布
1、消息消费需要解决的问题 首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系 主题 —》 消息队列(MessageQueue) 1 对多。 主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。 主题 —》 消息消费者,一般一个主题…...
华为OD机试真题Python实现【数据分类】真题+解题思路+代码(20222023)
数据分类 题目 对一个数据a进行分类, 分类方法是,此数据a(4 个字节大小)的 4 个字节相加对一个给定值b取模, 如果得到的结果小于一个给定的值c则数据a为有效类型,其类型为取模的值。 如果得到的结果大于或者等于c则数据a为无效类型。 比如一个数据a = 0x01010101,b = 3…...
vue项目中引入字体包
问题: 项目开发过程中,因UI的显示要求,需要引入一些字体,那如何引入外部字体呢?很简单,只需要以下3步 一 下载对应的字体包文件,放置到我们的项目中 比如我需要PingFangSC的系列字体&#…...
Linux 文件相关操作
文件相关操作 编辑文件 命令: vi 文件名 然后输入i进入编辑模式 编辑完成后输入esc退出编辑 输入:wq保存即便目录下没有这个文件,也可以想使用vi 文件名进行编辑,保存退出后会创建这个文件 查看文件内容 命令: cat 文件名复…...
【计算机网络】应用题方法总结
0.前言本篇博客主要记录自己在学习到的部分解决计算机网络应用题方法,主要参考视频如下:计算机网络期末复习 应用题_哔哩哔哩_bilibili【计算机网络】子网划分题型总结_哔哩哔哩_bilibili循环冗余码step 1:确定冗余码长度。多项式最高位即为冗…...
Linux 浅谈之性能分析工具 perf
Linux 浅谈之性能分析工具 perf HELLO,各位博友好,我是阿呆 🙈🙈🙈 这里是 Linux 浅谈系列,收录在操作系统专栏中 😜😜😜 本系列将记录一些阿呆个人整理的 OS 相关知识…...
代码随想录-Day7:四数相加、三数之和
454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 ,数组长度都是 n ,请你计算有多少个元组 (i, j, k, l) 能满足: 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0示例 1: 输入࿱…...
jsp在线考试系统Myeclipse开发mysql数据库web结构java编程计算机网页项目
一、源码特点 jsp 在线考试系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5 开发,数据库为Mysql,使用j…...
【总结】2023数学建模美赛!收官!
今年的美赛时间是2.17-2.21,这学期疫情放开了之后管的没那么严了,我们小组就都提前一天到学校了,全力准备17号的比赛。 时间流程 刚拿到6个题的时候,我们三个人一人看两个题,每个人从两个题中再选出来一个自己觉得有…...
C# GDI+ winform绘图知识总结
一、Graphics GDI是GDI(Windows Graphics Device Interface)的后继者,它是.NET Framework为操作图形提供的应用程序编程接口,主要用在窗体上绘制各种图形图像,可以用于绘制各种数据图像、数学仿真等。 Graphics类是G…...
【研究空间复用及函数调用问题】
本篇总结函数调用过程会存在的一些奇怪现象,空间复用问题,其实本质上涉及函数调用的底层原理,理解函数栈帧的创建和销毁这样的问题直接迎刃而解。1.空间复用问题案例1案例22.函数调用过程不清晰问题案例33.总结1.空间复用问题 案例1 我们先…...
SQL常用查询语句
SELECT语句用于查询数据库中的内容 目录 1 查询指定表的所有内容 2 显示所有行的指定列 3 显示指定行的指定列 4 对查询结果进行排序 4.1 按照单一字段排序 4.2 多重排序 5 查询数据总数 5.1 查询一共有多少行 5.2 统计符合条件的有多少行 6 给查询出来的…...
【Python实战】一大波高颜值主播来袭:快看,某网站颜值排名,为了这个排名我可是大费周章啦,第一名不亏是你...(人脸检测+爬虫实战)
导语 民间一直有个传闻......「听说某站的小哥哥小姐姐颜值都很高哦!」 (不是颜值高才能加入,是优秀的人恰好颜值高) 所有文章完整的素材源码都在👇👇 粉丝白嫖源码福利,请移步至CSDN社区或文末…...
Linux进程学习【三】
✨个人主页: Yohifo 🎉所属专栏: Linux学习之旅 🎊每篇一句: 图片来源 🎃操作环境: CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…...
Spring自动装配的底层逻辑
Spring是如何自动装配Bean的?看源码一些自己的理解,如有错漏,请指正 使用Spring之前我们要先去web.xml中设置一下Spring的配置文件,在Spring的配置文件中,是通过component-scan扫描器去扫描base-package底下所有的类装…...
华为OD机试 - 数组合并(C++) | 附带编码思路 【2023】
刷算法题之前必看 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址:https://blog.csdn.net/hihell/category_12199283.html 华为OD详细说明:https://dream.blog.csdn.net/article/details/128980730 华为OD机试题…...
在vue3+ts的项目中,如何解决vant组件自带表单校验不生效?
问题描述: 点击发送验证码后,为了让逻辑更加严谨,使用了vant组件自带的表单校验,进行二次校验,防止验证码发送成功后,登录手机号被二次修改,但根据官网描述cv之后不生效,甚至连获取…...
华为OD机试真题Python实现【子序列长度】真题+解题思路+代码(20222023)
子序列长度 题目 有 N 个正整数组成的一个序列 给定一个整数sum 求长度最长的的连续子序列使他们的和等于sum 返回次子序列的长度 如果没有满足要求的序列 返回-1 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD机试(Python)真题目录汇总 ## 输入 两行输入 第一行…...
【答疑现场】我一个搞嵌入式的,有必要学习Python吗?
【答疑现场】我一个搞嵌入式的,有必要学习Python吗? 文章目录1 写在前面2 一个结论3 Python在嵌入式领域能干啥事4 Python是用来干大事的5 友情推荐6 福利活动大家好,我是架构师李肯,一个专注于嵌入式物联网系统架构设计的攻城狮。…...
MySQL存表报错问题 Incorrect string value
MySQL存表报错问题 Incorrect string value 问题 Incorrect string value: ‘\xF0\xA8\xA5\xA5\xE5\xAD…’ for column ‘xxxxxxx’ at row 1 意思是错误的字符,常出现在添加中文字符的时候。这个问题的产生原因主要是因为一些特色中文字符或者Emoji表情占4个字…...
wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
FFmpeg avformat_open_input函数分析
函数内部的总体流程如下: avformat_open_input 精简后的代码如下: int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...
恶补电源:1.电桥
一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...
