当前位置: 首页 > news >正文

七、Nacos源码系列:Nacos服务发现

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {// 创建DiscoveryClient bean对象@Beanpublic DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {return new NacosDiscoveryClient(nacosServiceDiscovery);}@Bean@ConditionalOnMissingBean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",matchIfMissing = true)public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);}}@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NacosDiscoveryProperties nacosProperties() {// 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,// 如namespace、username、password、serverAddr等属性return new NacosDiscoveryProperties();}// 创建NacosServiceDiscovery bean对象@Bean@ConditionalOnMissingBeanpublic NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);}}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);/*** Nacos Discovery Client Description.*/public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {// 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上private NacosDiscoveryProperties discoveryProperties;// nacos服务管理器对象private NacosServiceManager nacosServiceManager;public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {this.discoveryProperties = discoveryProperties;this.nacosServiceManager = nacosServiceManager;}/*** 返回指定group和servic的所有实例*/public List<ServiceInstance> getInstances(String serviceId) throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NacosNamingService#selectInstances()方法List<Instance> instances = namingService().selectInstances(serviceId, group,true);// 将Instance包装成NacosServiceInstance对象返回return hostToServiceInstanceList(instances, serviceId);}/*** 返回指定group的所有服务名称*/public List<String> getServices() throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NamingGrpcClientProxy#getServiceList()方法ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);// 返回所有服务名称return services.getData();}public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {List<ServiceInstance> result = new ArrayList<>(instances.size());for (Instance instance : instances) {ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);if (serviceInstance != null) {result.add(serviceInstance);}}return result;}public static ServiceInstance hostToServiceInstance(Instance instance,String serviceId) {if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {return null;}NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();nacosServiceInstance.setHost(instance.getIp());nacosServiceInstance.setPort(instance.getPort());nacosServiceInstance.setServiceId(serviceId);Map<String, String> metadata = new HashMap<>();metadata.put("nacos.instanceId", instance.getInstanceId());metadata.put("nacos.weight", instance.getWeight() + "");metadata.put("nacos.healthy", instance.isHealthy() + "");metadata.put("nacos.cluster", instance.getClusterName() + "");if (instance.getMetadata() != null) {metadata.putAll(instance.getMetadata());}metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));nacosServiceInstance.setMetadata(metadata);if (metadata.containsKey("secure")) {boolean secure = Boolean.parseBoolean(metadata.get("secure"));nacosServiceInstance.setSecure(secure);}return nacosServiceInstance;}private NamingService namingService() {return nacosServiceManager.getNamingService();}}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");Constructor constructor = driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
//getServices()调用栈大体如下:namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);NacosNamingService#getServicesOfServerclientProxy.getServiceList(pageNo, pageSize, groupName, selector)NamingClientProxyDelegate#getServiceListgrpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {// 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}// 发送服务列表请求给Nacos服务端,接下来由服务端处理ServiceListResponse response = requestToServer(request, ServiceListResponse.class);// 组装返回值出去ListView<String> result = new ListView<>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {// ServiceManager.getInstance()通过单例返回一个ServiceManager对象/*** 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。* ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)* key: namespaceId* value: Set<Service>* 注册实例的时候,就往这个map写入了数据** ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:* namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))*/Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());// 构建响应结果ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());if (!serviceSet.isEmpty()) {// 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceACollection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// 按分页裁剪serviceNameSetList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);// NamingService#selectInstances(serviceName, groupName, healthy, true)// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 集群名称,使用逗号分隔String clusterString = StringUtils.join(clusters, ",");// 是否订阅,默认是订阅的if (subscribe) {/*** 1.从缓存中获取ServiceInfo* ConcurrentMap<String, ServiceInfo> serviceInfoMap* key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString* value: ServiceInfo*/// 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUPserviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);// 2.缓存为空,执行订阅服务if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 4.筛选满足条件的实例return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());// 组装服务名(带组名):groupName@@serviceName// 例如:DEFAULT_GROUP@@discovery-providerString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 如果指定了集群,那么key还会加上"@@clusters"String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// ConcurrentMap<String, ServiceInfo> serviceInfoMap// 从缓存中获取服务信息return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);// 服务名称(带组名)  格式:groupName@@serviceNameString serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);// 如果集群名称非空,key还需要拼接上集群名称String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 调度更新,往线程池中提交一个UpdateTask任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的服务信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {// 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(result);return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {if (!asyncQuerySubscribeService) {return;}// 组装key   格式:groupName@@serviceName@@clustersString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);// Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()// futureMap用于保存UpdateTask线程池任务的执行结果if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {// double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务if (futureMap.get(serviceKey) != null) {return;}// 往线程池中添加一个更新任务// UpdateTask实现了Runnable接口,需要关注其run()方法ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {// 延迟1s执行UpdateTaskreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;/*** the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty*/private int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}if (serviceObj.getLastRefTime() <= lastRefTime) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {// 记录失败次数incFailCount();return;}// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败次数resetFailCount();} catch (NacosException e) {handleNacosException(e);} catch (Throwable e) {handleUnknownException(e);} finally {if (!isCancel) {// 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}private void handleNacosException(NacosException e) {incFailCount();int errorCode = e.getErrCode();if (NacosException.SERVER_ERROR == errorCode) {handleUnknownException(e);}NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());}private void handleUnknownException(Throwable throwable) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}// 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,// 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe// 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);// 使用grpc发送服务订阅请求return doSubscribe(serviceName, groupName, clusters);
}public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);// private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();synchronized (subscribes) {subscribes.put(key, redoData);}
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 构建一个SubscribeServiceRequest客户端订阅请求// 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handleSubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);// grpc请求Nacos服务端进行订阅SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);// 标记SubscriberRedoData重做数据为已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 构建一个Service服务,指定为临时实例Service service = Service.newService(namespaceId, groupName, serviceName, true);// 构建Subscriber订阅者对象Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// serviceStorage.getData(service): 从缓存中获取serviceInfo// metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata// ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅服务clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}// 添加到订阅者列表中,实际上就是保存在map中// 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();// 发布客户端订阅事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 获取老的服务ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 重新存入客户端缓存中serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 对比下服务信息是否发生变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifierNotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 同步serviceInfo数据到本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {// 构建服务查询请求// Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handleServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);request.setCluster(clusters);request.setHealthyOnly(healthyOnly);request.setUdpPort(udpPort);// 通过grpc请求Nacos服务端处理QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String groupName = request.getGroupName();String serviceName = request.getServiceName();Service service = Service.newService(namespaceId, groupName, serviceName);String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();// 从缓存中获取serviceInfoServiceInfo result = serviceStorage.getData(service);// 从内存(map)获取ServiceMetadataServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<>();}// 遍历所有实例,直接移除掉不满足条件的实例Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 筛选出健康、启用、权重大于0的实例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;
}

