当前位置: 首页 > news >正文

RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布

1、消息消费需要解决的问题

首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系

主题 —》 消息队列(MessageQueue) 1 对多。

主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。

主题 —》 消息消费者,一般一个主题也会被多个消费者消费。

那消息消费至少需要解决如下问题:

1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进行负载消费的。

2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的消息队列中的消息呢?

3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。

4、RocketMQ 推拉模式实现机制。

再提一个业界关于消费者与消息队列的消费规则。

1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?

本文紧接着上文:消息消费概述 。

继续探讨消息分发与消费端负载均衡。

我们从上文知道,PullMessageService 线程主要是负责 pullRequestQueue 中的 PullResult,那问题来了,pullRequestQueue 中的数据从哪来,在什么时候由谁来填充呢。

那我们就先沿着这条线索分析下去,看一下 PullMessageService 的 pullReqestQueue 添加元素的方法的调用链条如下:

也就是调用链:

RebalanceService. run()MQClientInstance.doRebalance()DefaultMQPulConsumerImpl.doRebalance()RebalanceImpl.doRebalance()RebalanceImpl.rebalanceByTopicRebalanceImpl.updateProcessQueueTableInRebalanceRebalanceImpl.dispatchPullRequestDefaultMQPushConsumerImpl.executePullRequestImmediately

从上面可以直观的看出,向 PullMesssageService 的 LinkedBlockingQueue pullRequestQueue 添加 PullRequest的是 RebalanceService.run 方法,就是向 PullMessageService 中放入 PullRequest,才会驱动 PullMessageSerivce run方法的运行,如果 pullRequestQueue 中没有元素,PullMessageService 线程将被阻塞。

那么RebalanceService是何许人也,让我们一起来揭开其神秘面纱。

2、消息消费负载机制分析

2.1 RebalanceService 线程

从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService 线程的 run 方法比较简单,就是直接调用 mqClientFactory.doRebalance。

下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。

2.1.1 MQClientInstance.doRebalance

循环遍历每个消费组获取 MQConsumeInner 对象(其实就是 DefaultMQPushConsumerImpl 或 DefaultMQPullConsumerImpl 对象),并执行其 doRebalance 方法。

2.1.2 DefaultMQPushConsumerImpl.doRebalance

RebalanceImpl doRebalance

到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl 类,我们应该停下脚步,先重点认识一下RebalanceImpl类。

3、RebalanceImpl 类初探

我们先来看看其核心属性:

  • ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
    消息处理队列。
  • ConcurrentMap<String, Set topicSubscribeInfoTable
    topic 的队列信息。
  • ConcurrentMap<String, SubscriptionData> subscriptionInner
    订阅信息。
  • String consumerGroup
    消费组名称。
  • MessageModel messageModel
    消费模式。
  • AllocateMessageQueueStrategy allocateMessageQueueStrategy
    队列分配算法。
  • MQClientInstance mqClientFactory
    MQ 客户端实例。

下面还是从doRebalance方法入手:

1、根据 topic 来进行负载。

2、移除 MessageQueue,如果 MesageQueue 的 topic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。

RebalanceImpl rebalanceByTopic详解:

part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。

part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。

part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。

具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy的实现类,具体算法实现就不细化研究了。

在这里举一个最简单的队列分配机制,,比如一个topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者 c1,c2,c3

一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5 给 c3。下文会专题研究一下负载算法。

part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。

遍历消息队列-处理队列缓存,只处理 mq 的主题与该主题相关的 ProcessQueue, 如果 mq 不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃 (dropped 为 voliate 修饰),可以及时的阻止继续向 ProceeQueue 中拉取数据,然后执行removeUnecessaryMessageQueue(mq,pq) 来判断是否需要移除。

既然我们都是从Push进入的,本文以Push模式展开(同时我们也可以先思考思考push,pull差别),移步到RebalancePushImpl。

目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。

接下来处理 MessageQueue 的 ProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。

主要就是在内存中移除 MessageQueue 的 offerset, 然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。

RebalancePushImpl

PullMessageService

往PullServiceMessage中的 pullRequestQueue中放入PullRequest,则PullMessageService线程 的run方法就不会阻塞

part5:如果消息负载发生变化,需处理

主要是调整主题小各个队列的拉取阔值。

这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。

总结:


本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:

1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImpl 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval 为 0;

下文预告:

CommitLog写入与ConsumeQueue队列的持久化机制

