当前位置: 首页 > news >正文

RocketMQ(1.NameServer源码)

NameServer功能简述

主要功能如下

  1. 服务注册与发现:Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时,它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息,并提供查询接口供其他组件发现这些实例。这样,Producer和Consumer可以根据Nameserver提供的信息找到Broker,实现消息的发布和消费。
  2. Topic路由信息管理:RocketMQ中的消息按照Topic进行分类。Nameserver维护着Topic到Broker之间的路由关系,包括哪些Topic被哪些Broker负责。这样,当Producer发送消息时,它会根据Topic在Nameserver查询路由信息,得知应该将消息发送到哪些Broker上。
  3. 负载均衡:Nameserver通过周期性地检查Broker的状态信息,包括负载、存活状态等,来维护一个Broker的列表,供Producer和Consumer使用。这样做的目的是为了在消息生产和消费过程中实现负载均衡,避免某个Broker过载,影响整个系统性能。
  4. 权限控制:Nameserver支持对连接到集群的客户端进行权限控制。通过配置,管理员可以限制哪些客户端可以连接到Broker,增加系统的安全性。
  5. 集群管理:Nameserver允许管理员对Broker和Topic进行动态管理,包括增加、删除Broker节点,创建和删除Topic等操作。这使得RocketMQ集群能够根据实际业务需求进行扩展和调整。

综上所述,RockerMQ Nameserver是RocketMQ集群中起着至关重要、不可或缺的作用。其服务注册与发现、Topic路由信息管理、负载均衡、权限控制和集群管理等功能,Nameserver确保整个RocketMQ系统的稳定、高效和安全运行。

核心源码

main0

public static void main(String[] args) {// 本地启动需设置环境变量System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/config/rocketmq");main0(args);controllerManagerMain();}

1.parseCommandlineAndConfigFile(解析命令行和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象)

public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());if (null == commandLine) {System.exit(-1);return;}namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}

2.createAndStartNamesrvController(创建并运行NamesrvController)

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));controller.start();return controller;}
一、controller.initialize()初始化
  1. 加载配置
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =new HashMap<>();
public void load() {String content = null;try {content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());} catch (IOException e) {log.warn("Load KV config table exception", e);}if (content != null) {KVConfigSerializeWrapper kvConfigSerializeWrapper =KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);if (null != kvConfigSerializeWrapper) {this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());log.info("load KV config table OK");}}}
  1. 初始化网络组件
