Broker如何进行定时心跳发送和故障感知
1.前言
此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔记。想看原版的,还是去儒猿课堂。
这篇文章紧挨着上一篇文章NameServer接收Broker的注册请求处理-CSDN博客
前面我们已经看到了NameServer处理Broker的注册请求的源码流程,NameServer核心就是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor这个请求处理组件,来注册Broker注册请求。Broker的注册逻辑是放在RouteInfoManager这个路由数据管理组件实现的。最终这些路由数据都会存放到RouteInfoManager内部的一些Map数据结构组成的路由数据表中。
2.Broker 如何进行定时发送心跳
那么Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直存活,这个Broker中的发送注册请求给NameServer,这个代码的入口还是在BrokerController中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
这块代码,就是启动了一个定时调度的任务,默认是每30s就会执行Broker的注册请求。
我们紧接着看下RouteInfoManager的注册方法的代码逻辑:
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//这里有一个加锁的操作this.lock.writeLock().lockInterruptibly();//从clusterAddrTable中获取clusterName对应的BrokeName的set集合//如果对应的clusterName中没有set集合那么就新建一个set集合Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());//直接将brokerName添加到set集合中//每次心跳过来了 都要进行添加 用set集合保证brokerName的唯一性brokerNames.add(brokerName);boolean registerFirst = false;//这里是根据BrokerName进行获取Broker的数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {//如果为空的话 设置registerFirst为true 这个代表是第一次注册的registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}//下面的这些代码应该是设置如果brokerAddrMap中没有这个brokerId对应的地址那么就添加这个地址//有的话 就要进行判断地址是否正确,不正确的就要修改Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//这里就是放入Broker中心跳信息//每次心跳请求过来之后都要进行重新创建一个BrokerLiveInfo对象 对上一次心跳的信息进行覆盖BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//下面的代码感觉跟心跳的关联关系不大if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}
3.NameSever如何进行故障感知
经过前面的代码我们可以看出来,每隔30s进行发送注册请求的时候,都会针对RouteInfoManager中的brokerLiveTable对上一次已经发送的注册请求进行覆盖。
我们紧接着看下NameServer是如何进行故障感知的,我们直接看NameServer中的initialize方法,可以直接找到这个代码:
his.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
这行代码的主要作用就,就是启动一个定时调度线程,每隔10s进行扫描一次RouteInfoManager的scanNotActiveBroker方法,我们接下来看下这个方法,
public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}
4.总结
我们还是用一张图来进行总结这篇博客的内容:
相关文章:

Broker如何进行定时心跳发送和故障感知
1.前言 此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...

网络安全设备主要有什么
网络安全设备指的肯定是硬件设备了,国内卖安全硬件的没几家,天融信,启明星辰,绿盟,深信服,就这四家卖的比较齐全吧,上它们官网看一下,就知道市面上主要的网络安全设备有哪些了。分类…...
Android Framework WMS全面概述和知识要点
一、概述 定义与作用 在 Android 系统中,WindowManagerService(WMS)就像是一个大管家,负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一,处于 system_server 进程内,在 Framework 层占…...

记一次某红蓝演练经历
在某天接到任务,对xxx进行一次红蓝演练,于是把自己渗透过程给记录下来,漏洞关键地方也会打码,希望各位大佬理解,菜鸡一枚,勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集,曾经一位大…...

一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问
文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 💡 推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击跳转到网站…...
【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?
提到面向对象编程中的继承,许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而,Go 语言作为一门别具一格的编程语言,并没有遵循传统的继承模式。那么,在 Go 语言的世界里,它是怎样实现类似于继承…...

麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品
基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...

一些计算机零碎知识随写(25年1月)-1
我原以为世界上有技术的那批人不会那么闲,我错了,被脚本真实了。 今天正隔着画画呢,手机突然弹出几条安全告警通知。 急忙打开服务器,发现问题不简单,直接关服务器重装系统..... 首先,不要认为小网站&…...

Qt学习笔记第81到90讲
第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果,必须引入QT框架下的毫秒级定时器QTimer,查阅手册了解详情。 在widget.h内添加新的私有成员变量: QTimer *timer; 在widget类的构造…...
Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地
Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录,创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录,创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...

网络原理一>UDP协议详解
UDP和TCP都是应用层中的重要协议,如果做基础架构开发,会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现: 我们知道UDP是一种无连接,面向数据报,全双工,不可靠传输特性的网络协议。 基本格式如图…...

MySQL的小问题
编码问题 不管官方使用什么编码:latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8,它省略了一个字节,只是用三个字节存储所有的符号,utfmb4才是utf-8 远程登录问题: MySQL官方默认没有启动远程…...

Mac——Docker desktop安装与使用教程
摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络,然后详细阐述了如何在Mac上安装Docker,包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像,旨在帮助用户在Mac上高效使…...

FastApi Swagger 序列化问题
问题 错误现象: fastapi的 swagger 界面无法正常打开控制台报错:raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错: File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...

《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)
CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具,它主要用于将文本数据转换为词频矩阵,而不是传统意义上的词向量转换,但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵,它是文本特征…...

UML系列之Rational Rose笔记三:活动图(泳道图)
一、新建活动图(泳道图) 依旧在用例视图里面,新建一个activity diagram;新建好之后,就可以绘制活动图了: 正常每个活动需要一个开始,点击黑点,然后在图中某个位置安放,接…...

Java面向对象面经总结
目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符,对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点,是否可被重写? 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...
红队工具使用全解析:揭开网络安全神秘面纱一角
红队工具使用全解析:揭开网络安全神秘面纱一角 B站红队公益课:https://space.bilibili.com/350329294 学习网盘资源链接:https://pan.quark.cn/s/4079487939e8 嘿,各位网络安全爱好者们!在风云变幻的网络安全战场上&am…...

OpenLinkSaas 2025年第一季度开发计划
OpenLinkSaas在2025的发展方向是强化基础设施和研发协作,弱化管理相关的功能。 为了根据参与到软件研发的整个流程,OpenLinkSaas会增加一系列的基础设施项目,并和OpenLinksaas进行深度整合。 目前计划中的基础设施: 链路追踪系统(OpenDragonF…...
【python小工具】怎么获取视频的关键帧频率?
使用 FFmpeg 提取 MP4 视频的关键帧并计算关键帧频率可以按以下步骤进行: 提取关键帧: 使用 FFmpeg 提取视频中的关键帧可以通过以下命令实现: ffmpeg -i input.mp4 -vf "selecteq(pict_type,I)" -vsync vfr keyframes_%03d.jpg…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...

【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...

Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...