消息消费进度存储机制,再谈RocketMQ消息存储

RocketMQ顺序消息

RocketMQ主从机制

相关文章:

RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布

1、消息消费需要解决的问题 首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系 主题 —》 消息队列(MessageQueue) 1 对多。 主题 —》 消息生产者&#xff0c;一般主题会由多个生产者组成&#xff0c;生产者组。 主题 —》 消息消费者&#xff0c;一般一个主题…...

华为OD机试真题Python实现【数据分类】真题+解题思路+代码(20222023)

数据分类 题目 对一个数据a进行分类, 分类方法是,此数据a(4 个字节大小)的 4 个字节相加对一个给定值b取模, 如果得到的结果小于一个给定的值c则数据a为有效类型,其类型为取模的值。 如果得到的结果大于或者等于c则数据a为无效类型。 比如一个数据a = 0x01010101,b = 3…...

vue项目中引入字体包

问题&#xff1a; 项目开发过程中&#xff0c;因UI的显示要求&#xff0c;需要引入一些字体&#xff0c;那如何引入外部字体呢&#xff1f;很简单&#xff0c;只需要以下3步 一 下载对应的字体包文件&#xff0c;放置到我们的项目中 ​ 比如我需要PingFangSC的系列字体&#…...

Linux 文件相关操作

文件相关操作 编辑文件 命令&#xff1a; vi 文件名 然后输入i进入编辑模式 编辑完成后输入esc退出编辑 输入:wq保存即便目录下没有这个文件&#xff0c;也可以想使用vi 文件名进行编辑&#xff0c;保存退出后会创建这个文件 查看文件内容 命令&#xff1a; cat 文件名复…...

【计算机网络】应用题方法总结

0.前言本篇博客主要记录自己在学习到的部分解决计算机网络应用题方法&#xff0c;主要参考视频如下&#xff1a;计算机网络期末复习 应用题_哔哩哔哩_bilibili【计算机网络】子网划分题型总结_哔哩哔哩_bilibili循环冗余码step 1&#xff1a;确定冗余码长度。多项式最高位即为冗…...

Linux 浅谈之性能分析工具 perf

Linux 浅谈之性能分析工具 perf HELLO&#xff0c;各位博友好&#xff0c;我是阿呆 &#x1f648;&#x1f648;&#x1f648; 这里是 Linux 浅谈系列&#xff0c;收录在操作系统专栏中 &#x1f61c;&#x1f61c;&#x1f61c; 本系列将记录一些阿呆个人整理的 OS 相关知识…...

代码随想录-Day7:四数相加、三数之和

454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0示例 1&#xff1a; 输入&#xff1…...

jsp在线考试系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 jsp 在线考试系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5 开发&#xff0c;数据库为Mysql&#xff0c;使用j…...

【总结】2023数学建模美赛!收官!

今年的美赛时间是2.17-2.21&#xff0c;这学期疫情放开了之后管的没那么严了&#xff0c;我们小组就都提前一天到学校了&#xff0c;全力准备17号的比赛。 时间流程 刚拿到6个题的时候&#xff0c;我们三个人一人看两个题&#xff0c;每个人从两个题中再选出来一个自己觉得有…...

C# GDI+ winform绘图知识总结

一、Graphics GDI是GDI&#xff08;Windows Graphics Device Interface&#xff09;的后继者&#xff0c;它是.NET Framework为操作图形提供的应用程序编程接口&#xff0c;主要用在窗体上绘制各种图形图像&#xff0c;可以用于绘制各种数据图像、数学仿真等。 Graphics类是G…...

【研究空间复用及函数调用问题】

本篇总结函数调用过程会存在的一些奇怪现象&#xff0c;空间复用问题&#xff0c;其实本质上涉及函数调用的底层原理&#xff0c;理解函数栈帧的创建和销毁这样的问题直接迎刃而解。1.空间复用问题案例1案例22.函数调用过程不清晰问题案例33.总结1.空间复用问题 案例1 我们先…...

SQL常用查询语句

SELECT语句用于查询数据库中的内容 目录 1 查询指定表的所有内容 2 显示所有行的指定列 3 显示指定行的指定列 4 对查询结果进行排序 4.1 按照单一字段排序 4.2 多重排序 5 查询数据总数 5.1 查询一共有多少行 5.2 统计符合条件的有多少行 6 给查询出来的…...

