当前位置: 首页 > news >正文

Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker

基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。

此前我们学习了Dubbo3.1版本的MigrationRuleHandler这个处理器,它用于通过动态更改规则来控制迁移行为。MigrationRuleListener的onrefer方法是Dubbo2.x 接口级服务发现与Dubbo3.x应用级服务发现之间迁移的关键。在未来的版本MigrationRuleListener将会被删除。

我们最后讲到了MigrationRuleHandler的refreshInvoker方法,该方法除了刷新invoker迁移新规则之外,还负责远程服务发现订阅的逻辑,即消费者能发现远程服务提供方的地址列表。而接口级的服务引入订阅则是通过refreshInterfaceInvoker方法实现的,refreshServiceDiscoveryInvoker方法则实现应用级服务发现。

本次我们来学习接口级服务发现订阅refreshInterfaceInvoker。

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)

1 refreshInterfaceInvoker刷新接口级invoker

该方法具有接口级别的远程服务发现、引入、订阅能力,大概逻辑为:

  1. 首先判断是否需要刷新invoker,即重新创建真实invoker:如果真实invoker不存在,或者已被销毁,或者内部没有Directory,则需要刷新。
  2. 一般情况下,当启动消费者并首次执行refer的时候,真实invoker为null,需要创建真实invoker。
  3. 通过注册中心操作类registryProtocol#getInvoker方法来引入服务提供者invoker,这是消费者进行接口级服务发现订阅的核心逻辑。
/*** MigrationInvoker的方法* <p>* 刷新接口invoker** @param latch 倒计数器*/
protected void refreshInterfaceInvoker(CountDownLatch latch) {/** 1 如果MigrationInvoker内部的真实invoker存在,那么清空真实invoker的directory的*/clearListener(invoker);/** 2 判断是否需要刷新invoker,即重新创建真实invoker* 如果真实invoker不存在,或者已被销毁,或者内部没有Directory* 一般情况下,当启动消费者并首次执行refer的时候,真实invoker为null,需要创建*/if (needRefresh(invoker)) {if (logger.isDebugEnabled()) {logger.debug("Re-subscribing interface addresses for interface " + type.getName());}//如果不为null,则销毁if (invoker != null) {invoker.destroy();}/** 3 通过注册中心操作类registryProtocol获取真实invoker** 这是消费者进行接口级服务发现订阅的核心逻辑,这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol*/invoker = registryProtocol.getInvoker(cluster, registry, type, url);}/** 设置监听器*/setListener(invoker, () -> {latch.countDown();if (reportService.hasReporter()) {reportService.reportConsumptionStatus(reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));}//如果迁移状态为APPLICATION_FIRST,那么设置首选invokerif (step == APPLICATION_FIRST) {calcPreferredInvoker(rule);}});
}

2 getInvoker获取invoker

这是默认消费者进行接口级服务发现订阅的核心逻辑,这里的registryProtocol类型为InterfaceCompatibleRegistryProtocol。

  1. 首先创建接口级动态注册心中目录RegistryDirectory
  2. 随后调用doCreateInvoker方法创建服务引入invoker
/*** InterfaceCompatibleRegistryProtocol的方法* <p>* 获取接口级别invoker** @param cluster  集群操作对象* @param registry 注册中心对象,例如ListenerRegistryWrapper(ZookeeperRegistry)* @param type     接口类型* @param url      注册中心协议url,协议是真实注册中心协议,例如zookeeper* @return 真实invoker*/
@Override
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {/** 1 创建动态注册心中目录DynamicDirectory*/DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);/** 2 创建invoker*/return doCreateInvoker(directory, cluster, registry, type);
}

3 RegistryDirectory注册中心目录

RegistryDirectory是基于注册中心的服务发现使用的服务目录,其内部存储着从远程注册中心获取的服务提供者的信息。消费者进行远程rpc调用的时候能够通过RegistryDirectory找到一个远程服务提供者。

RegistryDirectory会自动从注册中心更新 Invoker列表、配置信息、路由列表。

下面是RegistryDirectory的构造器的逻辑,仅仅是进行了一系列的初始化操作。

/*** 创建动态注册心中目录** @param serviceType 服务接口类型* @param url         注册中心协议url,协议是真实注册中心协议,例如zookeeper*/
public RegistryDirectory(Class<T> serviceType, URL url) {//调用父类构造器super(serviceType, url);//设置模块模型moduleModel = getModuleModel(url.getScopeModel());//设置消费者配置监听器ConsumerConfigurationListenerconsumerConfigurationListener = getConsumerConfigurationListener(moduleModel);
}

3.1 DynamicDirectory动态目录

