RocketMQ(1.NameServer源码)
NameServer功能简述
主要功能如下
- 服务注册与发现:Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时,它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息,并提供查询接口供其他组件发现这些实例。这样,Producer和Consumer可以根据Nameserver提供的信息找到Broker,实现消息的发布和消费。
- Topic路由信息管理:RocketMQ中的消息按照Topic进行分类。Nameserver维护着Topic到Broker之间的路由关系,包括哪些Topic被哪些Broker负责。这样,当Producer发送消息时,它会根据Topic在Nameserver查询路由信息,得知应该将消息发送到哪些Broker上。
- 负载均衡:Nameserver通过周期性地检查Broker的状态信息,包括负载、存活状态等,来维护一个Broker的列表,供Producer和Consumer使用。这样做的目的是为了在消息生产和消费过程中实现负载均衡,避免某个Broker过载,影响整个系统性能。
- 权限控制:Nameserver支持对连接到集群的客户端进行权限控制。通过配置,管理员可以限制哪些客户端可以连接到Broker,增加系统的安全性。
- 集群管理:Nameserver允许管理员对Broker和Topic进行动态管理,包括增加、删除Broker节点,创建和删除Topic等操作。这使得RocketMQ集群能够根据实际业务需求进行扩展和调整。
综上所述,RockerMQ Nameserver是RocketMQ集群中起着至关重要、不可或缺的作用。其服务注册与发现、Topic路由信息管理、负载均衡、权限控制和集群管理等功能,Nameserver确保整个RocketMQ系统的稳定、高效和安全运行。
核心源码
main0
public static void main(String[] args) {// 本地启动需设置环境变量System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/config/rocketmq");main0(args);controllerManagerMain();}
1.parseCommandlineAndConfigFile(解析命令行和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象)
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());if (null == commandLine) {System.exit(-1);return;}namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}
2.createAndStartNamesrvController(创建并运行NamesrvController)
public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));controller.start();return controller;}
一、controller.initialize()初始化
- 加载配置
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =new HashMap<>();
public void load() {String content = null;try {content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());} catch (IOException e) {log.warn("Load KV config table exception", e);}if (content != null) {KVConfigSerializeWrapper kvConfigSerializeWrapper =KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);if (null != kvConfigSerializeWrapper) {this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());log.info("load KV config table OK");}}}
- 初始化网络组件
private void initiateNetworkComponents() {// remotingServer是一个NettyRemotingServer对象,用于接收和处理来自其他服务的请求或响应// BrokerHouseKeepingService对象用于处理Broker的连接和断开事件this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// remotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);}
- 初始化线程执行器
private void initiateThreadExecutors() {this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());// 用于处理默认的远程请求this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());// 处理客户端的路由信息请求this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};}
- 注册remoting server处理器
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {// ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);} else {// Support get route info only temporarily// 在 namesrvConfig.isClusterTest() = false 时如果收到请求的 requestCode 等于 RequestCode.GET_ROUTEINFO_BY_TOPIC 则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);// DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);}}
- 启动定时任务
private void startScheduleService() {// 首次延迟5毫秒执行,后续执行间隔5秒执行(分两种情况)// 后续执行:1.当执行任务时间小于间隔时间时,延迟(间隔时间-任务执行时间)执行// 2.当任务执行时间大于间隔时间,则任务结束立即执行下一次任务// 间隔扫描不活跃的Brokerthis.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);//每隔10分钟打印所有KV配置this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 每隔1分钟打印WaterMarkthis.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);}
- 初始化ssl上下文(配置remotingServer使用TLS协议进行安全通信)
- 注册RPC钩子
private void initiateRpcHooks() {// 注册RPC钩子 在remotingServer处理请求之前或之后执行一些自定义的逻辑this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());}
二、注册JVM钩子
- controllerManager.shutdown()
// 注册一个ShutdownHookThread对象,JVM钩子,在程序终止时调用controllerManager.shutdown(),释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controllerManager.shutdown();return null;}));
public void shutdown() {this.heartbeatManager.shutdown();this.controllerRequestExecutor.shutdown();this.notifyService.shutdown();this.controller.shutdown();this.remotingClient.shutdown();}
三、controller.start()运行
-
运行NettyRemotingServer,启动一个NettyRemotingServer,用于接收和处理客户端的请求
-
运行remotingClient,启动一个NettyRemotingClient,用于向其他服务发送请求
-
运行FileWatch,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件
用来跟踪SSL
- 运行RouteInfoManager,启动一个路由信息管理器,用于维护Broker和Topic的路由关系
unRegisterService.start() 提供了一种以批处理方式注销Broker的机制。扫描离线的Broker BlockingQueue,take()获取数据,while循环获取不活跃的broker。take()是阻塞的,drainTo()方法获取队列中的全部数据。
registerBroker() 接收Broker每隔30秒上报Broker信息
相关文章:
RocketMQ(1.NameServer源码)
NameServer功能简述 主要功能如下 服务注册与发现:Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时,它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息…...
责任链vs金融登录
金融app相对普通app而言,出于安全考虑,其安全校验方式比较多,以某些银行app为例,手机号登录成功后,会增加指纹、手势、OCR人脸等验证!这些安全项的校验,会根据用户的风险等级有不同的校验优先级…...
通过VIOOVI,了解联合作业分析的意义和目标!
现如今企业的主流生产模式就是流水线生产,一道工序结束后,紧接着开展下一项工序,这种作业模式可以以一种比较高效的方式缩减生产时间。尽管流水作业的效率已经够高的了,但是各个工序之间如果衔接不到位的话,会造成生产…...
清洁机器人规划控制方案
清洁机器人规划控制方案 作者联系方式Forrest709335543qq.com 文章目录 清洁机器人规划控制方案方案简介方案设计模块链路坐标变换算法框架 功能设计定点自主导航固定路线清洁区域覆盖清洁贴边沿墙清洁自主返航回充 仿真测试仿真测试准备定点自主导航测试固定路线清洁测试区域…...
设计模式 - 工厂模式
一、 简单工厂(Simple Factory Pattern) 1、概念 一个工厂对象决定创建出哪一种产品类的实力,但不属于GOF23种设计模式。 简单工厂适用于工厂类负责创建的对象较少的场景,且客户端只需要传入工厂类的参数,对于如何创…...
elementUI this.$confirm 文字大小样式
dangerouslyUseHTMLString:true // message部分 以html片段处理 customClass //MessageBox 的自定义类名 整个comfirm框自定义类名 cancelButtonClass // 取消按钮的自定义类名 confirmButtonClass // 确定按钮的自定义类名<style> .addcomfirm{width: 500px; } .a…...
Kafka的TimingWheel
Kafka的TimingWheel是Kafka中的一个时间轮实现,用于管理和处理延迟消息。时间轮是一种定时器的数据结构,可以高效地管理和触发定时事件。 在Kafka中,TimingWheel用于处理延迟消息的重试。当Kafka生产者发送消息到Kafka集群,但由于某些原因导致消息发送失败,生产者会将这些…...
第2集丨webpack 江湖 —— 创建一个简单的webpack工程demo
目录 一、创建webpack工程1.1 新建 webpack工程目录1.2 项目初始化1.3 新建src目录和文件1.4 安装jQuery1.5 安装webpack1.6 配置webpack1.6.1 创建配置文件:webpack.config.js1.6.2 配置dev脚本1.7 运行dev脚本 1.8 查看效果1.9 附件1.9.1 package.json1.9.2 webpa…...
Python(Web时代)——初识flask
flask简介 介绍 Flask是一个用Python编写的Web 微框架,让我们可以使用Python语言快速实现一个网站或Web服务。它是BSD授权的,一个有少量限制的免费软件许可。它使用了 Werkzeug 工具箱和 Jinja2 模板引擎。 Flask 的设计理念是简单、灵活、易于扩展&a…...
二、SQL-5.DQL-8).案例练习
1、查询年龄为20,21,22,23岁的员工信息 select * from emp where age in(20, 21, 22, 23) and gender 女; 2、查询性别为男,并且年龄在20-40岁(含)以内的姓名为三个字的员工 select * from emp where gender 男 && age between 2…...
浙大数据结构第五周之05-树7 堆中的路径
题目详情: 将一系列给定数字依次插入一个初始为空的小顶堆H[]。随后对任意给定的下标i,打印从H[i]到根结点的路径。 输入格式: 每组测试第1行包含2个正整数N和M(≤1000),分别是插入元素的个数、以及需要打印的路径条数。下一行给出区间[-1…...
C# Modbus TCP上位机测试
前面说了三菱和西门子PLC的上位机通信,实际在生产应用中,设备会有很多不同的厂家生产的PLC,那么,我们就需要一种通用的语言,进行设备之间的通信,工业上较为广泛使用的语言之一就是Modbus。 Modbus有多种连…...
instr字符查找函数(oracle用instr来代替like)
instr函数:字符查找函数。其功能是查找一个字符串在另一个字符串中首次出现的位置。 instr函数在Oracle/PLSQL中是返回要截取的字符串在源字符串中的位置。 语法 instr( string1, string2, start_position,nth_appearance ) 参数 string1:源字符串&am…...
trie树的一点理解
这个是最简单的数据结构:因为只需要记住两句话就能完美的写出简洁优雅的代码 1. 每次都是从根节点开始看(或者说从第零次插入的东西开始遍历,son[][]里面存的是第几次插入) 2每次遍历都是插入和查询的字符串 #include<iostream> using namespace …...
Linux CentOS监控系统的运行情况工具 - top/htop/glances/sar/nmon
在CentOS系统中,您可以使用以下工具来监控系统的运行情况: 1. top: top 是一个命令行工具,用于实时监控系统的进程、CPU、内存和负载情况。您可以使用以下命令来启动 top: top 输出 2. htop: htop 是一…...
Qt开发(2)——windows下调用外部程序
一、QProcess::start 1.阻塞性 start是非阻塞函数,但是这里的waitForFinished是阻塞的 2. 调用外部压缩程序7z // 目标压缩路径 QString zipFilePath destinationFolder "/" zipFileName; QStringList arguments{"a&qu…...
PostgreSQL查看数据库对象大小
PostgreSQL查看数据库对象大小 PostgreSQL查看数据库对象大小1、查看某个数据库大小2、查看多个数据库大小3、按顺序查看索引大小4、查看所有对象的大小 PostgreSQL查看数据库对象大小 1、查看某个数据库大小 select pg_size_pretty(pg_database_size(tzqdb));2、查看多个数据…...
给el-table实现列显隐
用过若依的都知道,在使用el-table 时候,实现列显隐效果是要给每个列加v-if 判断的,这种代码过于繁琐,于是翻看el-table包的代码,调试后发现内部的【插入】和【删除】两个方法可以达到我们要的效果。 项目不提供源码&a…...
为Android构建现代应用——应用架构
选择风格(Choosing a style) 我们将依照Google在《应用架构指南》中推荐的最佳实践和架构指南来构建OrderNow的架构。 这些定义包括通过各层定义组件的一些Clean Architecture原则。 层次的定义(Definition of the layers) 在应用程序中,我们将定义以下主要层次…...
49:字符串的新增方法
字符串的新增方法 String.fromCodePoint()String.raw()实例方法:codePointAt()实例方法:normalize()[实例方法:includes(), startsWith(), endsWith()](https://es6.ruanyifeng.com/#docs/string-methods#实例方法:includes(), s…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
