nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送+客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。
1.服务发现
nacos主要采用服务端主动推送+客户端定时拉取来保证AP架构的高可用性。
NacosNamingService:服务发现的接口(客户端!)
每个nacos实例有本地缓存Map,存放所有的实例
调用查询服务实例列表
/nacos`/v1/ns/instance/list
NacosNamingService.getAllInstances() -> getServiceInfo()
核心方法是getServiceInfo,代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}scheduleUpdateIfAbsent(serviceName, clusters);// 定时拉取return serviceInfoMap.get(serviceObj.getKey());// 从map拿}
本地从服务端拉取服务实例,放到本地缓存中
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);// 调用接口}
客户端通过定时任务-->定时拉取服务端的实例,更新本地缓存(在发起服务调用的时候,才会执行定时任务!)
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}
2.心跳机制
NamingService->register
注册实例的时候,客户端会有一个心跳任务,定时5s向服务端发送心跳!
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
BeatTask就是心跳任务
调用Instance/beat接口,发送实例心跳检查
BeatTask#run的代码如下:
@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long nextTime = beatInfo.getPeriod();try {JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);// 定时任务}}
服务端会更新心跳的时间!每隔5秒检查一下实例健康状态
对于集群而言,在一台服务端执行定时任务即可!不需要再所有节点都执行定时任务
当实例下线后,服务端通过InstanceController,检查健康状态(健康任务检查)。代码入口:
createEmptyService->createServiceIfAbsent->putServiceAndInit->init->scheduleCheck(定时任务)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断
如果超过15秒,设置实例为非健康(更新健康状态)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果已经过去了30s,直接删掉这个机器(删除实例)
asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
核心代码如下:
@Overridepublic void run() {try {if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// 获取服务端的所有实例// first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);// 如果超过15s,健康设置为falseLoggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
3.服务事件变动
客户端每隔5秒从服务端拉取一次信息,是不是有延迟!
对于AP架构而言,有一点延迟 无所谓。Ribbon会进行重试
当服务端的服务列表变化时,服务端会主动推送给客户端信息
核心代码入口:notifier#run->handle->onChange
onChange更新注册表的相关代码!
getPushService().serviceChanged(this);// 发布 服务变化 的事件,通知客户端
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}if (StringUtils.isEmpty(instance.getClusterName())) {instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();getClusterMap().put(instance.getClusterName(), cluster);}List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//make every ip mineList<Instance> entryIPs = entry.getValue();clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);// 核心注册逻辑}setLastModifiedMillis(System.currentTimeMillis());getPushService().serviceChanged(this);// 发布 服务变化 的事件StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),stringBuilder.toString());}
(通过全文搜索事件,找到这个方法)PushService#onApplicationEvent,它来处理事件。代码如下:
udpPush(ackEntry);// 服务端给客户端发送udp信息,通知客户端实例变动
@Override
public void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client.toString());clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client.toString());continue;}Receiver.AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map<String, Object> data = null;if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map<String, Object>) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.key));udpPush(ackEntry);// 给客户端发送udp信息}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}
客户端接收udp报文的代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例if (null == serviceObj) {// 如果拿到为空,则更新服务列表serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);// upd的端口
相关文章:
nacos1.4源码-服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。 1.服务发现 nacos主要采…...
C++ 2D平台游戏开发案例
关于2D平台游戏的C开发案例,包括游戏设计、实现细节、图形渲染和音效处理等内容。虽然无法一次性提供3000字,但我会尽量详细描述各个部分,并确保有足够的深度和广度。 2D平台游戏开发案例 一、游戏设计 游戏概述 游戏名称:“冒险…...
【Webpack--019】TreeShaking
🤓😍Sam9029的CSDN博客主页:Sam9029的博客_CSDN博客-前端领域博主 🐱🐉若此文你认为写的不错,不要吝啬你的赞扬,求收藏,求评论,求一个大大的赞!👍* &#x…...
Docker基本操作命令
Docker 是一个开源的应用容器引擎,允许开发者打包应用以及其依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。主要功能是为开发者提供一个简单…...
开源计算器应用的全面测试计划:确保功能性和可靠性
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
uni.requestPayment 支付成功之后会走 wx.onAppRoute
uni.requestPayment 是用于发起微信支付的统一接口,而 wx.onAppRoute 是用于监听小程序的路由变化。当 uni.requestPayment 支付成功后,如果发生了页面跳转或者其他路由变化,wx.onAppRoute 会被触发。这个行为是正常的,因为支付成…...
统⼀服务入口 - Gateway
网关介绍 问题 在 spring cloud 体系中我们通过 Eureka,Nacos 解决了服务注册,服务发现的问题,使⽤Spring Cloud LoadBalance解决了负载均衡的问题,使⽤ OpenFeign 解决了远程调⽤的问题. 但是当前所有微服务的接⼝都是直接对外暴露的,可以直接通过外部访问.为了保证对外服务的…...
QGraphicsWidget Class
Header:#include < QGraphicsWidget > qmake:QT += widgets Since:Qt 4.4 Inherits:QGraphicsObject and QGraphicsLayoutItem Inherited By:QGraphicsProxyWidget This class was introduced in Qt 4.4. Public Types enum anonymous {Type }Properties autoFi…...
探讨最好用的AI工具:从日常到创新的应用
文章目录 引言常用AI工具1. 语音助手2. 图像识别软件3. 机器翻译工具4. 智能客服系统 创新AI应用1. 自动驾驶汽车2. 虚拟试衣间3. 医疗影像分析4. 个性化推荐系统 个人体验分享1. 通义灵码2. 文心一言3. 智能写作助手4. 智能家居设备5. DALLE6. Whisper7. Codex8. Gym9. ChatGP…...
Python系统教程005(字符串的格式化输出)
知识回顾 1、默认情况下,input函数接收的数据是字符串类型。 2、字符串类型的关键词是str。 3、\n和\t都是转义字符,\n用来换行,\t用来留出一段固定长度的空白。 4、type函数能够用来查看变量的数据类型 5、数据类型的转换,举…...
六款电脑远程控制软件分享,2024最热门软件合集,总有一款适合你!速来看!
想要随时随地控制自己的电脑? 无论你是办公需求,还是要远程协助他人,一款好用的远程控制软件绝对少不了。 2024年最热门的六款远程控制软件已经为你准备好,总有一款适合你,赶快往下看吧! 1. 安企神系统—…...
优质微信群不再难寻!掌握这些技巧就够了!
在当今信息爆炸的时代,微信群已成为人们交流思想、分享知识、建立人脉的重要平台。无论是专业领域的深入探讨,还是兴趣爱好的自由交流,微信群都能为你提供一个即时互动的虚拟空间。然而,面对海量的微信群信息,如何高效…...
python - mysql操作
Python MySQL 操作 1. 背景介绍 常见的Mysql驱动介绍: MySQL-python:也就是MySQLdb。是对C语言操作MySQL数据库的一个简单封装。遵循了Python DB API v2。但是只支持Python2,目前还不支持Python3。mysqlclient:是MySQL-python的…...
基于Springboot+Vue的服装生产管理信息系统设计与实现(含源码数据库)
1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 在这个…...
75.【C语言】文件操作(2)
承接74.【C语言】文件操作(1)文章 目录 5.详细阐释文件的打开和关闭 1.流 2.标准流 3.文件指针 FILE 两层含义 附:FILE的头文件 4.操作文件的步骤 1.fopen函数 编辑 简写的全称查询 输入&输出的含义 2.fclose函数 3.代码示例 补充:绝对路径和相对路径 注意…...
Redis 使用记录
封装调用redis类 import redis from conf.config import RedisConfigclass RedisConfig:redis_json config_data[redis_config]redis_pwd env.get(project_name).get(pwd)host redis_json.get("host")dialog_states_db redis_json.get("dialog_states_db&q…...
IDEA实用小技巧
1. IDEA代码提示忽略大小写 打开设置,点击Editor–>General–>Code Completion ,然后将右侧的Match Case前面的选框去掉勾选。 2. 快速查找接口RestfulToolkitX插件 该插件可以快速查找接口(快捷键为CTRL\) 还会在侧边栏…...
PEI转染试剂对血清的敏感性研究
在细胞生物学和基因工程领域,聚乙烯亚胺(PEI)作为一种常用的转染试剂,广泛应用于基因的递送。然而,PEI转染试剂对血清的敏感性一直是研究的热点问题。转染过程中,血清作为培养基的成分之一,可能…...
手机怎样改网络ip地址?内容详尽实用
随着网络技术的发展,更改手机IP地址已成为一种常见需求。本文将详细介绍如何在不同网络环境下更改手机IP地址,包括移动网络和WiFi网络,以及同时适用于两种网络的方法,内容详尽实用,干货满满。 一、适用于移动网络&…...
使用Pybind11,Python调用C++动态库
最近学习了一下pybind11,使用python来调用C动态库的模式,在某些场景下有用,这里做一个记录。 环境准备 安装python,我这里安装的是3.12版本 下载Pybind11库,这是一个仅包含头文件的轻量级库,使用起来非常…...
用LLM自动生成CUDA内核真的靠谱吗?实测KernelBench框架效果与避坑指南
LLM自动生成CUDA内核的实践验证:KernelBench框架深度评测与技术指南 当我在项目中发现某个PyTorch模型的矩阵乘法操作消耗了60%的推理时间时,第一反应是考虑手工编写CUDA内核来优化。但作为一个同时维护三个项目的工程师,时间成本让我犹豫——…...
AI辅助开发:用自然语言描述需求,让快马平台自动生成精准的Copaw自动化脚本
AI辅助开发:用自然语言描述需求,让快马平台自动生成精准的Copaw自动化脚本 最近在做一个自动化测试项目,需要大量使用Copaw框架来模拟用户操作。作为一个刚接触Copaw的新手,最头疼的就是要花大量时间研究各种API和页面元素定位方…...
基于氢储能的热电联供型微电网优化调度方法附Matlab代码
✅作者简介:热爱科研的Matlab仿真开发者,擅长毕业设计辅导、数学建模、数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。🍎 往期回顾关注个人主页:Matlab科研工作室👇 关注我领取海量matlab电子书和…...
Windows 11终极优化指南:用Win11Debloat实现系统加速51%的免费方案
Windows 11终极优化指南:用Win11Debloat实现系统加速51%的免费方案 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to decl…...
从Windows玩家到Linux新手:我的Ubuntu 22.04双系统入坑实录与软件生态迁移心得
从Windows玩家到Linux新手:我的Ubuntu 22.04双系统入坑实录与软件生态迁移心得 第一次看到Ubuntu的紫色登录界面时,我盯着那个不断旋转的加载动画发了五分钟呆——作为用了十五年Windows的老用户,这个瞬间仿佛打开了新世界的大门。但兴奋感很…...
哔哩下载姬:三步搞定B站视频永久收藏的智能工具
哔哩下载姬:三步搞定B站视频永久收藏的智能工具 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等)…...
中国四大高考工厂是哪四所
根据当前(2026年4月)可查的权威公开资料,“中国四大高考工厂”通常指以下四所中学: 1、河北衡水中学 2、安徽毛坦厂中学 3、河南郸城一高(即郸城县第一高级中学) 4、湖北黄冈中学 背…...
OpenClaw健康监测:用Phi-3-mini-128k-instruct分析智能手表数据
OpenClaw健康监测:用Phi-3-mini-128k-instruct分析智能手表数据 1. 为什么选择OpenClaw处理健康数据? 去年体检报告上的几项异常指标让我开始关注日常健康监测。虽然手环和智能手表能记录睡眠、心率等数据,但原始数据报表就像一本天书——我…...
AQ智商测试
AQ逆商测试结果分析(PSYTOPIC版) Psytopic分析:您的AQ得分是 168 ,在人群中属较高水平 。 以下是PSYTOPIC为您提供的分析参考: 你能面对现实,对来自工作和生活中的困难应对自如,并敢于迎接逆境…...
HunyuanVideo-Foley性能测试指南:在RTX 4090D上的推理速度与显存占用
HunyuanVideo-Foley性能测试指南:在RTX 4090D上的推理速度与显存占用 1. 前言:为什么需要性能测试 音效生成模型在实际业务场景中的表现,直接影响着用户体验和系统成本。对于企业用户来说,了解模型在特定硬件上的性能表现至关重…...