/*** 动态目录** @param serviceType 服务接口类型* @param url         注册中心协议url*/
public DynamicDirectory(Class<T> serviceType, URL url) {//调用父类构造器super(url, true);//模块模型ModuleModel moduleModel = url.getOrDefaultModuleModel();//基于Dubbo SPI获取Cluster的自适应实现,Cluster$Adaptivethis.cluster = moduleModel.getExtensionLoader(Cluster.class).getAdaptiveExtension();//基于Dubbo SPI获取RouterFactory的自适应实现,RouterFactory$Adaptivethis.routerFactory = moduleModel.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();if (serviceType == null) {throw new IllegalArgumentException("service type is null.");}if (StringUtils.isEmpty(url.getServiceKey())) {throw new IllegalArgumentException("registry serviceKey is null.");}//是否需要注册,默认truethis.shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);//是否简化,默认falsethis.shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);//服务接口类型this.serviceType = serviceType;//服务keythis.serviceKey = super.getConsumerUrl().getServiceKey();//消费者urlthis.directoryUrl = consumerUrl;//消费者消费的服务分组信息String group = directoryUrl.getGroup("");//多个组this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));//当是服务目录信息为空时否快速失败,默认truethis.shouldFailFast = Boolean.parseBoolean(ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));
}

3.2 AbstractDirectory抽象目录

/*** 抽象目录* @param url 注册中心协议url* @param isUrlFromRegistry 是否是注册中心协议*/
public AbstractDirectory(URL url, boolean isUrlFromRegistry) {this(url, null, isUrlFromRegistry);
}public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromRegistry) {if (url == null) {throw new IllegalArgumentException("url == null");}//移除属性this.url = url.removeAttribute(REFER_KEY).removeAttribute(MONITOR_KEY);Map<String, String> queryMap;//获取refer属性,即服务引用参数mapObject referParams = url.getAttribute(REFER_KEY);if (referParams instanceof Map) {queryMap = (Map<String, String>) referParams;//消费者url,协议默认为consumerthis.consumerUrl = (URL) url.getAttribute(CONSUMER_URL_KEY);} else {queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));}// remove some local only parameters 删除一些仅本地参数ApplicationModel applicationModel = url.getOrDefaultApplicationModel();this.queryMap = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeLocalParams(queryMap);if (consumerUrl == null) {String host = isNotEmpty(queryMap.get(REGISTER_IP_KEY)) ? queryMap.get(REGISTER_IP_KEY) : this.url.getHost();String path = isNotEmpty(queryMap.get(PATH_KEY)) ? queryMap.get(PATH_KEY) : queryMap.get(INTERFACE_KEY);String consumedProtocol = isNotEmpty(queryMap.get(PROTOCOL_KEY)) ? queryMap.get(PROTOCOL_KEY) : CONSUMER;URL consumerUrlFrom = this.url.setHost(host).setPort(0).setProtocol(consumedProtocol).setPath(path);if (isUrlFromRegistry) {// reserve parameters if url is already a consumer urlconsumerUrlFrom = consumerUrlFrom.clearParameters();}this.consumerUrl = consumerUrlFrom.addParameters(queryMap);}//连接检查任务调度执行器,Dubbo-framework-connectivity-scheduler,线程数为可用cup核心数this.connectivityExecutor = applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class).getConnectivityScheduledExecutor();//获取配置对象Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());//获取dubbo.reconnect.reconnectTaskTryCount配置,默认10,重新连接任务的最大数量this.reconnectTaskTryCount = configuration.getInt(RECONNECT_TASK_TRY_COUNT, DEFAULT_RECONNECT_TASK_TRY_COUNT);//获取dubbo.reconnect.reconnectTaskPeriod配置,默认1000ms,重新连接任务的间隔时间this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD);//路由器链setRouterChain(routerChain);
}

4 doCreateInvoker创建invoker

该方法由InterfaceCompatibleRegistryProtocol的父类RegistryProtocol实现。大概步骤为:

  1. 首先根据消费者信息转换为消费者注册信息url,内部包括消费者ip、指定引用的protocol(默认consumer协议)、指定引用的服务接口、指定引用的方法以及其他消费者信息。
  2. 调用registry.register方法将消费者注册信息url注册到注册中心。
  3. 调用directory.buildRouterChain方法构建服务调用路由链。
  4. 调用directory.subscribe方法进行服务发现、引入并订阅服务。
  5. 调用cluster.join方法进行集群容错能力包装。
