当前位置: 首页 > news >正文

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程

说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起

Nacos订阅概述

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。
在这里插入图片描述

定时任务开启

其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

​ NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}

这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法.

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result) {// 如果为null,则进行订阅逻辑处理,基于gRPC协议result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result;
}

但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//构建UpdateTaskScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}

定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

定时任务执行内容

在这里插入图片描述
当我们知道了整体流程以后,我们再来看对应源码:

@Override
public void run() {long delayTime = DEFAULT_DELAY;try {// 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);return;}// 获取缓存的service信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);// 如果为空if (serviceObj == null) {// 根据serviceName从注册中心服务端获取Service信息serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}// 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置,默认6秒// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}

业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

  1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

相关文章:

19 客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客户端通…...

教师论文|科技专著管理系统

技术&#xff1a;Java、JSP等摘要&#xff1a;随着计算机和互联网技术的发展&#xff0c;社会的信息化程度越来越高&#xff0c;各行各业只有适应这种发展趋势&#xff0c;才能增强自己的适应能力和竞争能力&#xff0c;不断发展壮大。大学作为教育的基地&#xff0c;是社会进步…...

骨传导耳机是什么意思,骨传导耳机的好处具体有哪些

​在这个全民都是手机的时代&#xff0c;各种蓝牙耳机&#xff0c;入耳式耳机&#xff0c;真无线耳机等各种款式琳琅满目。而骨传导耳机是一种全新的科技产物&#xff0c;顾名思义就是通过头骨振动将声音传至外耳内的耳机。由于无需入耳&#xff0c;不会对耳朵造成任何影响。那…...

elasticsearch—使用汇总

文档结构1、概念简介2、使用创景3、核心组件4、环境部署5、操作实践官方网站&#xff1a;https://www.elastic.co/cn/elasticsearch/ 官方手册&#xff1a;https://www.elastic.co/guide/en/elasticsearch/reference/8.6/getting-started.html 参考教程&#xff1a; A&#xff…...

聊一聊代码重构——我们为什么要代码重构

代码重构 事情的起因是在去年下半年&#xff0c;我们终于无法承受往年的历史包袱而决定开始进行代码重构。 在以前我们尝试过进行代码重构但是从来没有系统性的考虑过如何重构。在对代码重构的过程中很多经验都是来自《重构&#xff1a;改善既有代码的设计》这本书&#xff0c;…...

【Python学习笔记】第二十九节 Python2 和Python3发生了哪些变化

Python 版本分为两大流派&#xff0c;一个是 Python 2.x 版本&#xff0c;另外一个是 Python 3.x 版本&#xff0c;Python 官方同时提供了对这两个版本的支持和维护。2020 年 1 月 1 日&#xff0c;Python 官方终止了对 Python 2.7 版本&#xff08;最后一个 Python 2.x 版本&a…...

[oeasy]python0099_雅达利大崩溃_IBM的开放架构_兼容机_oem

雅达利大崩溃 回忆上次内容 个人计算机浪潮已经来临 苹果公司迅速发展微软公司脱离mits准备做纯软件公司IBM用大型机思路制作的5100惨败 Commodore 64 既做计算机又做游戏机 计算机行业和游戏行业 跟随着底层技术不断迭代已经进入了战乱纷纷的年代最终又会如何呢&#xff1f…...

学术论文投稿之同行评审过程中可能会遭遇哪些偏见?

同行评审过程的顺利进行&#xff0c;在很大程度上取决于学术界的积极参与和相互信任&#xff0c;以及需要参与各方都以负责任的态度行事。作为审稿专家&#xff0c;向作者提供公正、客观的评价是至关重要的。同行评审过程中&#xff0c;若有任何偏离客观性的行为&#xff0c;均…...

Python写一个自动发送直播弹幕的工具,非常简单

哈喽大家好&#xff0c;今天给大家用Python整一个可以在直播间自动发弹幕的工具&#xff0c;来为喜欢的主播疯狂扣6 &#xff01; 事情原由昨晚回家&#xff0c;表弟在看LOL直播&#xff0c;看得我气不打一处来&#xff0c;差点就想锤他。 身为程序员的表弟&#xff0c;看直…...

学生档案管理系统的设计与实现

技术&#xff1a;Java、JSP等摘要&#xff1a;本设计是为托普学院学生档案的管理实现电子化而设计的&#xff0c;系统开发采用J2EE技术&#xff0c;数据库采用了SQL Server 2005&#xff0c;因而系统具有很好的扩展性、可移植性&#xff0c;实现了教学资源的信息化管理。主要功…...

JavaEE学习笔记-SpringBoot快速上手、部分注解解释

SpringBoot快速上手 一、快速创建SpringBoot应用1.1利用IDEA提供的Spring Initializr创建Spring Boot应用1.2Spring Boot生成的项目结构介绍1.3初步测试后端是否OK(建立一个controll类)二、热部署2.1 添加依赖2.2 Setting处项目自动化设置2.3 具体项目设置2.4 待选步骤三、注…...

【Python学习笔记】第二十六节 Python PyMySQL

一、什么是 PyMySQL&#xff1f;PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库。可以用它来连接Python和MySQL。如果你追求速度&#xff0c;这是一个很好的选择&#xff0c;因为它比mysql-connector-python快。PyMySQL 遵循 Python 数据库 API v2.0 规范&#x…...

Android问题笔记 -关于Kotlin插件版本的问题

专栏分享点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册点击跳转>Scratch编程案例 &#x1f449;关于作者 众所周知&#xff0c;人生是一个漫长的流程&#xff0c;不断克服困难&#xff0c;不断…...

【同步工具类:Phaser】

同步工具类:Phaser介绍特性动态调整线程个数层次Phaser源码分析state 变量解析构造函数对state变量赋值阻塞方法arrive()awaitAdvance()业务场景实现CountDownLatch功能代码测试结果实现 CyclicBarrier功能代码展示测试结果总结介绍 一个可重复使用的同步屏障&#xff0c;功能…...

Linux命令·rmdir

今天学习一下linux中命令&#xff1a; rmdir命令。rmdir是常用的命令&#xff0c;该命令的功能是删除空目录&#xff0c;一个目录被删除之前必须是空的。&#xff08;注意&#xff0c;rm - r dir命令可代替rmdir&#xff0c;但是有很大危险性。&#xff09;删除某目录时也必须具…...

从0开始自制解释器——综述

作为一个程序员&#xff0c;自制自己的编译器一直是一个梦想。之前也曾为了这个梦想学习过类似龙书、虎书这种大部头的书&#xff0c;但是光看理论总有一些云里雾里的感觉。看完只觉得脑袋昏昏沉沉并没有觉得有多少长进。当初看过《疯狂的程序员》这本书&#xff0c;书里说&…...

【spring】spring5特性

1、整个 Spring5 框架的代码基于 Java8&#xff0c;运行时兼容 JDK9&#xff0c;许多不建议使用的类和方 法在代码库中删除 日志框架 2、Spring 5.0 框架自带了通用的日志封装 &#xff08;1&#xff09;Spring5 已经移除 Log4jConfigListener&#xff0c;官方建议使用 Log4j…...

曹云金回归、于谦电影杀青,德云社想不火都难

说起民间最大的相声社团&#xff0c;首屈一指的要属德云社&#xff0c;之所以说德云社最大&#xff0c;主要是优秀相声演员够多。德云社在郭德纲的带领下&#xff0c;如今已经是人才济济&#xff0c;听说最近队伍会进一步壮大&#xff0c;前徒弟曹云金也要回归了。 当年曹云金作…...

从入门到精通:数据库设计规范指南

当我们开始设计数据库时&#xff0c;我们需要确保它是可靠和可扩展的。为了实现这一目标&#xff0c;我们需要遵循一些数据库设计规范。本文将介绍一些数据库设计规范&#xff0c;以确保您的数据库能够满足当前和未来的业务需求。 目录 一、命名规则 二、数据类型 三、索引…...

js 求解《初级算法》8.字符串转换整数(atoi)

一、题目描述 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数 算法如下&#xff1a; 读入字符串并丢弃无用的前导空格 检查下一个字符&#xff08;假设还未到字符末尾&#xff09;为正还是负号&#xff0c;读取该字符&#xff…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...

若依登录用户名和密码加密

/*** 获取公钥&#xff1a;前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...

门静脉高压——表现

一、门静脉高压表现 00:01 1. 门静脉构成 00:13 组成结构&#xff1a;由肠系膜上静脉和脾静脉汇合构成&#xff0c;是肝脏血液供应的主要来源。淤血后果&#xff1a;门静脉淤血会同时导致脾静脉和肠系膜上静脉淤血&#xff0c;引发后续系列症状。 2. 脾大和脾功能亢进 00:46 …...