private void initiateNetworkComponents() {// remotingServer是一个NettyRemotingServer对象,用于接收和处理来自其他服务的请求或响应// BrokerHouseKeepingService对象用于处理Broker的连接和断开事件this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// remotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);}
  1. 初始化线程执行器
private void initiateThreadExecutors() {this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());// 用于处理默认的远程请求this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());// 处理客户端的路由信息请求this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};}
  1. 注册remoting server处理器
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {// ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);} else {// Support get route info only temporarily// 在 namesrvConfig.isClusterTest() = false 时如果收到请求的 requestCode 等于 RequestCode.GET_ROUTEINFO_BY_TOPIC 则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);// DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);}}
  1. 启动定时任务
private void startScheduleService() {// 首次延迟5毫秒执行,后续执行间隔5秒执行(分两种情况)// 后续执行:1.当执行任务时间小于间隔时间时,延迟(间隔时间-任务执行时间)执行//         2.当任务执行时间大于间隔时间,则任务结束立即执行下一次任务// 间隔扫描不活跃的Brokerthis.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);//每隔10分钟打印所有KV配置this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 每隔1分钟打印WaterMarkthis.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);}
  1. 初始化ssl上下文(配置remotingServer使用TLS协议进行安全通信)
  2. 注册RPC钩子
private void initiateRpcHooks() {// 注册RPC钩子 在remotingServer处理请求之前或之后执行一些自定义的逻辑this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());}
二、注册JVM钩子
  1. controllerManager.shutdown()
// 注册一个ShutdownHookThread对象,JVM钩子,在程序终止时调用controllerManager.shutdown(),释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controllerManager.shutdown();return null;}));
public void shutdown() {this.heartbeatManager.shutdown();this.controllerRequestExecutor.shutdown();this.notifyService.shutdown();this.controller.shutdown();this.remotingClient.shutdown();}
三、controller.start()运行
  1. 运行NettyRemotingServer,启动一个NettyRemotingServer,用于接收和处理客户端的请求

  2. 运行remotingClient,启动一个NettyRemotingClient,用于向其他服务发送请求

  3. 运行FileWatch,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件

用来跟踪SSL

  1. 运行RouteInfoManager,启动一个路由信息管理器,用于维护Broker和Topic的路由关系

unRegisterService.start() 提供了一种以批处理方式注销Broker的机制。扫描离线的Broker BlockingQueue,take()获取数据,while循环获取不活跃的broker。take()是阻塞的,drainTo()方法获取队列中的全部数据。

registerBroker() 接收Broker每隔30秒上报Broker信息

相关文章:

RocketMQ(1.NameServer源码)

NameServer功能简述 主要功能如下 服务注册与发现&#xff1a;Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时&#xff0c;它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息…...

责任链vs金融登录

金融app相对普通app而言&#xff0c;出于安全考虑&#xff0c;其安全校验方式比较多&#xff0c;以某些银行app为例&#xff0c;手机号登录成功后&#xff0c;会增加指纹、手势、OCR人脸等验证&#xff01;这些安全项的校验&#xff0c;会根据用户的风险等级有不同的校验优先级…...

通过VIOOVI,了解联合作业分析的意义和目标!

现如今企业的主流生产模式就是流水线生产&#xff0c;一道工序结束后&#xff0c;紧接着开展下一项工序&#xff0c;这种作业模式可以以一种比较高效的方式缩减生产时间。尽管流水作业的效率已经够高的了&#xff0c;但是各个工序之间如果衔接不到位的话&#xff0c;会造成生产…...

清洁机器人规划控制方案

清洁机器人规划控制方案 作者联系方式Forrest709335543qq.com 文章目录 清洁机器人规划控制方案方案简介方案设计模块链路坐标变换算法框架 功能设计定点自主导航固定路线清洁区域覆盖清洁贴边沿墙清洁自主返航回充 仿真测试仿真测试准备定点自主导航测试固定路线清洁测试区域…...

设计模式 - 工厂模式

一、 简单工厂&#xff08;Simple Factory Pattern&#xff09; 1、概念 一个工厂对象决定创建出哪一种产品类的实力&#xff0c;但不属于GOF23种设计模式。 简单工厂适用于工厂类负责创建的对象较少的场景&#xff0c;且客户端只需要传入工厂类的参数&#xff0c;对于如何创…...

elementUI this.$confirm 文字大小样式

dangerouslyUseHTMLString:true // message部分 以html片段处理 customClass //MessageBox 的自定义类名 整个comfirm框自定义类名 cancelButtonClass // 取消按钮的自定义类名 confirmButtonClass // 确定按钮的自定义类名<style> .addcomfirm{width: 500px; } .a…...

Kafka的TimingWheel

Kafka的TimingWheel是Kafka中的一个时间轮实现,用于管理和处理延迟消息。时间轮是一种定时器的数据结构,可以高效地管理和触发定时事件。 在Kafka中,TimingWheel用于处理延迟消息的重试。当Kafka生产者发送消息到Kafka集群,但由于某些原因导致消息发送失败,生产者会将这些…...

第2集丨webpack 江湖 —— 创建一个简单的webpack工程demo

目录 一、创建webpack工程1.1 新建 webpack工程目录1.2 项目初始化1.3 新建src目录和文件1.4 安装jQuery1.5 安装webpack1.6 配置webpack1.6.1 创建配置文件&#xff1a;webpack.config.js1.6.2 配置dev脚本1.7 运行dev脚本 1.8 查看效果1.9 附件1.9.1 package.json1.9.2 webpa…...

Python(Web时代)——初识flask

flask简介 介绍 Flask是一个用Python编写的Web 微框架&#xff0c;让我们可以使用Python语言快速实现一个网站或Web服务。它是BSD授权的&#xff0c;一个有少量限制的免费软件许可。它使用了 Werkzeug 工具箱和 Jinja2 模板引擎。 Flask 的设计理念是简单、灵活、易于扩展&a…...

二、SQL-5.DQL-8).案例练习

1、查询年龄为20,21,22,23岁的员工信息 select * from emp where age in(20, 21, 22, 23) and gender 女; 2、查询性别为男&#xff0c;并且年龄在20-40岁&#xff08;含&#xff09;以内的姓名为三个字的员工 select * from emp where gender 男 && age between 2…...

浙大数据结构第五周之05-树7 堆中的路径

题目详情&#xff1a; 将一系列给定数字依次插入一个初始为空的小顶堆H[]。随后对任意给定的下标i&#xff0c;打印从H[i]到根结点的路径。 输入格式: 每组测试第1行包含2个正整数N和M(≤1000)&#xff0c;分别是插入元素的个数、以及需要打印的路径条数。下一行给出区间[-1…...

C# Modbus TCP上位机测试

前面说了三菱和西门子PLC的上位机通信&#xff0c;实际在生产应用中&#xff0c;设备会有很多不同的厂家生产的PLC&#xff0c;那么&#xff0c;我们就需要一种通用的语言&#xff0c;进行设备之间的通信&#xff0c;工业上较为广泛使用的语言之一就是Modbus。 Modbus有多种连…...

instr字符查找函数(oracle用instr来代替like)

instr函数&#xff1a;字符查找函数。其功能是查找一个字符串在另一个字符串中首次出现的位置。 instr函数在Oracle/PLSQL中是返回要截取的字符串在源字符串中的位置。 语法 instr( string1, string2, start_position,nth_appearance ) 参数 string1&#xff1a;源字符串&am…...

trie树的一点理解

这个是最简单的数据结构&#xff1a;因为只需要记住两句话就能完美的写出简洁优雅的代码 1. 每次都是从根节点开始看(或者说从第零次插入的东西开始遍历&#xff0c;son[][]里面存的是第几次插入) 2每次遍历都是插入和查询的字符串 #include<iostream> using namespace …...

Linux CentOS监控系统的运行情况工具 - top/htop/glances/sar/nmon

在CentOS系统中&#xff0c;您可以使用以下工具来监控系统的运行情况&#xff1a; 1. top&#xff1a; top 是一个命令行工具&#xff0c;用于实时监控系统的进程、CPU、内存和负载情况。您可以使用以下命令来启动 top&#xff1a; top 输出 2. htop&#xff1a; htop 是一…...

Qt开发(2)——windows下调用外部程序

一、QProcess::start &#xff11;&#xff0e;阻塞性 start是非阻塞函数&#xff0c;但是这里的waitForFinished是阻塞的 2. 调用外部压缩程序7z // 目标压缩路径 QString zipFilePath destinationFolder "/" zipFileName; QStringList arguments{"a&qu…...

PostgreSQL查看数据库对象大小

PostgreSQL查看数据库对象大小 PostgreSQL查看数据库对象大小1、查看某个数据库大小2、查看多个数据库大小3、按顺序查看索引大小4、查看所有对象的大小 PostgreSQL查看数据库对象大小 1、查看某个数据库大小 select pg_size_pretty(pg_database_size(tzqdb));2、查看多个数据…...

给el-table实现列显隐

用过若依的都知道&#xff0c;在使用el-table 时候&#xff0c;实现列显隐效果是要给每个列加v-if 判断的&#xff0c;这种代码过于繁琐&#xff0c;于是翻看el-table包的代码&#xff0c;调试后发现内部的【插入】和【删除】两个方法可以达到我们要的效果。 项目不提供源码&a…...

为Android构建现代应用——应用架构

选择风格(Choosing a style) 我们将依照Google在《应用架构指南》中推荐的最佳实践和架构指南来构建OrderNow的架构。 这些定义包括通过各层定义组件的一些Clean Architecture原则。 层次的定义(Definition of the layers) 在应用程序中&#xff0c;我们将定义以下主要层次…...

49:字符串的新增方法

字符串的新增方法 String.fromCodePoint()String.raw()实例方法&#xff1a;codePointAt()实例方法&#xff1a;normalize()[实例方法&#xff1a;includes(), startsWith(), endsWith()](https://es6.ruanyifeng.com/#docs/string-methods#实例方法&#xff1a;includes(), s…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站&#xff0c;会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后&#xff0c;网站没有变化的情况。 不熟悉siteground主机的新手&#xff0c;遇到这个问题&#xff0c;就很抓狂&#xff0c;明明是哪都没操作错误&#x…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

C++ 基础特性深度解析

目录 引言 一、命名空间&#xff08;namespace&#xff09; C 中的命名空间​ 与 C 语言的对比​ 二、缺省参数​ C 中的缺省参数​ 与 C 语言的对比​ 三、引用&#xff08;reference&#xff09;​ C 中的引用​ 与 C 语言的对比​ 四、inline&#xff08;内联函数…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...

LangFlow技术架构分析

&#x1f527; LangFlow 的可视化技术栈 前端节点编辑器 底层框架&#xff1a;基于 &#xff08;一个现代化的 React 节点绘图库&#xff09; 功能&#xff1a; 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...