Broker如何进行定时心跳发送和故障感知
1.前言
此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔记。想看原版的,还是去儒猿课堂。
这篇文章紧挨着上一篇文章NameServer接收Broker的注册请求处理-CSDN博客
前面我们已经看到了NameServer处理Broker的注册请求的源码流程,NameServer核心就是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor这个请求处理组件,来注册Broker注册请求。Broker的注册逻辑是放在RouteInfoManager这个路由数据管理组件实现的。最终这些路由数据都会存放到RouteInfoManager内部的一些Map数据结构组成的路由数据表中。
2.Broker 如何进行定时发送心跳
那么Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直存活,这个Broker中的发送注册请求给NameServer,这个代码的入口还是在BrokerController中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
这块代码,就是启动了一个定时调度的任务,默认是每30s就会执行Broker的注册请求。
我们紧接着看下RouteInfoManager的注册方法的代码逻辑:
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//这里有一个加锁的操作this.lock.writeLock().lockInterruptibly();//从clusterAddrTable中获取clusterName对应的BrokeName的set集合//如果对应的clusterName中没有set集合那么就新建一个set集合Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());//直接将brokerName添加到set集合中//每次心跳过来了 都要进行添加 用set集合保证brokerName的唯一性brokerNames.add(brokerName);boolean registerFirst = false;//这里是根据BrokerName进行获取Broker的数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {//如果为空的话 设置registerFirst为true 这个代表是第一次注册的registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}//下面的这些代码应该是设置如果brokerAddrMap中没有这个brokerId对应的地址那么就添加这个地址//有的话 就要进行判断地址是否正确,不正确的就要修改Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//这里就是放入Broker中心跳信息//每次心跳请求过来之后都要进行重新创建一个BrokerLiveInfo对象 对上一次心跳的信息进行覆盖BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//下面的代码感觉跟心跳的关联关系不大if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}
3.NameSever如何进行故障感知
经过前面的代码我们可以看出来,每隔30s进行发送注册请求的时候,都会针对RouteInfoManager中的brokerLiveTable对上一次已经发送的注册请求进行覆盖。
我们紧接着看下NameServer是如何进行故障感知的,我们直接看NameServer中的initialize方法,可以直接找到这个代码:
his.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
这行代码的主要作用就,就是启动一个定时调度线程,每隔10s进行扫描一次RouteInfoManager的scanNotActiveBroker方法,我们接下来看下这个方法,
public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}
4.总结
我们还是用一张图来进行总结这篇博客的内容:

相关文章:
Broker如何进行定时心跳发送和故障感知
1.前言 此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...
网络安全设备主要有什么
网络安全设备指的肯定是硬件设备了,国内卖安全硬件的没几家,天融信,启明星辰,绿盟,深信服,就这四家卖的比较齐全吧,上它们官网看一下,就知道市面上主要的网络安全设备有哪些了。分类…...
Android Framework WMS全面概述和知识要点
一、概述 定义与作用 在 Android 系统中,WindowManagerService(WMS)就像是一个大管家,负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一,处于 system_server 进程内,在 Framework 层占…...
记一次某红蓝演练经历
在某天接到任务,对xxx进行一次红蓝演练,于是把自己渗透过程给记录下来,漏洞关键地方也会打码,希望各位大佬理解,菜鸡一枚,勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集,曾经一位大…...
一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问
文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 💡 推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击跳转到网站…...
【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?
提到面向对象编程中的继承,许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而,Go 语言作为一门别具一格的编程语言,并没有遵循传统的继承模式。那么,在 Go 语言的世界里,它是怎样实现类似于继承…...
麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品
基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...
一些计算机零碎知识随写(25年1月)-1
我原以为世界上有技术的那批人不会那么闲,我错了,被脚本真实了。 今天正隔着画画呢,手机突然弹出几条安全告警通知。 急忙打开服务器,发现问题不简单,直接关服务器重装系统..... 首先,不要认为小网站&…...
Qt学习笔记第81到90讲
第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果,必须引入QT框架下的毫秒级定时器QTimer,查阅手册了解详情。 在widget.h内添加新的私有成员变量: QTimer *timer; 在widget类的构造…...
Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地
Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录,创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录,创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...
网络原理一>UDP协议详解
UDP和TCP都是应用层中的重要协议,如果做基础架构开发,会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现: 我们知道UDP是一种无连接,面向数据报,全双工,不可靠传输特性的网络协议。 基本格式如图…...
MySQL的小问题
编码问题 不管官方使用什么编码:latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8,它省略了一个字节,只是用三个字节存储所有的符号,utfmb4才是utf-8 远程登录问题: MySQL官方默认没有启动远程…...
Mac——Docker desktop安装与使用教程
摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络,然后详细阐述了如何在Mac上安装Docker,包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像,旨在帮助用户在Mac上高效使…...
FastApi Swagger 序列化问题
问题 错误现象: fastapi的 swagger 界面无法正常打开控制台报错:raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错: File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...
《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)
CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具,它主要用于将文本数据转换为词频矩阵,而不是传统意义上的词向量转换,但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵,它是文本特征…...
UML系列之Rational Rose笔记三:活动图(泳道图)
一、新建活动图(泳道图) 依旧在用例视图里面,新建一个activity diagram;新建好之后,就可以绘制活动图了: 正常每个活动需要一个开始,点击黑点,然后在图中某个位置安放,接…...
Java面向对象面经总结
目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符,对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点,是否可被重写? 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...
红队工具使用全解析:揭开网络安全神秘面纱一角
红队工具使用全解析:揭开网络安全神秘面纱一角 B站红队公益课:https://space.bilibili.com/350329294 学习网盘资源链接:https://pan.quark.cn/s/4079487939e8 嘿,各位网络安全爱好者们!在风云变幻的网络安全战场上&am…...
OpenLinkSaas 2025年第一季度开发计划
OpenLinkSaas在2025的发展方向是强化基础设施和研发协作,弱化管理相关的功能。 为了根据参与到软件研发的整个流程,OpenLinkSaas会增加一系列的基础设施项目,并和OpenLinksaas进行深度整合。 目前计划中的基础设施: 链路追踪系统(OpenDragonF…...
【python小工具】怎么获取视频的关键帧频率?
使用 FFmpeg 提取 MP4 视频的关键帧并计算关键帧频率可以按以下步骤进行: 提取关键帧: 使用 FFmpeg 提取视频中的关键帧可以通过以下命令实现: ffmpeg -i input.mp4 -vf "selecteq(pict_type,I)" -vsync vfr keyframes_%03d.jpg…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
