当前位置: 首页 > news >正文

RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}

相关文章:

RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言 SubscriptionGroupManager 继承了ConfigManager配置管理组件&#xff0c;拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。 源码版本&#xff1a;4.9.3 源码架构图 核心数据结构 主要的数据结构比较简单&am…...

go-zero开发入门-API网关鉴权开发示例

本文是go-zero开发入门-API网关开发示例一文的延伸&#xff0c;继续之前请先阅读此文。 在项目根目录下创建子目录 middleware&#xff0c;在此目录下创建文件 auth.go&#xff0c;内容如下&#xff1a; // 鉴权中间件 package middlewareimport ("context""e…...

[LLM]nanoGPT---训练一个写唐诗的GPT

karpathy/nanoGPT: The simplest, fastest repository for training/finetuning medium-sized GPTs. (github.com) 原有模型使用的莎士比亚的戏剧数据集, 如果需要一个写唐诗机器人&#xff0c;需要使用唐诗的文本数据&#xff0c; 一个不错的唐诗&#xff0c;宋词数据的下载…...

docker compose部署wordpress

准备机器&#xff1a; 192.168.58.151 &#xff08;关闭防火墙和selinux&#xff09; 安装好docker服务 &#xff08;详细参照&#xff1a;http://t.csdnimg.cn/usG0s 中的国内源安装docker&#xff09; 部署wordpress: 创建目录&#xff1a; [rootdocker ~]# mkdir…...

【docker四】使用Docker-compose一键部署Wordpress平台

目录 一、YAML 文件格式及编写注意事项&#xff08;重要&#xff09; 1、yaml文件使用时注意事项&#xff1a; 2、yaml文件的基本数据结构&#xff1a; 2.1、声明变量&#xff08;标量。是单个的不可再分的值&#xff0c;类型&#xff1a;字符串&#xff0c;整数&#xff0c…...

HTML程序大全(1):简易计算器

HTML代码&#xff0c;主要创建了几个按钮。 <div class"container"><div class"output" id"output">0</div><button class"button" onclick"clearOutput()" id"clear">C</button>…...

esp32服务器与android客户端的tcp通讯

esp32 //esp32作为服务端 #include <WiFi.h>#define LED_BUILTIN 2 // 创建热点 const char *ssid "ESP32"; const char *password "12345678"; const int port 1122; //端口 WiFiServer server(port); void setup() {delay(5000);pinMode(LED_…...

自定义Mybatis LanguageDriver性能优化

场景&#xff1a;高并发情况下mybatis 动态sql 解析 锁问题优化 优化前 并发测试 XMLLanguageDriver 类 的 createSqlSource 方法有锁 而且 每次执行时都会走该方法 优化前 &#xff1a; 线程有Block 优化后的 LanguageDriver public class CustomXMLLanguageDriver im…...

DevEco Studio 鸿蒙(HarmonyOS)项目结构

DevEco Studio 鸿蒙&#xff08;HarmonyOS&#xff09;项目结构 一、操作环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、项目结构 创建简单的Hello World移动应用项目结构如下图 由上到下说明各个文件夹的作用 .hvigor&#xff1a;存…...

Springboot整合篇Druid

一、概述 1.1简介 Druid 是阿里巴巴开源平台上一个数据库连接池实现&#xff0c;结合了 C3P0、DBCP 等 DB 池的优点&#xff0c;同时加入了日志监控。 它本身还自带一个监控平台&#xff0c;可以查看时时产生的sql、uri等监控数据&#xff0c;可以排查慢sql、慢请求&#xff0…...

uniapp 微信小程序 封装axios 包含请求拦截、响应拦截、无感刷新令牌功能

前言&#xff1a; 1、为什么不适用uniapp自带的请求功能&#xff1f; 答&#xff1a;uniapp自带的请求功能&#xff0c;再刷新了令牌后&#xff0c;重新请求返回的数据无法返回给发起请求的方法。也就是说&#xff0c;刷新令牌后重新发起的请求和第一次发起请求的方法是割裂的。…...

C语言精选——选择题Day41

第一题 1. 有以下程序段&#xff1a; char *p, *q; p (char *)malloc(sizeof(char) * 20); q p; scanf("%s %s", p, q); printf("%s %s\n", p, q); 若从键盘输入&#xff1a;abc def↙&#xff0c;则输出结果是&#xff08; &#xff09; A&#xff1a;d…...

Tomcat头上有个叉叉

问题原因&#xff1a; 这是因为它就是个空的tomcat,并没有导入项目运行 解决方案&#xff1a; war模式&#xff1a;发布模式&#xff0c;正式发布时用&#xff0c;将WEB工程以war包的形式上传到服务器 war exploded模式&#xff1a;开发时用&#xff0c;将WEB工程的文件夹直接…...

Linux shell编程学习笔记35:seq

0 前言 在使用 for 循环语句时&#xff0c;我们经常使用到序列。比如&#xff1a; for i in 1 2 3 4 5 6 7 8 9 10; do echo "$i * 2 $(expr $i \* 2)"; done 其中的 1 2 3 4 5 6 7 8 9 10;就是一个整数序列 。 为了方便我们使用数字序列&#xff0c;Linux提供了…...

Nougat:结合光学神经网络,引领学术PDF文档的智能解析、挖掘学术论文PDF的价值

Nougat&#xff1a;结合光学神经网络&#xff0c;引领学术PDF文档的智能解析、挖掘学术论文PDF的价值 这是Nougat的官方存储库&#xff0c;Nougat是一种学术文档PDF解析器&#xff0c;可以理解LaTeX数学和表格。 Project page: https://facebookresearch.github.io/nougat/ …...

涉密网络的IP查询防护策略

涉密网络的安全性对于维护国家、企业及个人的核心利益至关重要。在当今数字化时代&#xff0c;网络攻击日益猖獗&#xff0c;其中IP查询是攻击者获取目标信息的一种常见手段。本文将探讨涉密网络中防护IP查询的关键策略&#xff0c;以确保网络的机密性和安全性。 1. 专用VPN和…...

基础算法(1):排序(1):选择排序

今天对算法产生了兴趣&#xff0c;开始学习基础算法&#xff0c;比如排序&#xff0c;模拟&#xff0c;贪心&#xff0c;递推等内容&#xff0c;算法是很重要的&#xff0c;它是解决某个问题的特定方法&#xff0c;程序数据结构算法&#xff0c;所以对算法的学习是至关重要的&a…...

GeoTrust OV证书

当谈到网站安全性和可信度时&#xff0c;GeoTrust OV证书是一个备受推崇的选择。作为一家备受尊敬的数字证书颁发机构&#xff0c;GeoTrust以其卓越的品牌声誉和高质量的产品而闻名于世。GeoTrust OV证书提供了一系列的安全功能&#xff0c;同时还具有出色的性价比&#xff0c;…...

第一个“hello Android”程序

1、首先安装Android studio&#xff08;跳过&#xff09; Android Studio是由Google推出的官方集成开发环境&#xff08;IDE&#xff09;&#xff0c;专门用于Android应用程序的开发。它是基于JetBrains的IntelliJ IDEA IDE构建的&#xff0c;提供了丰富的功能和工具&#xff0…...

docker-compose安装nacos和msql

docker-compose安装nacos和msql 前言前提已经安装docker-compose&#xff0c;如果没有安装&#xff0c;则可以查看上面系列文章中的安装教程。并且文章中使用的是mobaxterm连接虚拟机。 1、下载2、创建并运行 前言 前提已经安装docker-compose&#xff0c;如果没有安装&#x…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...