Broker如何进行定时心跳发送和故障感知
1.前言
此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔记。想看原版的,还是去儒猿课堂。
这篇文章紧挨着上一篇文章NameServer接收Broker的注册请求处理-CSDN博客
前面我们已经看到了NameServer处理Broker的注册请求的源码流程,NameServer核心就是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor这个请求处理组件,来注册Broker注册请求。Broker的注册逻辑是放在RouteInfoManager这个路由数据管理组件实现的。最终这些路由数据都会存放到RouteInfoManager内部的一些Map数据结构组成的路由数据表中。
2.Broker 如何进行定时发送心跳
那么Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直存活,这个Broker中的发送注册请求给NameServer,这个代码的入口还是在BrokerController中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
这块代码,就是启动了一个定时调度的任务,默认是每30s就会执行Broker的注册请求。
我们紧接着看下RouteInfoManager的注册方法的代码逻辑:
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//这里有一个加锁的操作this.lock.writeLock().lockInterruptibly();//从clusterAddrTable中获取clusterName对应的BrokeName的set集合//如果对应的clusterName中没有set集合那么就新建一个set集合Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());//直接将brokerName添加到set集合中//每次心跳过来了 都要进行添加 用set集合保证brokerName的唯一性brokerNames.add(brokerName);boolean registerFirst = false;//这里是根据BrokerName进行获取Broker的数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {//如果为空的话 设置registerFirst为true 这个代表是第一次注册的registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}//下面的这些代码应该是设置如果brokerAddrMap中没有这个brokerId对应的地址那么就添加这个地址//有的话 就要进行判断地址是否正确,不正确的就要修改Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//这里就是放入Broker中心跳信息//每次心跳请求过来之后都要进行重新创建一个BrokerLiveInfo对象 对上一次心跳的信息进行覆盖BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//下面的代码感觉跟心跳的关联关系不大if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}
3.NameSever如何进行故障感知
经过前面的代码我们可以看出来,每隔30s进行发送注册请求的时候,都会针对RouteInfoManager中的brokerLiveTable对上一次已经发送的注册请求进行覆盖。
我们紧接着看下NameServer是如何进行故障感知的,我们直接看NameServer中的initialize方法,可以直接找到这个代码:
his.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
这行代码的主要作用就,就是启动一个定时调度线程,每隔10s进行扫描一次RouteInfoManager的scanNotActiveBroker方法,我们接下来看下这个方法,
public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}
4.总结
我们还是用一张图来进行总结这篇博客的内容:

相关文章:
Broker如何进行定时心跳发送和故障感知
1.前言 此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...
网络安全设备主要有什么
网络安全设备指的肯定是硬件设备了,国内卖安全硬件的没几家,天融信,启明星辰,绿盟,深信服,就这四家卖的比较齐全吧,上它们官网看一下,就知道市面上主要的网络安全设备有哪些了。分类…...
Android Framework WMS全面概述和知识要点
一、概述 定义与作用 在 Android 系统中,WindowManagerService(WMS)就像是一个大管家,负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一,处于 system_server 进程内,在 Framework 层占…...
记一次某红蓝演练经历
在某天接到任务,对xxx进行一次红蓝演练,于是把自己渗透过程给记录下来,漏洞关键地方也会打码,希望各位大佬理解,菜鸡一枚,勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集,曾经一位大…...
一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问
文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 💡 推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击跳转到网站…...
【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?
提到面向对象编程中的继承,许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而,Go 语言作为一门别具一格的编程语言,并没有遵循传统的继承模式。那么,在 Go 语言的世界里,它是怎样实现类似于继承…...
麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品
基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...
一些计算机零碎知识随写(25年1月)-1
我原以为世界上有技术的那批人不会那么闲,我错了,被脚本真实了。 今天正隔着画画呢,手机突然弹出几条安全告警通知。 急忙打开服务器,发现问题不简单,直接关服务器重装系统..... 首先,不要认为小网站&…...
Qt学习笔记第81到90讲
第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果,必须引入QT框架下的毫秒级定时器QTimer,查阅手册了解详情。 在widget.h内添加新的私有成员变量: QTimer *timer; 在widget类的构造…...
Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地
Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录,创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录,创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...
网络原理一>UDP协议详解
UDP和TCP都是应用层中的重要协议,如果做基础架构开发,会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现: 我们知道UDP是一种无连接,面向数据报,全双工,不可靠传输特性的网络协议。 基本格式如图…...
MySQL的小问题
编码问题 不管官方使用什么编码:latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8,它省略了一个字节,只是用三个字节存储所有的符号,utfmb4才是utf-8 远程登录问题: MySQL官方默认没有启动远程…...
Mac——Docker desktop安装与使用教程
摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络,然后详细阐述了如何在Mac上安装Docker,包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像,旨在帮助用户在Mac上高效使…...
FastApi Swagger 序列化问题
问题 错误现象: fastapi的 swagger 界面无法正常打开控制台报错:raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错: File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...
《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)
CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具,它主要用于将文本数据转换为词频矩阵,而不是传统意义上的词向量转换,但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵,它是文本特征…...
UML系列之Rational Rose笔记三:活动图(泳道图)
一、新建活动图(泳道图) 依旧在用例视图里面,新建一个activity diagram;新建好之后,就可以绘制活动图了: 正常每个活动需要一个开始,点击黑点,然后在图中某个位置安放,接…...
Java面向对象面经总结
目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符,对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点,是否可被重写? 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...
红队工具使用全解析:揭开网络安全神秘面纱一角
红队工具使用全解析:揭开网络安全神秘面纱一角 B站红队公益课:https://space.bilibili.com/350329294 学习网盘资源链接:https://pan.quark.cn/s/4079487939e8 嘿,各位网络安全爱好者们!在风云变幻的网络安全战场上&am…...
OpenLinkSaas 2025年第一季度开发计划
OpenLinkSaas在2025的发展方向是强化基础设施和研发协作,弱化管理相关的功能。 为了根据参与到软件研发的整个流程,OpenLinkSaas会增加一系列的基础设施项目,并和OpenLinksaas进行深度整合。 目前计划中的基础设施: 链路追踪系统(OpenDragonF…...
【python小工具】怎么获取视频的关键帧频率?
使用 FFmpeg 提取 MP4 视频的关键帧并计算关键帧频率可以按以下步骤进行: 提取关键帧: 使用 FFmpeg 提取视频中的关键帧可以通过以下命令实现: ffmpeg -i input.mp4 -vf "selecteq(pict_type,I)" -vsync vfr keyframes_%03d.jpg…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