3.5、总结图

 

相关文章:

七、Nacos源码系列:Nacos服务发现

目录 一、服务发现 二、getServices()&#xff1a;获取服务列表 2.1、获取服务列表 2.2、总结图 三、getInstances(serviceId)&#xff1a;获取服务实例列表 3.1、从缓存中获取服务信息 3.2、缓存为空&#xff0c;执行订阅服务 3.2.1、调度更新&#xff0c;往线程池中…...

Vue源码系列讲解——模板编译篇【一】(综述)

目录 1. 前言 2. 什么是模板编译 3. 整体渲染流程 4. 模板编译内部流程 4.1 抽象语法树AST 4.2 具体流程 5. 总结 1. 前言 在前几篇文章中&#xff0c;我们介绍了Vue中的虚拟DOM以及虚拟DOM的patch(DOM-Diff)过程&#xff0c;而虚拟DOM存在的必要条件是得先有VNode&…...

【机器学习】数据清洗之识别异常点

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步…...

MacOS 制作 TF 卡/ U 盘镜像

最近有张老的 TF 卡没办法直接拷贝里面的数据&#xff0c;于是打算利用 dd 工具直接全卡拷贝为镜像再分析里面的数据 在终端中&#xff0c;输入以下命令来列出所有磁盘设备&#xff1a; diskutil list这将显示Mac上所有的磁盘设备。你需要找到TF卡对应的设备&#xff0c;它通…...

怎么用postman调用webservice(反推SoapUI)

<soapenv:Envelope xmlns:soapenv“http://schemas.xmlsoap.org/soap/envelope/” xmlns:lis“LisDataTrasen”> soapenv:Header/ soapenv:Body lis:Test lis:test111111111</lis:test> </lis:Test> </soapenv:Body> </soapenv:Envelope> Conten…...

【开源】JAVA+Vue.js实现衣物搭配系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 衣物档案模块2.2 衣物搭配模块2.3 衣物收藏模块 三、系统设计3.1 用例设计3.2 E-R图设计3.3 数据库设计3.3.1 衣物档案表3.3.2 衣物搭配表3.3.3 衣物收藏表 四、系统实现4.1 登录页4.2 衣物档案模块4.3 衣物搭配模块4.4…...

【Flask + AI】接入CHATGLM API 实现翻译接口

【Flask AI】接入CHATGLM API 实现翻译接口 最近的项目中&#xff0c;需要加一个翻译功能&#xff0c;正好chatglm4发布了&#xff0c;于是决定着手用它实现。 https://chatglm.cn 准备 首先&#xff0c;在chatglm开发者中心申请api key&#xff0c;这里不再赘述 其次&…...

并发事务带来的问题及解决方法

引言 在数据库系统中&#xff0c;事务是指一组操作被视为一个逻辑单元&#xff0c;要么全部执行成功&#xff0c;要么全部不执行&#xff0c;保证数据库的一致性和完整性。而并发事务则是指多个事务同时执行的情况。虽然并发事务能够提高系统的性能和吞吐量&#xff0c;但也会…...