/*** RegistryProtocol的方法* 创建ClusterInvoker** @param directory 动态目录* @param cluster   集群* @param registry  注册中心* @param type      服务接口类型* @return ClusterInvoker*/
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {//注册中心操作类directory.setRegistry(registry);//设置协议,Protocol$Adaptivedirectory.setProtocol(protocol);// all attributes of REFER_KEY 消费者服务引用参数Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());//消费者信息转消费者注册信息urlURL urlToRegistry = new ServiceConfigURL(//获取protocol属性,只调用指定协议的服务提供方,其它协议忽略,默认值consumerparameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),//消费者ipparameters.remove(REGISTER_IP_KEY),//端口0,//服务接口路径getPath(parameters, type),//服务引用参数parameters);urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());//是否应该注册,默认trueif (directory.isShouldRegister()) {//设置注册的消费者urldirectory.setRegisteredConsumerUrl(urlToRegistry);/** 1 消费者注册信息url注册到注册中心*/registry.register(directory.getRegisteredConsumerUrl());}/** 2 构建服务路由器链*/directory.buildRouterChain(urlToRegistry);/** 3 服务发现并订阅服务*/directory.subscribe(toSubscribeUrl(urlToRegistry));/** 4 集群容错包装*/return (ClusterInvoker<T>) cluster.join(directory, true);
}

4.1 register注册接口级消费者信息

该方法的源码我们在此前学习provider导出服务并且接口级服务注册到注册中心的时候就讲过了,即注册接口级别服务消费者和提供者信息是同一个方法。

该方法将会通过注册中心操作类Registry将服务消费者url的信息注册到注册中心。以ZookeeperRegistry为例,根据url构建节点路径,/dubbo/{servicePath}/ consumers /{urlString},例如:/dubbo/org.apache.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F10.253.45.126%2Forg.apache.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26background%3Dfalse%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D62247%26release%3D%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1667211594775%26unloadClusterRelated%3Dfalse,节点的值是服务提供者的节点ip。

注意此节点是一个临时节点,当服务关闭时节点删除。对于接口级的消费者服务注册,在zookeeper中的节点样式如下,可以发现和服务提供者注册的服务信息在同一个大的目录下面,即在同一个服务接口目录下:


4.2 buildRouterChain构建路由链

buildRouterChain方法用于构建路由链RouterChain,每个Directory都有一条RouterChain。服务消费者会向注册中心获取服务提供者的地址列表,当消费端发起调用服务时,会先根据路由策略选出需要调用的目标服务提供者地址列表,随后根据负载算法直接调用提供者。

流量路由,顾名思义就是把具有某些属性特征的流量,路由到指定的目标。dubbo流量路由是流量治理中重要的一环,多个路由如同流水线一样,形成一条路由链,从所有的地址表中筛选出最终目的地址集合,再通过负载均衡策略选择访问的地址。开发者可以基于流量路由标准来实现各种场景,如灰度发布、金丝雀发布、容灾路由、标签路由等。

该方法入口为DynamicDirectory的buildRouterChain方法,基于RouterChain.buildChain构建路由链并设置给AbstractDirectory 的routerChain属性。

/*** DynamicDirectory的方法* 构建服务路由链** @param url 消费者注册信息url*/
public void buildRouterChain(URL url) {//构建一个RouterChainthis.setRouterChain(RouterChain.buildChain(getInterface(), url));
}

RouterChain.buildChain方法源码如下。

/*** RouterChain的方法** 构建路由链** @param interfaceClass 服务接口class* @param url 消费者注册信息url* @return 消费者服务路由链*/
public static <T> RouterChain<T> buildChain(Class<T> interfaceClass, URL url) {ModuleModel moduleModel = url.getOrDefaultModuleModel();//从消费者注册信息url中获取获取router属性的值,并且拆分后作为扩展名获取全部扩展名的RouterFactory实现,默认返回空列表List<RouterFactory> extensionFactories = moduleModel.getExtensionLoader(RouterFactory.class).getActivateExtension(url, ROUTER_KEY);//从 RouterFactory 获取 Router,默认返回空列表List<Router> routers = extensionFactories.stream().map(factory -> factory.getRouter(url)).sorted(Router::compareTo).collect(Collectors.toList());//获取状态路由列表,默认有5个状态路由List<StateRouter<T>> stateRouters = moduleModel.getExtensionLoader(StateRouterFactory.class).getActivateExtension(url, ROUTER_KEY).stream().map(factory -> factory.getRouter(interfaceClass, url)).collect(Collectors.toList());//是否快速失败boolean shouldFailFast = Boolean.parseBoolean(ConfigurationUtils.getProperty(moduleModel, Constants.SHOULD_FAIL_FAST_KEY, "true"));//获取RouterSnapshotSwitcherRouterSnapshotSwitcher routerSnapshotSwitcher = ScopeModelUtil.getFrameworkModel(moduleModel).getBeanFactory().getBean(RouterSnapshotSwitcher.class);//创建一个RouterChain对象并返回return new RouterChain<>(routers, stateRouters, shouldFailFast, routerSnapshotSwitcher);
}

