Spring Cloud Nacos源码讲解(七)- Nacos客户端服务订阅机制的核心流程
Nacos客户端服务订阅机制的核心流程
说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起
Nacos订阅概述
Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。
以下是订阅方法的主线流程,涉及内容比较多,细节比较复杂,所以这里我们主要学习核心部分。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Coadaqu-1677029556577)(image-20211025182722346.png)]](https://img-blog.csdnimg.cn/80b7c53c238d4229aa95612e85173f81.png)
定时任务开启
其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。
NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}
这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result) {// 如果为null,则进行订阅逻辑处理,基于gRPC协议result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result;
}
但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//构建UpdateTaskScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}
定时任务延迟一秒执行:
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。
定时任务执行内容
UpdateTask封装了订阅机制的核心业务逻辑,我们来看一下流程图:

当我们知道了整体流程以后,我们再来看对应源码:
@Override
public void run() {long delayTime = DEFAULT_DELAY;try {// 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);return;}// 获取缓存的service信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);// 如果为空if (serviceObj == null) {// 根据serviceName从注册中心服务端获取Service信息serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}// 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置,默认6秒// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}
业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。
总结:
-
订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;
-
通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;
-
通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;
-
UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。
-
重新计算定时任务时间,循环执行流程。
相关文章:
Spring Cloud Nacos源码讲解(七)- Nacos客户端服务订阅机制的核心流程
Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制,如果用一句话来描述就是:Nacos客…...
【华为OD机试模拟题】用 C++ 实现 - 对称美学(2023.Q1)
最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 获得完美走位(2023.Q1) 文章目录 最近更新的博客使用说明对称美学题目输入输出示例一输入输出说明示例二输入输出说明备注Code使用说明 参加华为od机试,一定要注意不要完全背诵代码...
Go语言内存管理详解-学习笔记
1 自动内存管理 1.1 相关概念 Mutator:业务线程,分配新对象,修改对象指向关系Collector:GC线程,找到存活对象,回收死亡对象的内存空间Serial GC:只有一个collector(需要暂停&#…...
Geospatial Data Science (4): Spatial weights
Geospatial Data Science (4): Spatial weights 在本节中,我们将学习空间分析中关键部分之一的来龙去脉:空间权重矩阵。这些是结构化的数字集,用于形式化数据集中观测值之间的地理关系。本质上,给定地理的空间权重矩阵是维度 N N N 乘以 N N N 的正定矩阵,其中...
JUC-Synchronized相关内容
设计同步器的意义多线程编程中,有可能会出现多个线程同时访问同一个共享、可变资源的情况,这个资源我们称之其为临界资源;这种资源可能是:对象、变量、文件等。共享:资源可以由多个线程同时访问可变:资源可…...
【c++】文件操作(文本文件、二进制文件)
文章目录文件操作文本文件写文件读文件二进制文件写文件读文件文件操作 程序运行时产生的数据都属于临时数据,程序一旦运行结束都会被释放; 通过文件可以将数据持久化; c中对文件操作需要包含头文件 文件类型分为两种: 1、文本文…...
带你了解IP报警柱的特点
IP可视报警柱是一款室外防水紧急求助可视对讲终端。安装在学校、广场、道路人流密集和案件高发区域,当发生紧急情况或需要咨询求助时按下呼叫按钮立即可与监控中心值班人员通话,值班人员也可通过前置摄像头了解现场情况并广播喊话。IP可视报警柱的使用特…...
一步步教你电脑变成服务器,tomcat的花生壳设置(原创)
1,首先你去https://console.oray.com/这网站注册个帐号,如果注册成功它会送你一个免费域名,当然不记得也没关系,你记住你注册的 帐号跟密码,然后下载它的软件(花生壳动态域名6.0正式版)有xp跟li…...
Python 卷积神经网络 ResNet的基本编写方法
ResNet(Residual Network)是由微软亚洲研究院提出的深度卷积神经网络,它在2015年的ImageNet挑战赛上取得了第一名的好成绩。ResNet最大的特点是使用了残差学习,可以解决深度网络退化问题。在传统的深度神经网络中,随着…...
【索引】什么是索引
📔 笔记介绍 大家好,千寻简笔记是一套全部开源的企业开发问题记录,毫无保留给个人及企业免费使用,我是作者星辰,笔记内容整理并发布,内容有误请指出,笔记源码已开源,前往Gitee搜索《…...
【算法刷题】动态规划算法题型及方法归纳
动态规划特点 动态规划中每一个状态一定是由上一个状态推导出来,根据这个特点,可以在状态计算过程中,存储某一条件下的数据,当再次遍历该条件时,直接取该条件对应的数据即可,可以避免重复计算,…...
PolarDB数据库的CSN机制
背景 对postgres数据库熟悉的同学会发现在高并发场景下在获取快照处易出现性能瓶颈,其原因在于PG使用全局数组在共享内存中保存所有事务的状态,在获取快照时需要加锁以保证数据一致性。获取快照时需要持有ProcArraryLock共享锁比遍历ProcArray数组中活跃…...
使用kubeadm 部署kubernetes 1.26.1集群 Calico ToR配置
目录 机器信息 升级内核 系统配置 部署容器运行时Containerd 安装crictl客户端命令 配置服务器支持开启ipvs的前提条件 安装 kubeadm、kubelet 和 kubectl 初始化集群 (master) 安装CNI Calico 集群加入node节点 机器信息 主机名集群角色IP内…...
Servlet笔记(11):Servletcontext对象
1、什么是ServletContext ServletContext是一个全局储存空间,随服务器的生命周期变化, Cookie,Session,ServletContext的区别 Cookie: 存在于客户端的本地文本文件 Session: 存在于服务器的文本文件&#…...
EM算法是什么
EM算法是什么 EM算法(Expectation-Maximization Algorithm)是一种用于参数估计的迭代算法。它常被用于含有隐变量(latent variable)的概率模型中,例如高斯混合模型、隐马尔可夫模型等。 EM算法分为两个步骤ÿ…...
C++---线性dp---方格取数(每日一道算法2023.2.25)
注意事项: 本题属于"数字三角形"和"摘花生"两题的进阶版,建议优先看懂那两道,有助理解。 题目: 输入: 8 2 3 13 2 6 6 3 5 7 4 4 14 5 2 21 5 6 4 6 3 15 7 2 14 0 0 0输出: 67#include <cm…...
《第一行代码》 第八章:应用手机多媒体
一,使用通知 第一步,创建项目,书写布局 <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:orientation"vertical"android:layout_width"match_parent"android:layout_he…...
C++设计模式(20)——迭代器模式
亦称: Iterator 意图 迭代器模式是一种行为设计模式, 让你能在不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素。 问题 集合是编程中最常使用的数据类型之一。 尽管如此, 集合只是一组对…...
戴尔Latitude 3410电脑 Hackintosh 黑苹果efi引导文件
原文来源于黑果魏叔官网,转载需注明出处。硬件型号驱动情况主板戴尔Latitude 3410处理器英特尔酷睿i7-10510U已驱动内存8GB已驱动硬盘SK hynix BC511 NVMe SSD已驱动显卡Intel UHD 620Nvidia GeForce MX230(屏蔽)无法驱动声卡Realtek ALC236已驱动网卡Realtek RTL81…...
一起Talk Android吧(第五百零四回:如何调整组件在约束布局中的位置)
文章目录 背景介绍调整方法一调整方法二经验分享各位看官们大家好,上一回中咱们说的例子是"解决retrofit被混淆后代码出错的问题",这一回中咱们说的例子是" 如何调整组件在约束布局中的位置"。闲话休提,言归正转, 让我们一起Talk Android吧! 背景介绍…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
