当前位置: 首页 > news >正文

Broker如何进行定时心跳发送和故障感知

1.前言

此文章是在儒猿课程中的学习笔记,感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》,我本人觉得这个作者还是不错,都是从场景来进行分析,感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔记。想看原版的,还是去儒猿课堂。

这篇文章紧挨着上一篇文章NameServer接收Broker的注册请求处理-CSDN博客

前面我们已经看到了NameServer处理Broker的注册请求的源码流程,NameServer核心就是基于Netty服务器来接收Broker注册请求,然后交给DefaultRequestProcessor这个请求处理组件,来注册Broker注册请求。Broker的注册逻辑是放在RouteInfoManager这个路由数据管理组件实现的。最终这些路由数据都会存放到RouteInfoManager内部的一些Map数据结构组成的路由数据表中。

2.Broker 如何进行定时发送心跳

那么Broker是如何定时发送心跳到NameServer,让NameServer感知到Broker一直存活,这个Broker中的发送注册请求给NameServer,这个代码的入口还是在BrokerController中

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

这块代码,就是启动了一个定时调度的任务,默认是每30s就会执行Broker的注册请求。

我们紧接着看下RouteInfoManager的注册方法的代码逻辑:

   public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//这里有一个加锁的操作this.lock.writeLock().lockInterruptibly();//从clusterAddrTable中获取clusterName对应的BrokeName的set集合//如果对应的clusterName中没有set集合那么就新建一个set集合Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());//直接将brokerName添加到set集合中//每次心跳过来了 都要进行添加 用set集合保证brokerName的唯一性brokerNames.add(brokerName);boolean registerFirst = false;//这里是根据BrokerName进行获取Broker的数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {//如果为空的话 设置registerFirst为true 这个代表是第一次注册的registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}//下面的这些代码应该是设置如果brokerAddrMap中没有这个brokerId对应的地址那么就添加这个地址//有的话 就要进行判断地址是否正确,不正确的就要修改Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTableIterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);if (MixAll.MASTER_ID == brokerId) {log.info("cluster [{}] brokerName [{}] master address change from {} to {}",brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);}registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//这里就是放入Broker中心跳信息//每次心跳请求过来之后都要进行重新创建一个BrokerLiveInfo对象 对上一次心跳的信息进行覆盖BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//下面的代码感觉跟心跳的关联关系不大if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}

3.NameSever如何进行故障感知

经过前面的代码我们可以看出来,每隔30s进行发送注册请求的时候,都会针对RouteInfoManager中的brokerLiveTable对上一次已经发送的注册请求进行覆盖。

我们紧接着看下NameServer是如何进行故障感知的,我们直接看NameServer中的initialize方法,可以直接找到这个代码:

his.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);

这行代码的主要作用就,就是启动一个定时调度线程,每隔10s进行扫描一次RouteInfoManager的scanNotActiveBroker方法,我们接下来看下这个方法,

    public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间 //broker的最新一次心跳时间+120s 小于 当前时间戳 //就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}

4.总结

我们还是用一张图来进行总结这篇博客的内容:

相关文章:

Broker如何进行定时心跳发送和故障感知

1.前言 此文章是在儒猿课程中的学习笔记&#xff0c;感兴趣的想看原来的课程可以去咨询儒猿课堂《从0开始带你成为RocketMQ高手》&#xff0c;我本人觉得这个作者还是不错&#xff0c;都是从场景来进行分析&#xff0c;感觉还是挺适合我这种小白的。这块主要都是我自己的学习笔…...

网络安全设备主要有什么

网络安全设备指的肯定是硬件设备了&#xff0c;国内卖安全硬件的没几家&#xff0c;天融信&#xff0c;启明星辰&#xff0c;绿盟&#xff0c;深信服&#xff0c;就这四家卖的比较齐全吧&#xff0c;上它们官网看一下&#xff0c;就知道市面上主要的网络安全设备有哪些了。分类…...

Android Framework WMS全面概述和知识要点

一、概述 定义与作用 在 Android 系统中&#xff0c;WindowManagerService&#xff08;WMS&#xff09;就像是一个大管家&#xff0c;负责管理整个系统的窗口界面。它是 Android Framework 的核心组件之一&#xff0c;处于 system_server 进程内&#xff0c;在 Framework 层占…...

记一次某红蓝演练经历

在某天接到任务&#xff0c;对xxx进行一次红蓝演练&#xff0c;于是把自己渗透过程给记录下来&#xff0c;漏洞关键地方也会打码&#xff0c;希望各位大佬理解&#xff0c;菜鸡一枚&#xff0c;勿喷/(ㄒoㄒ)/~~ 概述 拿到目标域名第一件事就是信息收集&#xff0c;曾经一位大…...

一个运行在浏览器中的开源Web操作系统Puter本地部署与远程访问

文章目录 前言1.关于Puter2.本地部署Puter3.Puter简单使用4. 安装内网穿透5.配置puter公网地址6. 配置固定公网地址 &#x1f4a1; 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击跳转到网站…...

【零基础入门Go语言】struct 和 interface:Go语言是如何实现继承的?

提到面向对象编程中的继承&#xff0c;许多人脑海中可能会浮现出 Java、C 等语言中那一套熟悉的类继承体系。然而&#xff0c;Go 语言作为一门别具一格的编程语言&#xff0c;并没有遵循传统的继承模式。那么&#xff0c;在 Go 语言的世界里&#xff0c;它是怎样实现类似于继承…...

麦田物语学习笔记:实现拖拽物品交换数据和在地图上生成物品