该方法会尝试获取消费者引用配置中指定的Router和StateRouter,默认情况下没有Router,但有5个StateRouter。随后将路由链表构建为一个RouterChain返回。

  1. MockInvokersSelector:一种特殊Router,如果一个请求被配置为使用mock,那么这个路由器保证只有带有mock协议的invoker出现在最终的invoker列表中,所有其他invoker将被排除。
  2. StandardMeshRuleRouter:dubbo3新增的Router,用于服务治理。可以动态配置到/dubbo/config/dubbo/{applicationName}.MESHAPPRULE节点。
  3. TagStateRouter:标签路由,根据规则获取指定tag的invoker地址列表,进行过滤排除那些没有的invoker。tag可以动态配置到/dubbo/config/dubbo/{providerApplicationName}.tag-router节点。
  4. ServiceStateRouter:服务级别路由器,可以动态配置到/dubbo/config/dubbo/{interface}:{version}:{group}.condition-router节点。
  5. AppStateRouter:应用级别路由器,可以动态配置到/dubbo/config/dubbo/{applicationName}.condition-router节点。

4.3 subscribe接口级服务发现和订阅

该方法用于消费者端订阅zookeeper服务节点下的providers,configurators,routers节点目录变更,当这些节点目录发生变化时会触发回调通知RegistryDirectory执行notify方法,进而完成本地服务列表的动态更新功能。实际上服务提供者也会订阅,只不过只会订阅configurators节点。

该方法首先调用父类DynamicDirectory的subscribe方法,随后将当前RegistryDirectory实例加入到节点目录变化的回调通知监听器集合中,用以接收通知。

/*** RegistryDirectory的方法* <p>* 订阅服务** @param url 服务消费者url*/
@Override
public void subscribe(URL url) {//调用父类DynamicDirectory的subscribe方法super.subscribe(url);//获取enable-configuration-listen配置,默认trueif (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {//将当前RegistryDirectory加入到节点目录变化的回调通知监听器集合中consumerConfigurationListener.addNotifyListener(this);//引用配置监听器referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);}
}

DynamicDirectory的subscribe方法源码如下,主要是调用registry注册中心的subscribe方法实现服务订阅,并将自身作为监听器。

/*** DynamicDirectory的方法* 订阅服务** @param url 服务消费者url*/
public void subscribe(URL url) {//设置subscribeUrl属性setSubscribeUrl(url);//调用registry注册中心的subscribe方法实现服务订阅registry.subscribe(url, this);
}

4.3.1 Registry#subscribe订阅服务

对于接口级别的服务发现协议来说,内部注册中心是各个协议对应的真正的注册中心实现,例如ListenerRegistryWrapper(ZookeeperRegistry),而对于应用级别服务发现协议来说,则是ListenerRegistryWrapper(ServiceDiscoveryRegistry)。

ListenerRegistryWrapper的subscribe方法通过内部的registry#subscribe方法实现订阅。

