nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送+客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。
1.服务发现
nacos主要采用服务端主动推送+客户端定时拉取来保证AP架构的高可用性。
NacosNamingService:服务发现的接口(客户端!)
每个nacos实例有本地缓存Map,存放所有的实例
调用查询服务实例列表
/nacos`/v1/ns/instance/list
NacosNamingService.getAllInstances() -> getServiceInfo()
核心方法是getServiceInfo,代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}scheduleUpdateIfAbsent(serviceName, clusters);// 定时拉取return serviceInfoMap.get(serviceObj.getKey());// 从map拿}
本地从服务端拉取服务实例,放到本地缓存中
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);// 调用接口}
客户端通过定时任务-->定时拉取服务端的实例,更新本地缓存(在发起服务调用的时候,才会执行定时任务!)
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}
2.心跳机制
NamingService->register
注册实例的时候,客户端会有一个心跳任务,定时5s向服务端发送心跳!
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
BeatTask就是心跳任务
调用Instance/beat接口,发送实例心跳检查
BeatTask#run的代码如下:
@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);// 定时任务}}
服务端会更新心跳的时间!每隔5秒检查一下实例健康状态
对于集群而言,在一台服务端执行定时任务即可!不需要再所有节点都执行定时任务
当实例下线后,服务端通过InstanceController,检查健康状态(健康任务检查)。代码入口:
createEmptyService->createServiceIfAbsent->putServiceAndInit->init->scheduleCheck(定时任务)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断
如果超过15秒,设置实例为非健康(更新健康状态)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果已经过去了30s,直接删掉这个机器(删除实例)
asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
核心代码如下:
@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// 获取服务端的所有实例// first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);// 如果超过15s,健康设置为falseLoggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
3.服务事件变动
客户端每隔5秒从服务端拉取一次信息,是不是有延迟!
对于AP架构而言,有一点延迟 无所谓。Ribbon会进行重试
当服务端的服务列表变化时,服务端会主动推送给客户端信息
核心代码入口:notifier#run->handle->onChange
onChange更新注册表的相关代码!
getPushService().serviceChanged(this);// 发布 服务变化 的事件,通知客户端
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}if (StringUtils.isEmpty(instance.getClusterName())) {instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();getClusterMap().put(instance.getClusterName(), cluster);}List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//make every ip mineList<Instance> entryIPs = entry.getValue();clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);// 核心注册逻辑}setLastModifiedMillis(System.currentTimeMillis());getPushService().serviceChanged(this);// 发布 服务变化 的事件StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),stringBuilder.toString());}
(通过全文搜索事件,找到这个方法)PushService#onApplicationEvent,它来处理事件。代码如下:
udpPush(ackEntry);// 服务端给客户端发送udp信息,通知客户端实例变动
@Override
public void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client.toString());clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client.toString());continue;}Receiver.AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map<String, Object> data = null;if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map<String, Object>) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.key));udpPush(ackEntry);// 给客户端发送udp信息}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}
客户端接收udp报文的代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);// upd的端口
相关文章:
nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。 1.服务发现 nacos主要采…...
C++ 2D平台游戏开发案例
关于2D平台游戏的C开发案例,包括游戏设计、实现细节、图形渲染和音效处理等内容。虽然无法一次性提供3000字,但我会尽量详细描述各个部分,并确保有足够的深度和广度。 2D平台游戏开发案例 一、游戏设计 游戏概述 游戏名称:“冒险…...
【Webpack--019】TreeShaking
🤓😍Sam9029的CSDN博客主页:Sam9029的博客_CSDN博客-前端领域博主 🐱🐉若此文你认为写的不错,不要吝啬你的赞扬,求收藏,求评论,求一个大大的赞!👍* &#x…...

Docker基本操作命令
Docker 是一个开源的应用容器引擎,允许开发者打包应用以及其依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。主要功能是为开发者提供一个简单…...

开源计算器应用的全面测试计划:确保功能性和可靠性
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
uni.requestPayment 支付成功之后会走 wx.onAppRoute
uni.requestPayment 是用于发起微信支付的统一接口,而 wx.onAppRoute 是用于监听小程序的路由变化。当 uni.requestPayment 支付成功后,如果发生了页面跳转或者其他路由变化,wx.onAppRoute 会被触发。这个行为是正常的,因为支付成…...

统⼀服务入口 - Gateway
网关介绍 问题 在 spring cloud 体系中我们通过 Eureka,Nacos 解决了服务注册,服务发现的问题,使⽤Spring Cloud LoadBalance解决了负载均衡的问题,使⽤ OpenFeign 解决了远程调⽤的问题. 但是当前所有微服务的接⼝都是直接对外暴露的,可以直接通过外部访问.为了保证对外服务的…...
QGraphicsWidget Class
Header:#include < QGraphicsWidget > qmake:QT += widgets Since:Qt 4.4 Inherits:QGraphicsObject and QGraphicsLayoutItem Inherited By:QGraphicsProxyWidget This class was introduced in Qt 4.4. Public Types enum anonymous {Type }Properties autoFi…...
探讨最好用的AI工具:从日常到创新的应用
文章目录 引言常用AI工具1. 语音助手2. 图像识别软件3. 机器翻译工具4. 智能客服系统 创新AI应用1. 自动驾驶汽车2. 虚拟试衣间3. 医疗影像分析4. 个性化推荐系统 个人体验分享1. 通义灵码2. 文心一言3. 智能写作助手4. 智能家居设备5. DALLE6. Whisper7. Codex8. Gym9. ChatGP…...

Python系统教程005(字符串的格式化输出)
知识回顾 1、默认情况下,input函数接收的数据是字符串类型。 2、字符串类型的关键词是str。 3、\n和\t都是转义字符,\n用来换行,\t用来留出一段固定长度的空白。 4、type函数能够用来查看变量的数据类型 5、数据类型的转换,举…...

六款电脑远程控制软件分享,2024最热门软件合集,总有一款适合你!速来看!
想要随时随地控制自己的电脑? 无论你是办公需求,还是要远程协助他人,一款好用的远程控制软件绝对少不了。 2024年最热门的六款远程控制软件已经为你准备好,总有一款适合你,赶快往下看吧! 1. 安企神系统—…...
优质微信群不再难寻!掌握这些技巧就够了!
在当今信息爆炸的时代,微信群已成为人们交流思想、分享知识、建立人脉的重要平台。无论是专业领域的深入探讨,还是兴趣爱好的自由交流,微信群都能为你提供一个即时互动的虚拟空间。然而,面对海量的微信群信息,如何高效…...
python - mysql操作
Python MySQL 操作 1. 背景介绍 常见的Mysql驱动介绍: MySQL-python:也就是MySQLdb。是对C语言操作MySQL数据库的一个简单封装。遵循了Python DB API v2。但是只支持Python2,目前还不支持Python3。mysqlclient:是MySQL-python的…...

基于Springboot+Vue的服装生产管理信息系统设计与实现(含源码数据库)
1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 在这个…...

75.【C语言】文件操作(2)
承接74.【C语言】文件操作(1)文章 目录 5.详细阐释文件的打开和关闭 1.流 2.标准流 3.文件指针 FILE 两层含义 附:FILE的头文件 4.操作文件的步骤 1.fopen函数 编辑 简写的全称查询 输入&输出的含义 2.fclose函数 3.代码示例 补充:绝对路径和相对路径 注意…...
Redis 使用记录
封装调用redis类 import redis from conf.config import RedisConfigclass RedisConfig:redis_json config_data[redis_config]redis_pwd env.get(project_name).get(pwd)host redis_json.get("host")dialog_states_db redis_json.get("dialog_states_db&q…...

IDEA实用小技巧
1. IDEA代码提示忽略大小写 打开设置,点击Editor–>General–>Code Completion ,然后将右侧的Match Case前面的选框去掉勾选。 2. 快速查找接口RestfulToolkitX插件 该插件可以快速查找接口(快捷键为CTRL\) 还会在侧边栏…...
PEI转染试剂对血清的敏感性研究
在细胞生物学和基因工程领域,聚乙烯亚胺(PEI)作为一种常用的转染试剂,广泛应用于基因的递送。然而,PEI转染试剂对血清的敏感性一直是研究的热点问题。转染过程中,血清作为培养基的成分之一,可能…...

手机怎样改网络ip地址?内容详尽实用
随着网络技术的发展,更改手机IP地址已成为一种常见需求。本文将详细介绍如何在不同网络环境下更改手机IP地址,包括移动网络和WiFi网络,以及同时适用于两种网络的方法,内容详尽实用,干货满满。 一、适用于移动网络&…...
使用Pybind11,Python调用C++动态库
最近学习了一下pybind11,使用python来调用C动态库的模式,在某些场景下有用,这里做一个记录。 环境准备 安装python,我这里安装的是3.12版本 下载Pybind11库,这是一个仅包含头文件的轻量级库,使用起来非常…...

【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...

uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
django blank 与 null的区别
1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是,要注意以下几点: Django的表单验证与null无关:null参数控制的是数据库层面字段是否可以为NULL,而blank参数控制的是Django表单验证时字…...
日常一水C
多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...