基本流程 1.代码思路 (1)InventoryUI的PlayerSlots与PlayerBag里一一对应,所以想要实现交换数据实际上是,先拿到被拖拽的物体所对的Slot的序号和目标的Slot序号,然后将这两个序号对调一下 (2)物品交换的数据逻辑应该在InventoryManager里去调用,因为InventoryManager里管理了p…...

一些计算机零碎知识随写(25年1月)-1

我原以为世界上有技术的那批人不会那么闲&#xff0c;我错了&#xff0c;被脚本真实了。 今天正隔着画画呢&#xff0c;手机突然弹出几条安全告警通知。 急忙打开服务器&#xff0c;发现问题不简单&#xff0c;直接关服务器重装系统..... 首先&#xff0c;不要认为小网站&…...

Qt学习笔记第81到90讲

第81讲 串口调试助手实现自动发送 为这个名叫“定时发送”的QCheckBox编写槽函数。 想要做出定时发送的效果&#xff0c;必须引入QT框架下的毫秒级定时器QTimer&#xff0c;查阅手册了解详情。 在widget.h内添加新的私有成员变量&#xff1a; QTimer *timer; 在widget类的构造…...

Centos9 + Docker 安装 MySQL8.4.0 + 定时备份数据库到本地

Centos9 Docker 安装 MySQL8.4.0 定时备份数据库到本地 创建目录&#xff0c;创建配置文件启动容器命令定时备份MySQL执行脚本Linux每日定时任务命令文件内参数其他时间参数 AT一次性定时任务 创建目录&#xff0c;创建配置文件 $ mkdir -p /opt/mysql/conf$ vim /opt/mysql/…...

网络原理一>UDP协议详解

UDP和TCP都是应用层中的重要协议&#xff0c;如果做基础架构开发&#xff0c;会用得多一些。 这一篇我们先简单聊一下的UDP TCP格式呈现&#xff1a; 我们知道UDP是一种无连接&#xff0c;面向数据报&#xff0c;全双工&#xff0c;不可靠传输特性的网络协议。 基本格式如图…...

MySQL的小问题

编码问题 不管官方使用什么编码&#xff1a;latin1、gbk、utf8、utfmb4。统一使用utfmb4 MySQL中的utf8并不是utf-8&#xff0c;它省略了一个字节&#xff0c;只是用三个字节存储所有的符号&#xff0c;utfmb4才是utf-8 远程登录问题&#xff1a; MySQL官方默认没有启动远程…...

Mac——Docker desktop安装与使用教程

摘要 本文是一篇关于Mac系统下Docker Desktop安装与使用教程的博文。首先介绍连接WiFi网络&#xff0c;然后详细阐述了如何在Mac上安装Docker&#xff0c;包括下载地址以及不同芯片版本的选择。接着讲解了如何下载基础镜像和指定版本镜像&#xff0c;旨在帮助用户在Mac上高效使…...

FastApi Swagger 序列化问题

问题 错误现象&#xff1a; fastapi的 swagger 界面无法正常打开控制台报错&#xff1a;raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错&#xff1a; File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…...

《机器学习》——sklearn库中CountVectorizer方法(词频矩阵)

CountVectorizer方法介绍 CountVectorizer 是 scikit-learn 库中的一个工具&#xff0c;它主要用于将文本数据转换为词频矩阵&#xff0c;而不是传统意义上的词向量转换&#xff0c;但可以作为词向量转换的一种基础形式。用于将文本数据转换为词频矩阵&#xff0c;它是文本特征…...

UML系列之Rational Rose笔记三:活动图(泳道图)

一、新建活动图&#xff08;泳道图&#xff09; 依旧在用例视图里面&#xff0c;新建一个activity diagram&#xff1b;新建好之后&#xff0c;就可以绘制活动图了&#xff1a; 正常每个活动需要一个开始&#xff0c;点击黑点&#xff0c;然后在图中某个位置安放&#xff0c;接…...

Java面向对象面经总结

目录 面向对象基础 面向对象与面向过程的区别 创建一个对象用什么运算符&#xff0c;对象实体与对象引用的区别 对象相等和引用相等的区别 构造方法的特点&#xff0c;是否可被重写&#xff1f; 面向对象三大特征 封装 继承 多态 接口和抽象类的共同点和区别 深拷贝…...

红队工具使用全解析:揭开网络安全神秘面纱一角

红队工具使用全解析&#xff1a;揭开网络安全神秘面纱一角 B站红队公益课&#xff1a;https://space.bilibili.com/350329294 学习网盘资源链接&#xff1a;https://pan.quark.cn/s/4079487939e8 嘿&#xff0c;各位网络安全爱好者们&#xff01;在风云变幻的网络安全战场上&am…...

OpenLinkSaas 2025年第一季度开发计划

OpenLinkSaas在2025的发展方向是强化基础设施和研发协作&#xff0c;弱化管理相关的功能。 为了根据参与到软件研发的整个流程&#xff0c;OpenLinkSaas会增加一系列的基础设施项目&#xff0c;并和OpenLinksaas进行深度整合。 目前计划中的基础设施: 链路追踪系统(OpenDragonF…...

【python小工具】怎么获取视频的关键帧频率?

使用 FFmpeg 提取 MP4 视频的关键帧并计算关键帧频率可以按以下步骤进行&#xff1a; 提取关键帧&#xff1a; 使用 FFmpeg 提取视频中的关键帧可以通过以下命令实现&#xff1a; ffmpeg -i input.mp4 -vf "selecteq(pict_type,I)" -vsync vfr keyframes_%03d.jpg…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...