RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析
前言
SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。
源码版本:4.9.3
源码架构图

核心数据结构
主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。
// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}
深入看下SubscriptionGroupConfig 的数据结构。
public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}
核心数据行为
数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:
// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}
相关文章:
RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析
前言 SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。 源码版本:4.9.3 源码架构图 核心数据结构 主要的数据结构比较简单&am…...
go-zero开发入门-API网关鉴权开发示例
本文是go-zero开发入门-API网关开发示例一文的延伸,继续之前请先阅读此文。 在项目根目录下创建子目录 middleware,在此目录下创建文件 auth.go,内容如下: // 鉴权中间件 package middlewareimport ("context""e…...
[LLM]nanoGPT---训练一个写唐诗的GPT
karpathy/nanoGPT: The simplest, fastest repository for training/finetuning medium-sized GPTs. (github.com) 原有模型使用的莎士比亚的戏剧数据集, 如果需要一个写唐诗机器人,需要使用唐诗的文本数据, 一个不错的唐诗,宋词数据的下载…...
docker compose部署wordpress
准备机器: 192.168.58.151 (关闭防火墙和selinux) 安装好docker服务 (详细参照:http://t.csdnimg.cn/usG0s 中的国内源安装docker) 部署wordpress: 创建目录: [rootdocker ~]# mkdir…...
【docker四】使用Docker-compose一键部署Wordpress平台
目录 一、YAML 文件格式及编写注意事项(重要) 1、yaml文件使用时注意事项: 2、yaml文件的基本数据结构: 2.1、声明变量(标量。是单个的不可再分的值,类型:字符串,整数,…...
HTML程序大全(1):简易计算器
HTML代码,主要创建了几个按钮。 <div class"container"><div class"output" id"output">0</div><button class"button" onclick"clearOutput()" id"clear">C</button>…...
esp32服务器与android客户端的tcp通讯
esp32 //esp32作为服务端 #include <WiFi.h>#define LED_BUILTIN 2 // 创建热点 const char *ssid "ESP32"; const char *password "12345678"; const int port 1122; //端口 WiFiServer server(port); void setup() {delay(5000);pinMode(LED_…...
自定义Mybatis LanguageDriver性能优化
场景:高并发情况下mybatis 动态sql 解析 锁问题优化 优化前 并发测试 XMLLanguageDriver 类 的 createSqlSource 方法有锁 而且 每次执行时都会走该方法 优化前 : 线程有Block 优化后的 LanguageDriver public class CustomXMLLanguageDriver im…...
DevEco Studio 鸿蒙(HarmonyOS)项目结构
DevEco Studio 鸿蒙(HarmonyOS)项目结构 一、操作环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、项目结构 创建简单的Hello World移动应用项目结构如下图 由上到下说明各个文件夹的作用 .hvigor:存…...
Springboot整合篇Druid
一、概述 1.1简介 Druid 是阿里巴巴开源平台上一个数据库连接池实现,结合了 C3P0、DBCP 等 DB 池的优点,同时加入了日志监控。 它本身还自带一个监控平台,可以查看时时产生的sql、uri等监控数据,可以排查慢sql、慢请求࿰…...
uniapp 微信小程序 封装axios 包含请求拦截、响应拦截、无感刷新令牌功能
前言: 1、为什么不适用uniapp自带的请求功能? 答:uniapp自带的请求功能,再刷新了令牌后,重新请求返回的数据无法返回给发起请求的方法。也就是说,刷新令牌后重新发起的请求和第一次发起请求的方法是割裂的。…...
C语言精选——选择题Day41
第一题 1. 有以下程序段: char *p, *q; p (char *)malloc(sizeof(char) * 20); q p; scanf("%s %s", p, q); printf("%s %s\n", p, q); 若从键盘输入:abc def↙,则输出结果是( ) A:d…...
Tomcat头上有个叉叉
问题原因: 这是因为它就是个空的tomcat,并没有导入项目运行 解决方案: war模式:发布模式,正式发布时用,将WEB工程以war包的形式上传到服务器 war exploded模式:开发时用,将WEB工程的文件夹直接…...
Linux shell编程学习笔记35:seq
0 前言 在使用 for 循环语句时,我们经常使用到序列。比如: for i in 1 2 3 4 5 6 7 8 9 10; do echo "$i * 2 $(expr $i \* 2)"; done 其中的 1 2 3 4 5 6 7 8 9 10;就是一个整数序列 。 为了方便我们使用数字序列,Linux提供了…...
Nougat:结合光学神经网络,引领学术PDF文档的智能解析、挖掘学术论文PDF的价值
Nougat:结合光学神经网络,引领学术PDF文档的智能解析、挖掘学术论文PDF的价值 这是Nougat的官方存储库,Nougat是一种学术文档PDF解析器,可以理解LaTeX数学和表格。 Project page: https://facebookresearch.github.io/nougat/ …...
涉密网络的IP查询防护策略
涉密网络的安全性对于维护国家、企业及个人的核心利益至关重要。在当今数字化时代,网络攻击日益猖獗,其中IP查询是攻击者获取目标信息的一种常见手段。本文将探讨涉密网络中防护IP查询的关键策略,以确保网络的机密性和安全性。 1. 专用VPN和…...
基础算法(1):排序(1):选择排序
今天对算法产生了兴趣,开始学习基础算法,比如排序,模拟,贪心,递推等内容,算法是很重要的,它是解决某个问题的特定方法,程序数据结构算法,所以对算法的学习是至关重要的&a…...
GeoTrust OV证书
当谈到网站安全性和可信度时,GeoTrust OV证书是一个备受推崇的选择。作为一家备受尊敬的数字证书颁发机构,GeoTrust以其卓越的品牌声誉和高质量的产品而闻名于世。GeoTrust OV证书提供了一系列的安全功能,同时还具有出色的性价比,…...
第一个“hello Android”程序
1、首先安装Android studio(跳过) Android Studio是由Google推出的官方集成开发环境(IDE),专门用于Android应用程序的开发。它是基于JetBrains的IntelliJ IDEA IDE构建的,提供了丰富的功能和工具࿰…...
docker-compose安装nacos和msql
docker-compose安装nacos和msql 前言前提已经安装docker-compose,如果没有安装,则可以查看上面系列文章中的安装教程。并且文章中使用的是mobaxterm连接虚拟机。 1、下载2、创建并运行 前言 前提已经安装docker-compose,如果没有安装&#x…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
