RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布
1、消息消费需要解决的问题
首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系
主题 —》 消息队列(MessageQueue) 1 对多。
主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。
主题 —》 消息消费者,一般一个主题也会被多个消费者消费。
那消息消费至少需要解决如下问题:
1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进行负载消费的。
2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的消息队列中的消息呢?
3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。
4、RocketMQ 推拉模式实现机制。
再提一个业界关于消费者与消息队列的消费规则。
1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?
本文紧接着上文:消息消费概述 。
继续探讨消息分发与消费端负载均衡。
我们从上文知道,PullMessageService 线程主要是负责 pullRequestQueue 中的 PullResult,那问题来了,pullRequestQueue 中的数据从哪来,在什么时候由谁来填充呢。
那我们就先沿着这条线索分析下去,看一下 PullMessageService 的 pullReqestQueue 添加元素的方法的调用链条如下:
也就是调用链:
RebalanceService. run()MQClientInstance.doRebalance()DefaultMQPulConsumerImpl.doRebalance()RebalanceImpl.doRebalance()RebalanceImpl.rebalanceByTopicRebalanceImpl.updateProcessQueueTableInRebalanceRebalanceImpl.dispatchPullRequestDefaultMQPushConsumerImpl.executePullRequestImmediately
从上面可以直观的看出,向 PullMesssageService 的 LinkedBlockingQueue pullRequestQueue 添加 PullRequest的是 RebalanceService.run 方法,就是向 PullMessageService 中放入 PullRequest,才会驱动 PullMessageSerivce run方法的运行,如果 pullRequestQueue 中没有元素,PullMessageService 线程将被阻塞。
那么RebalanceService是何许人也,让我们一起来揭开其神秘面纱。
2、消息消费负载机制分析
2.1 RebalanceService 线程
从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService 线程的 run 方法比较简单,就是直接调用 mqClientFactory.doRebalance。
下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。
2.1.1 MQClientInstance.doRebalance
循环遍历每个消费组获取 MQConsumeInner 对象(其实就是 DefaultMQPushConsumerImpl 或 DefaultMQPullConsumerImpl 对象),并执行其 doRebalance 方法。
2.1.2 DefaultMQPushConsumerImpl.doRebalance
RebalanceImpl doRebalance
到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl 类,我们应该停下脚步,先重点认识一下RebalanceImpl类。
3、RebalanceImpl 类初探
我们先来看看其核心属性:
- ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
消息处理队列。 - ConcurrentMap<String, Set topicSubscribeInfoTable
topic 的队列信息。 - ConcurrentMap<String, SubscriptionData> subscriptionInner
订阅信息。 - String consumerGroup
消费组名称。 - MessageModel messageModel
消费模式。 - AllocateMessageQueueStrategy allocateMessageQueueStrategy
队列分配算法。 - MQClientInstance mqClientFactory
MQ 客户端实例。
下面还是从doRebalance方法入手:
1、根据 topic 来进行负载。
2、移除 MessageQueue,如果 MesageQueue 的 topic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。
RebalanceImpl rebalanceByTopic详解:
part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。
part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。
part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。
具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy的实现类,具体算法实现就不细化研究了。
在这里举一个最简单的队列分配机制,,比如一个topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者 c1,c2,c3
一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5 给 c3。下文会专题研究一下负载算法。
part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。
遍历消息队列-处理队列缓存,只处理 mq 的主题与该主题相关的 ProcessQueue, 如果 mq 不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃 (dropped 为 voliate 修饰),可以及时的阻止继续向 ProceeQueue 中拉取数据,然后执行removeUnecessaryMessageQueue(mq,pq) 来判断是否需要移除。
既然我们都是从Push进入的,本文以Push模式展开(同时我们也可以先思考思考push,pull差别),移步到RebalancePushImpl。
目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。
接下来处理 MessageQueue 的 ProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。
主要就是在内存中移除 MessageQueue 的 offerset, 然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。
RebalancePushImpl
PullMessageService
往PullServiceMessage中的 pullRequestQueue中放入PullRequest,则PullMessageService线程 的run方法就不会阻塞
part5:如果消息负载发生变化,需处理
主要是调整主题小各个队列的拉取阔值。
这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。
总结:
本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:
1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImpl 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval 为 0;
下文预告:
CommitLog写入与ConsumeQueue队列的持久化机制
消息消费进度存储机制,再谈RocketMQ消息存储
RocketMQ顺序消息
RocketMQ主从机制
相关文章:
RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布
1、消息消费需要解决的问题 首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系 主题 —》 消息队列(MessageQueue) 1 对多。 主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。 主题 —》 消息消费者,一般一个主题…...
华为OD机试真题Python实现【数据分类】真题+解题思路+代码(20222023)
数据分类 题目 对一个数据a进行分类, 分类方法是,此数据a(4 个字节大小)的 4 个字节相加对一个给定值b取模, 如果得到的结果小于一个给定的值c则数据a为有效类型,其类型为取模的值。 如果得到的结果大于或者等于c则数据a为无效类型。 比如一个数据a = 0x01010101,b = 3…...
vue项目中引入字体包
问题: 项目开发过程中,因UI的显示要求,需要引入一些字体,那如何引入外部字体呢?很简单,只需要以下3步 一 下载对应的字体包文件,放置到我们的项目中 比如我需要PingFangSC的系列字体&#…...
Linux 文件相关操作
文件相关操作 编辑文件 命令: vi 文件名 然后输入i进入编辑模式 编辑完成后输入esc退出编辑 输入:wq保存即便目录下没有这个文件,也可以想使用vi 文件名进行编辑,保存退出后会创建这个文件 查看文件内容 命令: cat 文件名复…...
【计算机网络】应用题方法总结
0.前言本篇博客主要记录自己在学习到的部分解决计算机网络应用题方法,主要参考视频如下:计算机网络期末复习 应用题_哔哩哔哩_bilibili【计算机网络】子网划分题型总结_哔哩哔哩_bilibili循环冗余码step 1:确定冗余码长度。多项式最高位即为冗…...
Linux 浅谈之性能分析工具 perf
Linux 浅谈之性能分析工具 perf HELLO,各位博友好,我是阿呆 🙈🙈🙈 这里是 Linux 浅谈系列,收录在操作系统专栏中 😜😜😜 本系列将记录一些阿呆个人整理的 OS 相关知识…...
代码随想录-Day7:四数相加、三数之和
454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 ,数组长度都是 n ,请你计算有多少个元组 (i, j, k, l) 能满足: 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0示例 1: 输入࿱…...
jsp在线考试系统Myeclipse开发mysql数据库web结构java编程计算机网页项目
一、源码特点 jsp 在线考试系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5 开发,数据库为Mysql,使用j…...
【总结】2023数学建模美赛!收官!
今年的美赛时间是2.17-2.21,这学期疫情放开了之后管的没那么严了,我们小组就都提前一天到学校了,全力准备17号的比赛。 时间流程 刚拿到6个题的时候,我们三个人一人看两个题,每个人从两个题中再选出来一个自己觉得有…...
C# GDI+ winform绘图知识总结
一、Graphics GDI是GDI(Windows Graphics Device Interface)的后继者,它是.NET Framework为操作图形提供的应用程序编程接口,主要用在窗体上绘制各种图形图像,可以用于绘制各种数据图像、数学仿真等。 Graphics类是G…...
【研究空间复用及函数调用问题】
本篇总结函数调用过程会存在的一些奇怪现象,空间复用问题,其实本质上涉及函数调用的底层原理,理解函数栈帧的创建和销毁这样的问题直接迎刃而解。1.空间复用问题案例1案例22.函数调用过程不清晰问题案例33.总结1.空间复用问题 案例1 我们先…...
SQL常用查询语句
SELECT语句用于查询数据库中的内容 目录 1 查询指定表的所有内容 2 显示所有行的指定列 3 显示指定行的指定列 4 对查询结果进行排序 4.1 按照单一字段排序 4.2 多重排序 5 查询数据总数 5.1 查询一共有多少行 5.2 统计符合条件的有多少行 6 给查询出来的…...
【Python实战】一大波高颜值主播来袭:快看,某网站颜值排名,为了这个排名我可是大费周章啦,第一名不亏是你...(人脸检测+爬虫实战)
导语 民间一直有个传闻......「听说某站的小哥哥小姐姐颜值都很高哦!」 (不是颜值高才能加入,是优秀的人恰好颜值高) 所有文章完整的素材源码都在👇👇 粉丝白嫖源码福利,请移步至CSDN社区或文末…...
Linux进程学习【三】
✨个人主页: Yohifo 🎉所属专栏: Linux学习之旅 🎊每篇一句: 图片来源 🎃操作环境: CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…...
Spring自动装配的底层逻辑
Spring是如何自动装配Bean的?看源码一些自己的理解,如有错漏,请指正 使用Spring之前我们要先去web.xml中设置一下Spring的配置文件,在Spring的配置文件中,是通过component-scan扫描器去扫描base-package底下所有的类装…...
华为OD机试 - 数组合并(C++) | 附带编码思路 【2023】
刷算法题之前必看 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址:https://blog.csdn.net/hihell/category_12199283.html 华为OD详细说明:https://dream.blog.csdn.net/article/details/128980730 华为OD机试题…...
在vue3+ts的项目中,如何解决vant组件自带表单校验不生效?
问题描述: 点击发送验证码后,为了让逻辑更加严谨,使用了vant组件自带的表单校验,进行二次校验,防止验证码发送成功后,登录手机号被二次修改,但根据官网描述cv之后不生效,甚至连获取…...
华为OD机试真题Python实现【子序列长度】真题+解题思路+代码(20222023)
子序列长度 题目 有 N 个正整数组成的一个序列 给定一个整数sum 求长度最长的的连续子序列使他们的和等于sum 返回次子序列的长度 如果没有满足要求的序列 返回-1 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD机试(Python)真题目录汇总 ## 输入 两行输入 第一行…...
【答疑现场】我一个搞嵌入式的,有必要学习Python吗?
【答疑现场】我一个搞嵌入式的,有必要学习Python吗? 文章目录1 写在前面2 一个结论3 Python在嵌入式领域能干啥事4 Python是用来干大事的5 友情推荐6 福利活动大家好,我是架构师李肯,一个专注于嵌入式物联网系统架构设计的攻城狮。…...
MySQL存表报错问题 Incorrect string value
MySQL存表报错问题 Incorrect string value 问题 Incorrect string value: ‘\xF0\xA8\xA5\xA5\xE5\xAD…’ for column ‘xxxxxxx’ at row 1 意思是错误的字符,常出现在添加中文字符的时候。这个问题的产生原因主要是因为一些特色中文字符或者Emoji表情占4个字…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
【若依】框架项目部署笔记
参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作: 压缩包下载:http://download.redis.io/releases 1. 上传压缩包,并进入压缩包所在目录,解压到目标…...
电脑桌面太单调,用Python写一个桌面小宠物应用。
下面是一个使用Python创建的简单桌面小宠物应用。这个小宠物会在桌面上游荡,可以响应鼠标点击,并且有简单的动画效果。 import tkinter as tk import random import time from PIL import Image, ImageTk import os import sysclass DesktopPet:def __i…...