/*** ListenerRegistryWrapper的方法* <p>* 订阅服务** @param url      服务消费者url* @param listener 事件监听器*/
@Override
public void subscribe(URL url, NotifyListener listener) {try {//通过内部的registry#subscribe方法实现订阅if (registry != null) {registry.subscribe(url, listener);}} finally {//注册监听器,监听事件listenerEvent(serviceListener -> serviceListener.onSubscribe(url, registry));}
}

4.3.2 FailbackRegistry#subscribe订阅服务

该方法是subscribe方法入口。

  1. 调用父类AbstractRegistry的subscribe方法,将url和listener添加到中subscribed缓存中。
  2. 随后调用removeFailedSubscribed从failedSubscribed缓存中移除该url失败的订阅。
  3. 最后调用doSubscribe方法向服务器端发送订阅请求,该方法由各个注册中心子类实现。
/*** FailbackRegistry的方法* <p>* 订阅服务** @param url      订阅者url* @param listener 通知监听器*/
@Override
public void subscribe(URL url, NotifyListener listener) {//调用父类AbstractRegistry的subscribe方法,将url和listener添加到中subscribed缓存中super.subscribe(url, listener);//从failedSubscribed缓存中移除该url失败的订阅removeFailedSubscribed(url, listener);try {// Sending a subscription request to the server side/** 向服务器端发送订阅请求*/doSubscribe(url, listener);} catch (Exception e) {//失败处理Throwable t = e;List<URL> urls = getCacheUrls(url);if (CollectionUtils.isNotEmpty(urls)) {notify(url, listener, urls);logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t);} else {// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true);boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);}}// Record a failed registration request to a failed list, retry regularly//添加到失败订阅列表,定时默认5000ms重试addFailedSubscribed(url, listener);}
}

ZookeeperRegistry#doSubscribe订阅服务

该方法的大概逻辑为:

  1. 该方法获取url的category参数,该参数表示要监听的子目录,在默认设置下,当url是消费者url时,path变量可以为:
    1. dubbo/[service name]/providers,即服务提供者目录。
    2. dubbo/[service name]/configurators,即配置目录。
    3. dubbo/[service name]/routers,即服务路由目录。
  2. 遍历path,为path设置监听器,并且获取当前path下的子节点。并且加入urls集合中。
  3. 主动调用notify方法通知数据变更,这里实际上会动态更新本地内存和文件中的服务提供者缓存,并将url转换为invoker,还会创建NettyClient与服务端建立连接。这一步也是核心流程。

/*** ZookeeperRegistry的方法* <p>* 订阅服务节点** @param url      订阅者url* @param listener 通知监听器*/
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {try {checkDestroyed();//如果url的interface为*,即指定的服务接口为则监听所有,那么直接监听zk的dubbo root 节点//一般都不会这么指定if (ANY_VALUE.equals(url.getServiceInterface())) {//root节点路径,默认dubbo节点String root = toRootPath();boolean check = url.getParameter(CHECK_KEY, false);//初始化监听器ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());//添加监听器ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChildren) -> {for (String child : currentChildren) {child = URL.decode(child);if (!anyServices.contains(child)) {anyServices.add(child);subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,Constants.CHECK_KEY, String.valueOf(check)), k);}}});//创建dubbo节点zkClient.create(root, false);//添加节点监听器List<String> services = zkClient.addChildListener(root, zkListener);if (CollectionUtils.isNotEmpty(services)) {for (String service : services) {service = URL.decode(service);anyServices.add(service);subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,Constants.CHECK_KEY, String.valueOf(check)), listener);}}}//普通逻辑else {CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();/*Iterate over the category value in URL.With default settings, the path variable can be when url is a consumer URL:/dubbo/[service name]/providers,/dubbo/[service name]/configurators/dubbo/[service name]/routers*///获取url的category参数,该参数表示要监听的子目录//在默认设置下,当url是消费者url时,path变量可以为://dubbo/[service name]/providers    ,即服务提供者目录//dubbo/[service name]/configurators    ,即配置目录//dubbo/[service name]/routers    ,即路由目录for (String path : toCategoriesPath(url)) {//初始化监听器ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());//获取RegistryChildListenerImpl这个ChildListenerChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}// create "directories".//尝试创建监听的节点目录,永久节点zkClient.create(path, false);// Add children (i.e. service items).//为该节点添加一个子节点监听器,返回该节点目前的所有子节点List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {// The invocation point that may cause 1-1.//如果是providers下的子节点,则将consumer以及对应的子节点存入存入ZookeeperRegistry的缓存stringUrls中urls.addAll(toUrlsWithEmpty(url, path, children));}}/** 主动调用notify方法通知数据变更*/notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}

实际返回的urls中的数据案例如下,可以返回多个子节点集群信息,如果没有配置和路由信息,那么返回的url协议为empty。

4.4 cluster.join集群容错

集群对象Cluster,实际上Cluster不仅仅有容错功能,因为其还代表着Dubbo十层架构中的集群层,因此,集群路由、负载均衡、发起RPC调用、失败重试、服务降级都是在该层实现的。但是这些功能不都是由Cluster接口实现的,而是由集群层的其他核心接口实现,例如Directory 、 Router 、 LoadBalance,他们的关系如下:

  1. 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
  2. Directory 代表多个 Invoker,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更。
  3. Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
  4. Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等。
  5. LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。

更多集群容错介绍参见官方文档:https://dubbo.apache.org/zh/docs3-v2/java-sdk/advanced-features-and-usage/service/fault-tolerent-strategy/

Cluster通过基于Dubbo SPI机制获取,同时还会被wrapper包装,例如MockClusterWrapper。MockClusterWrapper#join方法将会被最先调用,它的join方法如下,通过MockClusterInvoker包装下层join方法返回的invoker,用于实现本地mock功能。

/*** MockClusterWrapper的方法** @param directory        服务目录* @param buildFilterChain 是否构建过滤器链* @return MockClusterInvoker* @throws RpcException*/
@Override
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {//通过MockClusterInvoker包装下层join方法返回的invoker,用于实现本地mock功能return new MockClusterInvoker<T>(directory,this.cluster.join(directory, buildFilterChain));
}

我们可以通过属性cluster设置集群方式,如不设置默认failover,即FailoverCluster。常见策略如下:

  1. Failover Cluster:失败自动切换。
    1. 失败自动切换,Dubbo默认的容错策略。服务消费方调用失败后自动切换到其他服务提供者的服务器进行重试。客户端等待服务端的处理时间超过了设定的超时时间时,也算做失败,将会重试。可通过 retries属性来设置重试次数(不含第一次),默认重试两次。
    2. 通常用于读操作或者具有幂等的写操作,需要注意的是重试会带来更长延迟。
  2. Failfast Cluster:快速失败。
    1. 只发起一次调用,失败后立即抛出异常。
    2. 通常用于非幂等性的写操作,比如新增记录。
  3. Failsafe Cluster:安全失败。
    1. 当调用失败出现异常时,直接忽略此异常,并记录一条日志,返回一个null结果,即使失败了也不会影响整个调用流程。
    2. 通常用于操作日志记录等无需要严格保证成功的操作。
  4. Failback Cluster:失败自动恢复。
    1. 当调用失败出现异常后,不会抛出异常,这类似于Failsafe,但Failback还会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,并且即使重试调用成功,原来的调用方也感知不到结果了。
    2. 通常用于对于实时性要求不高,且不需要返回值的一些异步操作,比如消息通知。
  5. Forking Cluster:并行调用。
    1. 并行调用多个服务提供者,只要一个成功即返回。可通过forks=”2”来设置最大并行数。
    2. 通常用于实时性要求较高的读操作,不适用于非幂等操作,并且需要消耗更多的服务资源。
  6. Broadcast Cluster:广播调用。
    1. 广播调用所有提供者,逐个调用,任意一台报错则调用报错。
    2. 通常用于通知所有提供者更新缓存或日志等本地资源信息。
  7. Available Cluster:可用实例调用。
    1. 调用目前可用的实例(只调用一个),如果当前没有可用的实例,则抛出异常。
    2. 通常用于不需要负载均衡的场景。
  8. Mergeable Cluster:合并调用。
    1. 将集群中的调用结果聚合起来返回结果。
    2. 通常和group一起配合使用。通过分组对结果进行聚合并返回聚合后的结果,比如菜单服务,用group区分同一接口的多种实现,现在消费方需从每种group中调用一次并返回结果,对结果进行合并之后返回,这样就可以实现聚合菜单项。
  9. ZoneAware Cluster:多注册中心调用。
    1. 多注册中心订阅的场景,注册中心集群间的负载均衡。

当在对directory执行了buildRouterChain方法构建路由链,以及subscribe方法服务发现并订阅服务的处理之后,directory内部实际上已经包含了configurators配置信息集合,routerChain路由链以及urlInvokerMap这三个缓存。在最后,将会通过cluster.join为服务调用添加集群容错机制。

不同的cluster#join方法会返回具有对应的容错功能的invoker。例如FailoverCluster#join方法将会返回FailoverClusterInvoker。

集群容错的能力只有在服务调用的时候才会得到体现,当我们学习Dubbo服务调用的时候在学习他们的源码。

5 总结

本次我们学习了接口级的服务引入订阅的refreshInterfaceInvoker方法,当时还有最为关键的notify服务通知更新的部分源码没有学习,所以下一篇文章将会介绍,notify通知本地服务更新的源码,然后在进行总结。

相关文章:

Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker

基于Dubbo 3.1&#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了Dubbo3.1版本的MigrationRuleHandler这个处理器&#xff0c;它用于通过动态更改规则来控制迁移行为。MigrationRuleListener的onrefer方法是Dubbo2.x 接口级服务发现与Dubbo3.x应用级服务发现…...

高级java每日一道面试题-2024年11月04日-Redis篇-Redis如何做内存优化?

如果有遗漏,评论区告诉我进行补充 面试官: Redis如何做内存优化? 我回答: 在Java高级面试中&#xff0c;关于Redis如何做内存优化的问题&#xff0c;可以从以下几个方面进行详细解答&#xff1a; 一、Redis内存优化概述 Redis内存优化主要是指通过一系列策略和技术&#…...

数据结构 -二叉搜索树

一.什么是二叉搜索树 树插入删除方便比线性数组 二.二叉搜索树的查找操作 尾递归可以用循环递归 三.二叉树的插入操作 35要挂在33上面必须记住33的位置 解决方法&#xff0c;要求递归函数返回一个 结点插到33的右子树 四.二叉搜索树的删除 要是删除的是叶子节点之间删除 只有一…...

Ubuntu配置阿里云docker apt源

一、配置阿里云docker apt源 Ubuntu 放弃了apt-key的GPG 密钥的管理方法&#xff0c;用户可以直接添加gpg密钥到/etc/apt/trusted.gpg.d/目录下。 同时添加删除apt source 直接在/etc/apt/sources.list.d/目录下操作即可。 1、删除旧的镜像源 #旧版操作方法 apt-key list # …...

【React】状态管理之Redux

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 状态管理之Redux引言1. Redux 的核心概念1.1 单一数据源&#xff08;Single Sou…...

3195. 有趣的数-13年12月CCF计算机软件能力认证(组合数)

题目 思路 统计方案的时候先去分类&#xff0c;先放01&#xff0c;然后在考虑23对于第k类&#xff0c; 对于01的选择 对于所有的分类&#xff1a;本题我觉得要考虑的几个点就是&#xff1a;状态分类得到数学公式组合数的计算防越界处理 代码 计算组合数的代码模板&#xff1…...

基于 Python 的 Bilibili 评论分析与可视化

一、项目概述 本项目利用 Python 对 Bilibili &#xff08;哔哩哔哩&#xff09;平台上的视频评论数据进行爬取、清洗和分析&#xff0c;并通过可视化展示数据的主要特征。我们通过以下几个步骤实现了这一过程&#xff1a; 数据爬取&#xff1a;使用 Bilibili 提供的 API 获取…...

大语言模型理论基础

文章目录 前言大语言模型必需知识概述大语言模型目标模型上下文神经网络的神经元常见激活函数SigmoidTanhRelusoftmax 通用近似定理多层感知机&#xff08;MLP&#xff09;拟合最后 前言 你好&#xff0c;我是醉墨居士&#xff0c;我们接下来对大语言模型一探究竟&#xff0c;…...

【 LLM论文日更|检索增强:大型语言模型是强大的零样本检索器 】

论文&#xff1a;https://aclanthology.org/2024.findings-acl.943.pdf代码&#xff1a;GitHub - taoshen58/LameR机构&#xff1a;悉尼科技大学 & 微软 & 阿姆斯特丹大学 & 马里兰大学领域&#xff1a;retrieval & llm发表&#xff1a;ACL2024 研究背景 研究…...

【基于轻量型架构的WEB开发】课程 作业3 Spring框架

一. 单选题&#xff08;共12题&#xff0c;48分&#xff09; 1. (单选题)以下有关Spring框架优点的说法不正确的是&#xff08; &#xff09;。 A. Spring就大大降低了组件之间的耦合性。 B. Spring是一种侵入式框架 C. 在Spring中&#xff0c;可以直接通过Spring配置文件管理…...

14.最长公共前缀-力扣(LeetCode)

题目&#xff1a; 解题思路&#xff1a; 解决本题的关键点是确定扫描的方式&#xff0c;大体上有两种方式&#xff1a;横向扫描和纵向扫描。 1、横向扫描&#xff1a;首先比较第一个字符串和第二个字符串&#xff0c;记录二者的公共前缀&#xff0c;然后用当前公共前缀与下一个…...

客户案例|智能进化:通过大模型重塑企业智能客服体验

01 概 述 随着人工智能技术的快速发展&#xff0c;客户对服务体验的期待和需求不断升级。在此背景下&#xff0c;大模型技术的崛起&#xff0c;为智能客服领域带来了创造性的变革。 在上篇文章《在后LLM时代&#xff0c;关于新一代智能体的思考》中有提到&#xff0c;智能客服…...

Flink Job更新和恢复

Checkpoints 的主要目的是为意外失败的作业提供恢复机制。 Savepoints的设计更侧重于可移植性和操作灵活性&#xff0c;尤其是在 job 变更方面。Savepoint 的用例是针对计划中的、手动的运维。例如&#xff0c;可能是更新你的 Flink 版本&#xff0c;更改你的作业图等等。 fli…...

读多写少业务中,MySQL如何优化数据查询方案?

小熊学Java​站点:https://www.javaxiaobear.cn 编程资料合集:https://pqgmzk7qbdv.feishu.cn/base/QXq2bY5OQaZiDksJfZMc30w5nNb?from=from_copylink 看一看当面试官提及“在读多写少的网络环境下,MySQL 如何优化数据查询方案”时,你要从哪些角度出发回答问题??? 案例…...

Bugku CTF_Web——点login咋没反应

Bugku CTF_Web——点login咋没反应 进入靶场 随便输个试试 看来确实点login没反应 抓包看看 也没有什么信息 看了下源码 给了点提示 一个admin.css try ?12713传参试试 拿到一个php代码 <?php error_reporting(0); $KEYctf.bugku.com; include_once("flag.php&q…...

attention 注意力机制 学习笔记-GPT2

注意力机制 这可能是比较核心的地方了。 gpt2 是一个decoder-only模型&#xff0c;也就是仅仅使用decoder层而没有encoder层。 decoder层中使用了masked-attention 来进行注意力计算。在看代码之前&#xff0c;先了解attention-forward的相关背景知识。 在普通的self-atten…...

什么是HTTP,什么是HTTPS?HTTP和HTTPS都有哪些区别?

什么是 HTTP&#xff1f; HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一种应用层协议&#xff0c;用于在互联网上进行数据通信。它定义了客户端&#xff08;通常是浏览器&#xff09;和服务器之间的请求和响应格式。HTTP 是无状态的…...

SkyWalking-安装

SkyWalking-简单介绍 是一个开源的分布式追踪系统&#xff0c;用于检测、诊断和优化分布式系统的功能。 支持 ElasticSearch、H2、MySQL、PostgreSql 等数据库 基于 ElasticSearch 的情况 ElasticSearch&#xff08;ES&#xff09; 安装 1、下载并解压 https://www.elastic…...

RabbitMQ运维

1. 单机多节点 1.1 搭建RabbitMQ ①安装RabbitMQ 略 ②确认RabbitMQ运⾏没问题 #查看RabbitMQ状态 rabbitmqctl status 节点名称: 端口号: 25672:Erlang分布式节点通信的默认端⼝, Erlang是RabbitMQ的底层通信协议.15672: Web管理界⾯的默认端⼝, 通过这个端⼝可以访问R…...

Go语言并发精髓:深入理解和运用go语句

Go语言并发精髓:深入理解和运用go语句 在Go语言的世界里,go语句是实现并发的核心,它简洁而强大,允许程序以前所未有的方式运行多个任务。本文将深入探讨go语句及其执行规则,揭示Go语言并发编程的内在机制,并提供实际案例帮助读者掌握其用法。 1. go语句的基本概念(Wha…...

基于STM32的智能家居系统:MQTT、AT指令、TCP\HTTP、IIC技术

一、项目概述 随着智能家居技术的不断发展&#xff0c;越来越多的家庭开始使用智能设备来提升生活质量和居住安全性。智能家居系统不仅提供了便利的生活方式&#xff0c;还能有效地监测家庭环境&#xff0c;保障家庭安全。本项目以设计一种基于STM32单片机的智能家居系统为目标…...

分糖果(相等分配)

题目&#xff1a;有n种不同口味的糖果&#xff0c;第i种糖果的数量为a[i]&#xff0c;现在需要把糖果分给m个人。分给每个人糖果的数量必须是相等的&#xff0c;并且每个人只能选择一种糖果。也就是说&#xff0c;可以把一种糖果分给多个人&#xff0c;但是一个人的糖果不能有多…...

docker构建jdk11

# 建立一个新的镜像文件&#xff0c;配置模板&#xff1a;新建立的镜像是以centos为基础模板 # 因为jdk必须运行在操作系统之上 FROM centos:7.9.2009# 作者名 MAINTAINER yuanhang# 创建一个新目录来存储jdk文件 RUN mkdir /usr/local/java#将jdk压缩文件复制到镜像中&#…...

唐帕科技校园语音报警系统:通过关键词识别,阻止校园霸凌事件

校园霸凌问题已成为全球教育领域的严峻挑战&#xff0c;给受害者带来了身心上的长期创伤。然而&#xff0c;随着科技的发展&#xff0c;尤其是人工智能和语音识别技术的不断进步&#xff0c;我们开始看到创新性解决方案的出现。校园语音报警系统便是其中一种利用技术手段保护学…...

酒店行业数据仓库

重要名词&#xff1a; PMS&#xff1a;酒店管理系统CRS&#xff1a;中央预定系统客户&#xff1a;可以分为会员、散客&#xff08;自行到店入住&#xff09;、协议&#xff08;与酒店长期合作&#xff0c;内部价&#xff09;、中介预定&#xff1a;可以分为线上预定、线下预定…...

A029-基于Spring Boot的物流管理系统的设计与实现

&#x1f64a;作者简介&#xff1a;在校研究生&#xff0c;拥有计算机专业的研究生开发团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339; 赠送计算机毕业设计600…...

Python Day5 进阶语法(列表表达式/三元/断言/with-as/异常捕获/字符串方法/lambda函数

Python 列表推导式是什么 列表推导式是 Python 语言特有的一种语法结构&#xff0c;也可以看成是 Python 中一种独特的数据处理方式&#xff0c; 它在 Python 中用于 转换 和 过滤 数据。 其语法格式如下所示&#xff0c;其中 [if 条件表达式] 可省略。 [表达式 for 迭代变量…...

一文了解Android的核心系统服务

在 Android 系统中&#xff0c;核心系统服务&#xff08;Core System Services&#xff09;是应用和系统功能正常运行的基石。它们负责提供系统级的资源和操作支持&#xff0c;包含了从启动设备、管理进程到提供应用基础组件的方方面面。以下是 Android 中一些重要的核心系统服…...

Scala的Array(1)

Scala的Array表示长度不可变的数组&#xff0c;若需要定义可变数组需要倒包 import scala.collection.mutable.ArrayBuffer 下面是关于Array的一些用法&#xff1a; import scala.collection.mutable.ArrayBufferobject test29 { // //不可变数组 Array // def main(args:…...

[Linux] Linux信号捕捉

在Linux中&#xff0c;信号捕捉是通过使用信号处理函数来实现的。信号是操作系统用于通知进程发生某些事件的机制&#xff0c;例如终止进程、外部中断、非法操作等。常用的信号捕捉机制是通过signal()函数或sigaction()函数来注册信号处理程序。 1. 使用signal()函数 signal(…...