CRNN介绍:用于识别图中文本的深度学习模型

CRNN&#xff1a;用于识别图中文本的深度学习模型 CRNN介绍&#xff1a;用于识别图中文本的深度学习模型CRNN的结构组成部分工作原理 CRNN结构分析卷积层&#xff08;Convolutional Layers&#xff09;递归层&#xff08;Recurrent Layers&#xff09;转录层&#xff08;Transc…...

机器人运动学林沛群——变换矩阵

对于仅有移动&#xff0c;由上图可知&#xff1a; A P B P A P B o r g ^AP^BP^AP_{B org} APBPAPBorg​ 对于仅有转动&#xff0c;可得&#xff1a; A P B A R B P ^AP^A_BR^BP APBA​RBP 将转动与移动混合后&#xff0c;可得&#xff1a; 一个例子 在向量中&#xff…...

阿里云增加数据库访问白名单

阿里云增加数据库访问白名单 概况 我们希望在外网访问数据库时&#xff0c;可能会遇到无法连接的问题&#xff0c;这有可能是被拦截了。这时就需要去查看自己的ip有没有在白名单里面&#xff0c;没有的话就把ip加入到白名单。 路径 阿里云控制台-搜索RDS-进入RDS管理控制台…...

Rust基础拾遗--辅助功能

Rust基础拾遗 前言1.错误处理1.1 panic为什么是 Result 2. create与模块3. 宏4. 不安全代码5. 外部函数 前言 通过Rust程序设计-第二版笔记的形式对Rust相关重点知识进行汇总&#xff0c;读者通读此系列文章就可以轻松的把该语言基础捡起来。 1.错误处理 Rust 中的两类错误处理…...

【数据结构】双向链表(链表实现+测试+原码)

前言 在双向链表之前&#xff0c;如果需要查看单链表来复习一下&#xff0c;链接在这里&#xff1a; http://t.csdnimg.cn/Ib5qS 1.双向链表 1.1 链表的分类 实际中链表的结构非常多样&#xff0c;以下情况组合起来就有8种链表结构&#xff1a; 1.1.1 单向或者双向 1.1.2 …...

ChatGPT 3.5与4.0:深入解析技术进步与性能提升的关键数据

大家好&#xff0c;欢迎来到我的博客&#xff01;今天我们将详细比较两个引人注目的ChatGPT版本——3.5和4.0&#xff0c;通过一些关键数据来深入解析它们之间的差异以及4.0版本的技术进步。 1. 模型规模与参数 ChatGPT 3.5&#xff1a; 参数数量&#xff1a;约1.7亿个模型层数…...

前端JavaScript篇之ajax、axios、fetch的区别

目录 ajax、axios、fetch的区别AjaxAxiosFetch总结注意 ajax、axios、fetch的区别 在Web开发中&#xff0c;ajax、axios和fetch都是用于与服务器进行异步通信的技术&#xff0c;但它们在实现方式和功能上有所不同。 Ajax 定义与特点&#xff1a;Ajax是一种在无需重新加载整个…...

【PyTorch][chapter 15][李宏毅深度学习][Neighbor Embedding-LLE]

前言&#xff1a; 前面讲的都是线性降维&#xff0c;本篇主要讨论一下非线性降维. 流形学习&#xff08;mainfold learning&#xff09;是一类借鉴了拓扑流行概念的降维方法. 如上图,欧式距离上面 A 点跟C点更近&#xff0c;距离B 点较远 但是从图形拓扑结构来看&#xff0c; …...

在JSP中实现JAVABEAN

在JSP中实现JAVABEAN 问题陈述 创建Web应用程序以连接数据库并检索作者名、地址、城市、州及邮政编码等与作者的详细信息。JavaBean组件应接受作者ID、驱动程序名及URL作为参数。信息要从authors表中检索。 解决方案 要解决上述问题,需要执行以下任务: 创建Web应用程序。创…...

智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码)

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码) 源码设计 %%%% clear all clc SearchAgents_no=100; % Number of search ag...

LSF 主机状态 unreach 分析

在LSF集群运行过程中&#xff0c;有主机状态变为 unreach。熟悉LSF的朋友都知道主机状态为 unreach 表示主机上的 SBD 服务中断服务了&#xff0c;但其它服务 LIM 和 RES 还在正常运行。 影响分析 那么主机上的 SBD 服务中断的影响是什么呢&#xff1f; 我们需要先明白 SBD …...

SpringBoot日志

自定义日志 导入的是slf4j的Logger类 package app.controller;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping;RestController pu…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

Kafka主题运维全指南:从基础配置到故障处理

#作者&#xff1a;张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1&#xff1a;主题删除失败。常见错误2&#xff1a;__consumer_offsets占用太多的磁盘。 主题日常管理 …...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...