当前位置: 首页 > news >正文

Broker如何进行定时心跳发送和故障感知

1.前言

此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔记。想看原版的,还是去儒猿课堂。

这篇文章紧挨着上一篇文章NameServer接收Broker的注册请求处理-CSDN博客

前面我们已经看到了NameServer处理Broker的注册请求的源码流程,NameServer核心就是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor这个请求处理组件,来注册Broker注册请求。Broker的注册逻辑是放在RouteInfoManager这个路由数据管理组件实现的。最终这些路由数据都会存放到RouteInfoManager内部的一些Map数据结构组成的路由数据表中。

2.Broker 如何进行定时发送心跳

那么Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直存活,这个Broker中的发送注册请求给NameServer,这个代码的入口还是在BrokerController中

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

这块代码,就是启动了一个定时调度的任务,默认是每30s就会执行Broker的注册请求。

我们紧接着看下RouteInfoManager的注册方法的代码逻辑:

   public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//这里有一个加锁的操作this.lock.writeLock().lockInterruptibly();//从clusterAddrTable中获取clusterName对应的BrokeName的set集合//如果对应的clusterName中没有set集合那么就新建一个set集合Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());//直接将brokerName添加到set集合中//每次心跳过来了 都要进行添加 用set集合保证brokerName的唯一性brokerNames.add(brokerName);boolean registerFirst = false;//这里是根据BrokerName进行获取Broker的数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {//如果为空的话 设置registerFirst为true 这个代表是第一次注册的registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}//下面的这些代码应该是设置如果brokerAddrMap中没有这个brokerId对应的地址那么就添加这个地址//有的话 就要进行判断地址是否正确,不正确的就要修改Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//这里就是放入Broker中心跳信息//每次心跳请求过来之后都要进行重新创建一个BrokerLiveInfo对象 对上一次心跳的信息进行覆盖BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//下面的代码感觉跟心跳的关联关系不大if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}

3.NameSever如何进行故障感知

经过前面的代码我们可以看出来,每隔30s进行发送注册请求的时候,都会针对RouteInfoManager中的brokerLiveTable对上一次已经发送的注册请求进行覆盖。

我们紧接着看下NameServer是如何进行故障感知的,我们直接看NameServer中的initialize方法,可以直接找到这个代码:

his.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

这行代码的主要作用就,就是启动一个定时调度线程,每隔10s进行扫描一次RouteInfoManager的scanNotActiveBroker方法,我们接下来看下这个方法,

    public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}

4.总结

我们还是用一张图来进行总结这篇博客的内容:

相关文章:

Broker如何进行定时心跳发送和故障感知

1.前言 此文章是在儒猿课程中的学习笔记&#xff0c;感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》&#xff0c;我本人觉得这个作者还是不错&#xff0c;都是从场景来进行分析&#xff0c;感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...

网络安全设备主要有什么

网络安全设备指的肯定是硬件设备了&#xff0c;国内卖安全硬件的没几家&#xff0c;天融信&#xff0c;启明星辰&#xff0c;绿盟&#xff0c;深信服&#xff0c;就这四家卖的比较齐全吧&#xff0c;上它们官网看一下&#xff0c;就知道市面上主要的网络安全设备有哪些了。分类…...

Android Framework WMS全面概述和知识要点

一、概述 定义与作用 在 Android 系统中&#xff0c;WindowManagerService&#xff08;WMS&#xff09;就像是一个大管家&#xff0c;负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一&#xff0c;处于 system_server 进程内&#xff0c;在 Framework 层占…...

记一次某红蓝演练经历

在某天接到任务&#xff0c;对xxx进行一次红蓝演练&#xff0c;于是把自己渗透过程给记录下来&#xff0c;漏洞关键地方也会打码&#xff0c;希望各位大佬理解&#xff0c;菜鸡一枚&#xff0c;勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集&#xff0c;曾经一位大…...

一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问

文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 &#x1f4a1; 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击跳转到网站…...

【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?