【Python实战】一大波高颜值主播来袭:快看,某网站颜值排名,为了这个排名我可是大费周章啦,第一名不亏是你...(人脸检测+爬虫实战)

导语 民间一直有个传闻......「听说某站的小哥哥小姐姐颜值都很高哦&#xff01;」 &#xff08;不是颜值高才能加入&#xff0c;是优秀的人恰好颜值高&#xff09; 所有文章完整的素材源码都在&#x1f447;&#x1f447; 粉丝白嫖源码福利&#xff0c;请移步至CSDN社区或文末…...

Linux进程学习【三】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…...

Spring自动装配的底层逻辑

Spring是如何自动装配Bean的&#xff1f;看源码一些自己的理解&#xff0c;如有错漏&#xff0c;请指正 使用Spring之前我们要先去web.xml中设置一下Spring的配置文件&#xff0c;在Spring的配置文件中&#xff0c;是通过component-scan扫描器去扫描base-package底下所有的类装…...

华为OD机试 - 数组合并(C++) | 附带编码思路 【2023】

刷算法题之前必看 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址:https://blog.csdn.net/hihell/category_12199283.html 华为OD详细说明:https://dream.blog.csdn.net/article/details/128980730 华为OD机试题…...

在vue3+ts的项目中,如何解决vant组件自带表单校验不生效?

问题描述&#xff1a; 点击发送验证码后&#xff0c;为了让逻辑更加严谨&#xff0c;使用了vant组件自带的表单校验&#xff0c;进行二次校验&#xff0c;防止验证码发送成功后&#xff0c;登录手机号被二次修改&#xff0c;但根据官网描述cv之后不生效&#xff0c;甚至连获取…...

华为OD机试真题Python实现【子序列长度】真题+解题思路+代码(20222023)

子序列长度 题目 有 N 个正整数组成的一个序列 给定一个整数sum 求长度最长的的连续子序列使他们的和等于sum 返回次子序列的长度 如果没有满足要求的序列 返回-1 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD机试(Python)真题目录汇总 ## 输入 两行输入 第一行…...

【答疑现场】我一个搞嵌入式的,有必要学习Python吗?

【答疑现场】我一个搞嵌入式的&#xff0c;有必要学习Python吗&#xff1f; 文章目录1 写在前面2 一个结论3 Python在嵌入式领域能干啥事4 Python是用来干大事的5 友情推荐6 福利活动大家好&#xff0c;我是架构师李肯&#xff0c;一个专注于嵌入式物联网系统架构设计的攻城狮。…...

MySQL存表报错问题 Incorrect string value

MySQL存表报错问题 Incorrect string value 问题 Incorrect string value: ‘\xF0\xA8\xA5\xA5\xE5\xAD…’ for column ‘xxxxxxx’ at row 1 意思是错误的字符&#xff0c;常出现在添加中文字符的时候。这个问题的产生原因主要是因为一些特色中文字符或者Emoji表情占4个字…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

车载诊断架构 --- ZEVonUDS(J1979-3)简介第一篇

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…...

goreplay

1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具&#xff0c;可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长&#xff0c;测试它所需的工作量也会呈指数级增长。GoRepl…...

PLC入门【4】基本指令2(SET RST)

04 基本指令2 PLC编程第四课基本指令(2) 1、运用上接课所学的基本指令完成个简单的实例编程。 2、学习SET--置位指令 3、RST--复位指令 打开软件(FX-TRN-BEG-C)&#xff0c;从 文件 - 主画面&#xff0c;“B: 让我们学习基本的”- “B-3.控制优先程序”。 点击“梯形图编辑”…...

标注工具核心架构分析——主窗口的图像显示

&#x1f3d7;️ 标注工具核心架构分析 &#x1f4cb; 系统概述 主要有两个核心类&#xff0c;采用经典的 Scene-View 架构模式&#xff1a; &#x1f3af; 核心类结构 1. AnnotationScene (QGraphicsScene子类) 主要负责标注场景的管理和交互 &#x1f527; 关键函数&…...

性能优化中,多面体模型基本原理

1&#xff09;多面体编译技术是一种基于多面体模型的程序分析和优化技术&#xff0c;它将程序 中的语句实例、访问关系、依赖关系和调度等信息映射到多维空间中的几何对 象&#xff0c;通过对这些几何对象进行几何操作和线性代数计算来进行程序的分析和优 化。 其中&#xff0…...