提到面向对象编程中的继承&#xff0c;许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而&#xff0c;Go 语言作为一门别具一格的编程语言&#xff0c;并没有遵循传统的继承模式。那么&#xff0c;在 Go 语言的世界里&#xff0c;它是怎样实现类似于继承…...

麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品

基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...

一些计算机零碎知识随写(25年1月)-1

我原以为世界上有技术的那批人不会那么闲&#xff0c;我错了&#xff0c;被脚本真实了。 今天正隔着画画呢&#xff0c;手机突然弹出几条安全告警通知。 急忙打开服务器&#xff0c;发现问题不简单&#xff0c;直接关服务器重装系统..... 首先&#xff0c;不要认为小网站&…...

Qt学习笔记第81到90讲

第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果&#xff0c;必须引入QT框架下的毫秒级定时器QTimer&#xff0c;查阅手册了解详情。 在widget.h内添加新的私有成员变量&#xff1a; QTimer *timer; 在widget类的构造…...

Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地

Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录&#xff0c;创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录&#xff0c;创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...

网络原理一>UDP协议详解

UDP和TCP都是应用层中的重要协议&#xff0c;如果做基础架构开发&#xff0c;会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现&#xff1a; 我们知道UDP是一种无连接&#xff0c;面向数据报&#xff0c;全双工&#xff0c;不可靠传输特性的网络协议。 基本格式如图…...

MySQL的小问题

编码问题 不管官方使用什么编码&#xff1a;latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8&#xff0c;它省略了一个字节&#xff0c;只是用三个字节存储所有的符号&#xff0c;utfmb4才是utf-8 远程登录问题&#xff1a; MySQL官方默认没有启动远程…...

Mac——Docker desktop安装与使用教程

摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络&#xff0c;然后详细阐述了如何在Mac上安装Docker&#xff0c;包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像&#xff0c;旨在帮助用户在Mac上高效使…...

FastApi Swagger 序列化问题

问题 错误现象&#xff1a; fastapi的 swagger 界面无法正常打开控制台报错&#xff1a;raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错&#xff1a; File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...

《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)

CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具&#xff0c;它主要用于将文本数据转换为词频矩阵&#xff0c;而不是传统意义上的词向量转换&#xff0c;但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵&#xff0c;它是文本特征…...

UML系列之Rational Rose笔记三:活动图(泳道图)

一、新建活动图&#xff08;泳道图&#xff09; 依旧在用例视图里面&#xff0c;新建一个activity diagram&#xff1b;新建好之后&#xff0c;就可以绘制活动图了&#xff1a; 正常每个活动需要一个开始&#xff0c;点击黑点&#xff0c;然后在图中某个位置安放&#xff0c;接…...

Java面向对象面经总结

目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符&#xff0c;对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点&#xff0c;是否可被重写&#xff1f; 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...

红队工具使用全解析:揭开网络安全神秘面纱一角

红队工具使用全解析&#xff1a;揭开网络安全神秘面纱一角 B站红队公益课&#xff1a;https://space.bilibili.com/350329294 学习网盘资源链接&#xff1a;https://pan.quark.cn/s/4079487939e8 嘿&#xff0c;各位网络安全爱好者们&#xff01;在风云变幻的网络安全战场上&am…...

OpenLinkSaas 2025年第一季度开发计划

OpenLinkSaas在2025的发展方向是强化基础设施和研发协作&#xff0c;弱化管理相关的功能。 为了根据参与到软件研发的整个流程&#xff0c;OpenLinkSaas会增加一系列的基础设施项目&#xff0c;并和OpenLinksaas进行深度整合。 目前计划中的基础设施: 链路追踪系统(OpenDragonF…...

【python小工具】怎么获取视频的关键帧频率?

使用 FFmpeg 提取 MP4 视频的关键帧并计算关键帧频率可以按以下步骤进行&#xff1a; 提取关键帧&#xff1a; 使用 FFmpeg 提取视频中的关键帧可以通过以下命令实现&#xff1a; ffmpeg -i input.mp4 -vf "selecteq(pict_type,I)" -vsync vfr keyframes_%03d.jpg…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...