Dubbo源码解析一服务暴露与发现
Dubbo 服务暴露与发现
- 1. Spring中自定义Schema
- 1.1 案例使用
- 1.2 dubbo中的相关对象
- 2. 服务暴露机制
- 2.1 术语解释
- 2.2 流程机制
- 2.3 源码分析
- 2.3.1 导出入口
- 2.3.2 导出服务到本地
- 2.3.3 导出服务到远程(重点)
- 2.3.4 开启Netty服务
- 2.3.5 服务注册
- 2.3.6 总结
- 3. 服务发现
- 3.1 服务发现流程
- 3.2 源码分析
- 3.2.1 引用入口
- 3.2.2 创建客户端
- 3.2.3 注册
- 3.2.4 创建代理对象
- 3.2.5 总结
1. Spring中自定义Schema
在 Dubbo 中,可以使用 XML 配置相关信息来引入服务或者导出服务(也可以使用注解)。配置完成,启动工程,Spring 会读取配置文件,生成注入相关Bean
从 Spring 2.0 开始,Spring 开始提供了一种基于 XML Schema 格式扩展机制,用于自定义和配置 bean,从而被spring框架所解析
1.1 案例使用
Spring XML Schema 扩展机制实现步骤:
- 创建配置属性的JavaBean对象
- 创建一个 XML Schema 文件,描述自定义的合法构建模块,也就是xsd文件
- 自定义处理器类,并实现NamespaceHandler接口
- 自定义解析器,实现BeanDefinitionParser接口(最关键的部分)
- 编写Spring.handlers和spring.schemas文件配置所有部件
定义JavaBean对象,在spring中此对象会根据配置自动创建
@Data
public class User {private String id; private String name; private Integer age;
}
在META-INF下定义user.xsd文件,使用xsd用于描述标签的规则
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.ibyte.com/schema/user"xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans" targetNamespace="http://www.ibyte.com/schema/user"elementFormDefault="qualified" attributeFormDefault="unqualified"> <xsd:import namespace="http://www.springframework.org/schema/beans" /> <xsd:element name="user"><xsd:complexType> <xsd:complexContent> <xsd:extension base="beans:identifiedType"> <xsd:attribute name="name" type="xsd:string" /> <xsd:attribute name="age" type="xsd:int" /> </xsd:extension> </xsd:complexContent> </xsd:complexType> </xsd:element>
</xsd:schema>
Spring读取xml文件时,会根据标签的命名空间找到其对应的NamespaceHandler,在NamespaceHandler内会注册标签对应的解析器BeanDefinitionParser
自定义的handler可以继承NamespaceHandlerSupport(推荐), 也可以实现NamespaceHandler
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;public class UserNamespaceHandler extends NamespaceHandlerSupport {public void init() {registerBeanDefinitionParser("user", new UserBeanDefinitionParser());}
}
BeanDefinitionParser是标签对应的解析器,Spring读取到对应标签时会使用该类进行解析;
public class UserBeanDefinitionParser extendsAbstractSingleBeanDefinitionParser {protected Class getBeanClass(Element element) {return User.class;}protected void doParse(Element element, BeanDefinitionBuilder bean) {String name = element.getAttribute("name");String age = element.getAttribute("age");String id = element.getAttribute("id");if (StringUtils.hasText(id)) {bean.addPropertyValue("id", id);}if (StringUtils.hasText(name)) {bean.addPropertyValue("name", name);}if (StringUtils.hasText(age)) {bean.addPropertyValue("age", Integer.valueOf(age));}}
}
定义spring.handlers文件,内部保存命名空间与NamespaceHandler类的对应关系;必须放在classpath下的META-INF文件夹中
http\://www.ibyte.com/schema/user=com.ibyte.schema.UserNamespaceHandler
定义spring.schemas文件,内部保存命名空间对应的xsd文件位置;必须放在classpath下的META-INF文件夹中
http\://www.ibyte.com/schema/user.xsd=META-INF/user.xsd
在spring工程中进行使用和测试,定义spring配置文件,导入对应约束, Spring会根据自定义标签, 生成对应的对象放到Spring容器中
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:ibyte="http://www.ibyte.com/schema/user"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsdhttp://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsdhttp://www.ibyte.com/schema/user http://www.ibyte.com/schema/user.xsd"><ibyte:user id="user" name="zhangsan" age="12"></itheima:user></beans>
当网络中无法获取到xsd文件时, 会尝试尝试获取本地xsd文件META-INF/user.xsd
编写测试类,通过spring容器获取对象user
public class SchemaDemo {public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring/applicationContext.xml");User user = (User)ctx.getBean("user");System.out.println(user);}
}
1.2 dubbo中的相关对象
Dubbo是运行在spring容器中,dubbo的配置文件也是通过spring的配置文件applicationContext.xml来加载,所以dubbo的自定义配置标签实现,其实同样依赖spring的xml schema机制
- 在dubbo-demo-xml模块中可以查看dubbo如何在spring的配置文件中进行配置
- spring通过Schema机制来解析这些配置并注册对应的bean
找到dubbo-config/dubbo-config-spring模块
解析的核心在DubboNamespaceHandler中
可以看出Dubbo所有的组件都是由DubboBeanDefinitionParser解析, dubbo对解析进行了统一的封装,并通过registerBeanDefinitionParser方法来注册到spring中最后解析对应的对象。
这些对象中我们重点关注的有以下三个: - ServiceBean:服务提供者暴露服务的核心对象 (提供者的启动入口)
- ReferenceBean:服务消费者发现服务的核心对象 (消费者的启动入口)
- RegistryConfig:定义注册中心的核心配置对象
2. 服务暴露机制
2.1 术语解释
在 Dubbo 的核心领域模型中:
- Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用
由于 Invoker 是 Dubbo 领域模型中非常重要的一个概念,很多设计思路都是向它靠拢, 提供短 Invoker会对原始的Service进行包装(Wrapper)
-
Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理
export:暴露远程服务
refer:引用远程服务 -
ProxyFactory:获取一个接口的代理类, 生成服务消费者的接口代理类或者生成服务提供者的调用代理
getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
getProxy:针对client端,创建接口的代理对象,例如DemoService接口的代理实现
- Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等
2.2 流程机制
dubbo的服务暴露原理
在整体上看,Dubbo 框架做服务暴露分为两大部分
- 第一步将持有的服务实例通过代理转换成 Invoker
- 第二步会把 Invoker 通过具体的协议 ( 比如 Dubbo ) 转换成 Exporter, 框架做了这层抽象也大大方便了功能扩展
服务提供方暴露服务的蓝色初始化链,时序图如下:
2.3 源码分析
2.3.1 导出入口
服务导出的入口方法是 ServiceBean 的 onApplicationEvent, ServiceBean实现了ApplicationListener, 时间类型为ContextRefreshedEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作。方法代码如下:
public void onApplicationEvent(ContextRefreshedEvent event) {// 是否有延迟导出 && 是否已导出 && 是不是已被取消导出if (isDelay() && !isExported() && !isUnexported()) {// 导出服务export();}
}
onApplicationEvent 方法在经过一些判断后,会决定是否调用 export 方法导出服务。在export 根据配置执行相应的动作。最终进入到ServiceConfig.doExportUrls导出服务方法
private void doExportUrls() {// 加载注册中心链接List<URL> registryURLs = loadRegistries(true);// 遍历 protocols,并在每个协议下导出服务for (ProtocolConfig protocolConfig : protocols) {.....//核心,其他略doExportUrlsFor1Protocol(protocolConfig, registryURLs);}
}
可以看出可以存在多注册中心以及多协议
关于多协议多注册中心导出服务首先是根据配置,以及其他一些信息组装 URL。URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {String name = protocolConfig.getName();// 如果协议名为空,或空串,则将协议名变量设置为 dubboif (name == null || name.length() == 0) {name = "dubbo";}Map<String, String> map = new HashMap<String, String>();//略// 获取上下文路径String contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0) && provider != null) {contextPath = provider.getContextpath();}// 获取 host 和 portString host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);// 组装 URLURL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);// 省略无关代码
}
上面的代码首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。前置工作做完,接下来就可以进行服务导出了。服务导出分为导出到本地 (JVM),和导出到远程。在深入分析服务导出的源码前,我们先来从宏观层面上看一下服务导出逻辑。如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {---省略无关代码if (!SCOPE_NONE.equalsIgnoreCase(scope)) {// export to local if the config is not remote (export to remote only when config is remote) cope != remote,导出到本地if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {/*** 导出服务到本地,injvm*/exportLocal(url);}// export to remote if the config is not local (export to local only when config is local) scope != local,导出到远程if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {if (!isOnlyInJvm() && logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}if (CollectionUtils.isNotEmpty(registryURLs)) { //有注册中心的配置信息for (URL registryURL : registryURLs) {//if protocol is only injvm ,not registerif (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {continue;}url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));// 加载监视器链接URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {// 将监视器链接作为参数添加到 url 中url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);}// For providers, this is used to enable custom proxy to generate invokerString proxy = url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(PROXY_KEY, proxy);}// 为服务提供类(ref)生成 Invoker/*** Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,* 它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现** 在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用** Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory*/Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfigDelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);/*** 导出服务,并生成 Exporter 与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程** debug此处查看应该走哪个protocol的export* 走的是 RegistryProtocol*/Exporter<?> exporter = protocol.export(wrapperInvoker);/*** 将服务暴露返回的Exporter实例存储到ServiceConfig的 List<Exporter<?>> exporters 中*/exporters.add(exporter);}} else { //无注册中心的配置信息 仅导出服务 不注册Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}/*** @since 2.7.0* ServiceData Store*/MetadataReportService metadataReportService = null;if ((metadataReportService = getMetadataReportService()) != null) {metadataReportService.publishProvider(url);}}}
}
上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下
- scope = none,不导出服务
- scope != remote,导出到本地
- scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// 为目标类创建 Wrapperfinal Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};
}
如上,JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke
2.3.2 导出服务到本地
Invoke创建成功之后,接下来我们来看本地导出
/*** always export injvm* 当一个应用既是一个服务的提供者,同时也是这个服务的消费者的时候,可以直接对本机提供的服务发起本地调用。* 从 2.2.0 版本开始,Dubbo 默认在本地以 injvm 的方式暴露服务,这样的话,在同一个进程里对这个服务的调用会优先走本地调用。* 与本地对象上方法调用不同的是,Dubbo 本地调用会经过 Filter 链,其中包括了 Consumer 端的 Filter 链以及 Provider 端的 Filter 链。* 通过这样的机制,本地消费者和其他消费者都是统一对待,统一监控,服务统一进行治理*/
private void exportLocal(URL url) {URL local = URLBuilder.from(url).setProtocol(LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法Exporter<?> exporter = protocol.export( PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面我们来看一下 InjvmProtocol 的 export 方法都做了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {// 创建 InjvmExporterreturn new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了
2.3.3 导出服务到远程(重点)
导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑。在 RegistryProtocol 的 export 方法上
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:// zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.200.10%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.200.10%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D15268%26qos.port%3D22222%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1622539128917&pid=15268&qos.port=22222×tamp=1622539128911URL registryUrl = getRegistryUrl(originInvoker);// url to export locally 得到的url示例如下//dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=15268&qos.port=22222®ister=true&release=&side=provider×tamp=1622539128917URL providerUrl = getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call// the same service. Because the subscribed is cached key with the name of the service, it causes the// subscription information to cover.// 获取订阅 URL,比如://provider://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=15268&qos.port=22222®ister=true&release=&side=provider×tamp=1622539128917final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);// 创建监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker 执行服务导出 核心方法---****重点去跟****final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registry 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistryfinal Registry registry = getRegistry(originInvoker);// 获取已注册的服务提供者 URLfinal URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl, registeredProviderUrl);//to judge if we need to delay publishboolean register = registeredProviderUrl.getParameter("register", true);// 根据 register 的值决定是否注册服务if (register) {// 向注册中心注册服务register(registryUrl, registeredProviderUrl);providerInvokerWrapper.setReg(true);}// Deprecated! Subscribe to override rules in 2.6.x or before.// 向注册中心进行订阅 override 数据registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);exporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);//Ensure that a new exporter instance is returned every time export// 创建并返回 DestroyableExporterreturn new DestroyableExporter<>(exporter);
}
上面代码看起来比较复杂,主要做如下一些操作:
- 调用 doLocalExport 导出服务
- 向注册中心注册服务
- 向注册中心进行订阅 override 数据
- 创建并返回 DestroyableExporter
下面先来分析 doLocalExport 方法的逻辑,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {// dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=15804&qos.port=22222®ister=true&release=&side=provider×tamp=1622539267498String key = getCacheKey(originInvoker);//访问缓存return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {//创建 Invoker 委托类对象 DelegateInvoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);//----****重点跟 protocol.export(invokerDelegate) 方法,此处protocol根据SPI机制,根据URL中的参数找 DubboProtocol 实现 即根据协议导出return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);});
}
接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=9496&qos.port=22222®ister=true&release=&side=provider×tamp=1622539401268URL url = invoker.getUrl();// export service. key=org.apache.dubbo.demo.DemoService:20880// 1. 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880String key = serviceKey(url);// 创建 DubboExporterDubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);exporterMap.put(key, exporter);// 将 <key, exporter> 键值对放入缓存中//export an stub service for dispatching eventBoolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}openServer(url); // 开启服务端(默认Netty服务)optimizeSerialization(url);// 序列化优化return exporter;
}
2.3.4 开启Netty服务
分析DubboProtocol中openServer 方法
private void openServer(URL url) {// find server. key=192.168.200.10:20880String key = url.getAddress();//client can export a service which's only for server to invokeboolean isServer = url.getParameter(IS_SERVER_KEY, true);if (isServer) {ExchangeServer server = serverMap.get(key); // serverMap根据ip:port缓存Server对象 因为服务端可能在本机不同端口暴露if (server == null) {synchronized (this) {server = serverMap.get(key);if (server == null) {serverMap.put(key, createServer(url)); // createServer是核心----****重点去关注******}}} else {// server supports reset, use together with overrideserver.reset(url);}}
}
以上方法会根据ip:port对NettyServer进行缓存, 避免重复创建, 使用了双重锁
接下来分析服务器实例的创建过程。如下:
private ExchangeServer createServer(URL url) {// 添加一系列参数到URL中url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// enable heartbeat by default.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) // 添加心跳检测配置到 url 中.addParameter(CODEC_KEY, DubboCodec.NAME) // 添加编码解码器参数.build();//从url获取服务 Transporter 实现,默认netty dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8976&qos.port=22222®ister=true&release=&side=provider×tamp=1622539726739String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // str=netty//判断是否有该 Transporter 扩展点if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { // 判断Transporter是否有netty的扩展点throw new RpcException("Unsupported server type: " + str + ", url: " + url);}ExchangeServer server;try {/*** 绑定并启动服务*/server = Exchangers.bind(url, requestHandler);// requestHandler} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return server;
}
如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//向url中添加codec参数 dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=6052&qos.port=22222®ister=true&release=&side=provider×tamp=1622539986773url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");//getExchanger(url)根据SPI返回的是 HeaderExchangerreturn getExchanger(url).bind(url, handler);
}
上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:// 1. new HeaderExchangeHandler(handler)// 2. new DecodeHandler(new HeaderExchangeHandler(handler))// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器handler = new ChannelHandlerDispatcher(handlers);}/*** getTransporter()返回 Transporter的自适应实例, 并调用实例方法* dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=12416&qos.port=22222®ister=true&release=&side=provider×tamp=1622540622168* 最终找到的扩展点是: NettyTransporter*/return getTransporter().bind(url, handler);
}
如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter
调用NettyTransporter.bind(URL, ChannelHandler)方法。创建一个NettyServer实例。调用NettyServer.doOPen()方法,服务器被开启,服务也被暴露出来了
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?int idleTimeout = UrlUtils.getIdleTimeout(getUrl());// 初始化编解码器 运行时是 DubboCodec---> ExchangeCodec----> TelnetCodecNettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()) //服务端请求解码器.addLast("encoder", adapter.getEncoder()) // 服务端响应编码器.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))//netty服务端业务处理器.addLast("handler", nettyServerHandler);}});// bindChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();}
2.3.5 服务注册
RegistryProtocol 的 export 方法上, 也就是上面。如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// ${导出服务}// 省略其他代码boolean register = registeredProviderUrl.getParameter("register", true);if (register) {// 注册服务register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 订阅 override 数据registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 省略部分代码
}
服务注册逻辑, 以Zookeeper为例
public void register(URL registryUrl, URL registeredProviderUrl) {// zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.200.10%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.200.10%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D12416%26qos.port%3D22222%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1622540622168&pid=12416&qos.port=22222×tamp=1622540622168/*** @Adaptive({"protocol"})* Registry getRegistry(URL url);* 根据SPI机制找到 ZookeeperRegistry extends FailbackRegistry extends AbstractRegistry* 看这里:register方法在 FailbackRegistry 中,真正执行注册是在ZookeeperRegistry中的doRegister方法里*/Registry registry = registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);
}
register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。
下面先来看一下 getRegistry 方法的源码,这个方法由 AbstractRegistryFactory 实现。如下:
public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);String key = url.toServiceString();LOCK.lock();try {// 访问缓存Registry registry = REGISTRIES.get(key);if (registry != null) {return registry;}// 缓存未命中,创建 Registry 实例registry = createRegistry(url);if (registry == null) {throw new IllegalStateException("Can not create registry...");}// 写入缓存REGISTRIES.put(key, registry);return registry;} finally {LOCK.unlock();}
}protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry。在此方法中就是通过new ZookeeperRegistry(url, zookeeperTransporter)实例化一个注册中心
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { // ZookeeperTransportersuper(url);if (url.isAnyHost()) {throw new IllegalStateException("registry address == null");}// 获取组名,默认为 dubboString group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);if (!group.startsWith(PATH_SEPARATOR)) {group = PATH_SEPARATOR + group;}this.root = group;// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporterzkClient = zookeeperTransporter.connect(url);zkClient.addStateListener(state -> {if (state == StateListener.RECONNECTED) {try {recover();} catch (Exception e) {logger.error(e.getMessage(), e);}}});}
在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。
public ZookeeperClient connect(URL url) {// 创建 CuratorZookeeperClientreturn new CuratorZookeeperClient(url);
}
继续向下看
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {private final CuratorFramework client;public CuratorZookeeperClient(URL url) {super(url);try {// 创建 CuratorFramework 构造器CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);String authority = url.getAuthority();if (authority != null && authority.length() > 0) {builder = builder.authorization("digest", authority.getBytes());}// 构建 CuratorFramework 实例client = builder.build();//省略无关代码// 启动客户端client.start();} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}
CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。至此Zookeeper客户端就已经启动了
接下来我们就可以去阅读服务注册的代码了
protected void doRegister(URL url) {try {// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:// /${group}/${serviceInterface}/providers/${url}// 比如//url dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=10948®ister=true&release=&side=provider×tamp=1622545698657//path /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.200.10%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dorg.apache.dubbo.demo.DemoService%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D17212%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1622546575306zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register...");}
}
如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:
public void create(String path, boolean ephemeral) {if (!ephemeral) {// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在if (checkExists(path)) {return;}}int i = path.lastIndexOf('/');if (i > 0) {// 递归创建上一级路径create(path.substring(0, i), false);}// 根据 ephemeral 的值创建临时或持久节点if (ephemeral) {createEphemeral(path);} else {createPersistent(path);}
}
到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务
2.3.6 总结
- 在有注册中心,需要注册提供者地址的情况下,ServiceConfig 解析出的 URL 格式为:registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode(“dubbo://service-host/{服务名}/{版本号}”)
- 基于 Dubbo SPI 的自适应机制,通过 URL registry:// 协议头识别,就调用 RegistryProtocol#export() 方法
1> 将具体的服务类名,比如 DubboServiceRegistryImpl,通过 ProxyFactory 包装成 Invoker 实例
2> 调用 doLocalExport 方法,使用 DubboProtocol 将 Invoker 转化为 Exporter 实例,并打开 Netty 服务端监听客户请求
3> 创建 Registry 实例,连接 Zookeeper,并在服务节点下写入提供者的 URL 地址,注册服务
4> 向注册中心订阅 override 数据,并返回一个 Exporter 实例 - 根据 URL 格式中的 "dubbo://service-host/{服务名}/{版本号}"中协议头 dubbo:// 识别,调用 DubboProtocol#export() 方法,开发服务端口
- RegistryProtocol#export() 返回的 Exporter 实例存放到 ServiceConfig 的 List exporters 中
3. 服务发现
3.1 服务发现流程
在整体上看 , Dubbo 框架做服务消费也分为两大部分 , 第一步通过持有远程服务实例生成Invoker, 这个 Invoker 在客户端是核心的远程代理对象, 第二步会把 Invoker 通过动态代理转换成实现用户接口的动态代理引用
服务消费方引用服务的蓝色初始化链,时序图如下:
3.2 源码分析
3.2.1 引用入口
服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法, 返回一个自定义的接口代理类
public Object getObject() throws Exception {return get();
}public synchronized T get() {checkAndUpdateSubConfigs();if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}// 检测 ref 是否为空,为空则通过 init 方法创建if (ref == null) {// init 方法主要用于处理配置,以及调用 createProxy 生成代理类init();}return ref;
}
Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性
private void init() {// 创建代理类ref = createProxy(map);
}
此方法代码很长,主要完成的配置加载,检查,以及创建引用的代理对象。这里要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下
private T createProxy(Map<String, String> map) {URL tmpUrl = new URL("temp", "localhost", 0, map);...........isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl) // 本地引用略if (isJvmRefer) {} else {// 点对点调用略if (url != null && url.length() > 0) {} else {// 加载注册中心 urlList<URL> us = loadRegistries(false);if (us != null && !us.isEmpty()) {for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}// 添加 refer 参数到 url 中,并将 url 添加到 urls 中urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));}}}// 单个注册中心或服务提供者(服务直连,下同)if (urls.size() == 1) {// 调用 RegistryProtocol 的 refer 构建 Invoker 实例invoker = refprotocol.refer(interfaceClass, urls.get(0));// 多个注册中心或多个服务提供者,或者两者混合} else {List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;// 获取所有的 Invokerfor (URL url : urls) {// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url;}}if (registryURL != null) {// 如果注册中心链接不为空,则将使用 AvailableClusterURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并invoker = cluster.join(new StaticDirectory(u, invokers));} else {invoker = cluster.join(new StaticDirectory(invokers));}}}//省略无关代码// 真正根据invoker 生成代理类return (T) proxyFactory.getProxy(invoker);
}
上面代码很多,不过逻辑比较清晰
- 如果是本地调用,直接jvm 协议从内存中获取实例
- 如果只有一个注册中心,直接通过 Protocol 自适应拓展类构建 Invoker 实例接口
- 如果有多个注册中心,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker
- 最后调用 ProxyFactory 生成代理类
3.2.2 创建客户端
在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,这里分析DubboProtocol
在AbstractProtocol.refer调用protocolBindingRefer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {/*** AsyncToSyncInvoker 异步转同步的Invoker* 重点关注:protocolBindingRefer 在 DubboProtocol 中实现*/return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
查看DubboProtocol中的protocolBindingRefer方法, 将URL转Invoker
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);/*** create rpc invoker. 创建 DubboInvoker* 重点看 getClients(url) 根据 provider url 获取客户端,** url=dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=org.apache.dubbo.demo.DemoService&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=20192&qos.port=33333®ister=true®ister.ip=192.168.200.10&release=&remote.application=demo-provider&side=consumer&sticky=false×tamp=1622964228064*/DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;
}
上面方法看起来比较简单,创建一个DubboInvoker。通过构造方法传入远程调用的client对象。默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑
private ExchangeClient[] getClients(URL url) {// whether to share connection 是否共享连接boolean useShareConnect = false;int connections = url.getParameter(CONNECTIONS_KEY, 0);// 获取连接数,默认为0,表示未配置List<ReferenceCountExchangeClient> shareClients = null;// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {useShareConnect = true;/*** The xml configuration should have a higher priority than properties. xml配置比properties优先级更高* 获取<dubbo:consumer/>中的shareconnections属性值,表示共享连接的数量*/String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);shareClients = getSharedClient(url, connections); // 获取指定数量的共享连接(第一次获取还没有会创建)}// 此时的connections代表的是连接数,已经不区分是共享还是新创建的连接了ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {// 若是共享的,直接从shareClients 里取if (useShareConnect) {// 获取共享客户端clients[i] = shareClients.get(i);} else {// 若不是共享的,则新建连接 初始化新的客户端 看这块的实现即可 重点关注的地方clients[i] = initClient(url);}}return clients;
}
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这个方法
private ExchangeClient initClient(URL url) {// 获取客户端类型,默认为 nettyString str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));//省略无关代码ExchangeClient client;try {// 获取 lazy 配置,并根据配置值决定创建的客户端类型if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {// 创建懒加载 ExchangeClient 实例client = new LazyConnectExchangeClient(url, requestHandler);} else {// 创建普通 ExchangeClient 实例client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service...");}return client;
}
initClient 方法首先获取用户配置的客户端类型,默认为 netty。下面我们分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {// 获取 Exchanger 实例,默认为 HeaderExchangeClientreturn getExchanger(url).connect(url, handler);
}
如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例。接下来分析 HeaderExchangeClient 的实现
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {// 这里包含了多个调用,分别如下:// 1. 创建 HeaderExchangeHandler 对象// 2. 创建 DecodeHandler 对象// 3. 通过 Transporters 构建 Client 实例// 4. 创建 HeaderExchangeClient 对象return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}ChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器handler = new ChannelHandlerDispatcher(handlers);}// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例return getTransporter().connect(url, handler);
}
如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {// 创建 NettyClient 对象return new NettyClient(url, listener);
}
3.2.3 注册
这里就已经创建好了NettyClient对象。关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 取 registry 参数值,并将其设置为协议头url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);// 获取注册中心实例Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// 将 url 查询字符串转为 MapMap<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));// 获取 group 配置String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0) {if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1|| "*".equals(group)) {// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑return doRefer(getMergeableCluster(), registry, type, url);}}// 调用 doRefer 继续执行服务引用逻辑return doRefer(cluster, registry, type, url);
}
上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建 RegistryDirectory 实例RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);// 设置注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());// 生成服务消费者链接URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);// 注册服务消费者,在 consumers 目录下新节点if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}// 订阅 providers、configurators、routers 等节点数据directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
}
如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
3.2.4 创建代理对象
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析
private T createProxy(Map<String, String> map) {// 本地引用if (shouldJvmRefer(map)) {// 生成本地引用 URL,协议为 injvmURL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);// 调用 refer 方法构建 InjvmInvoker 实例invoker = REF_PROTOCOL.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else { // 远程引用// 忽略urls处理代码...// 单个注册中心或服务提供者(服务直连,下同)if (urls.size() == 1) {// 调用 RegistryProtocol 的 refer 构建 Invoker 实例invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // registry://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14276&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D14276%26qos.port%3D33333%26register.ip%3D192.168.200.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1622603489641®istry=zookeeper×tamp=1622603494381} else {// 省略多注册中心代码...}}// 省略无关的代码...// create service proxy 根据Invoker 真正生成代理 PROXY_FACTORY=ProxyFactory$Adaptive@xxxxreturn (T) PROXY_FACTORY.getProxy(invoker);}
以上重点看REF_PROTOCOL.refer和getProxy, 分别为注册中心注册和获取代理对象, 本部分关注AbstractProxyFactory中getProxy
public <T> T getProxy(Invoker<T> invoker) throws RpcException {// 调用重载方法return getProxy(invoker, false);
}public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {Class<?>[] interfaces = null;// 获取接口列表String config = invoker.getUrl().getParameter("interfaces");if (config != null && config.length() > 0) {// 切分接口列表String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);if (types != null && types.length > 0) {interfaces = new Class<?>[types.length + 2];// 设置服务接口类和 EchoService.class 到 interfaces 中interfaces[0] = invoker.getInterface();interfaces[1] = EchoService.class;for (int i = 0; i < types.length; i++) {// 加载接口类interfaces[i + 1] = ReflectUtils.forName(types[i]);}}}if (interfaces == null) {interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};}// 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827if (!invoker.getInterface().equals(GenericService.class) && generic) {int len = interfaces.length;Class<?>[] temp = interfaces;// 创建新的 interfaces 数组interfaces = new Class<?>[len + 1];System.arraycopy(temp, 0, interfaces, 0, len);// 设置 GenericService.class 到数组中interfaces[len] = GenericService.class;}// 调用重载方法return getProxy(invoker, interfaces);
}public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例/*** 通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,* 并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,* 具体的用途是拦截接口类调用*/return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用,到这里代理类生成逻辑就分析完了。整个过程比较复杂
3.2.5 总结
- 从注册中心发现引用服务:在有注册中心,通过注册中心发现提供者地址的情况下,ReferenceConfig 解析出的 URL 格式为:registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode(“conumer-host/com.foo.FooService?version=1.0.0”)
- 通过 URL 的registry://协议头识别,就会调用RegistryProtocol#refer()方法
1> 查询提供者 URL,如 dubbo://service-host/com.foo.FooService?version=1.0.0 ,来获取注册中心(建立连接)
2> 创建一个 RegistryDirectory 实例并设置注册中心和协议
3> 注册完毕后,订阅 providers,configurators,routers 等节点的数据
4> 通过 URL 的 dubbo:// 协议头识别,调用 DubboProtocol#refer() 方法,创建一个 ExchangeClient 客户端并返回 DubboInvoker 实例 - 由于一个服务可能会部署在多台服务器上,这样就会在 providers 产生多个节点,这样也就会得到多个 DubboInvoker 实例,就需要 RegistryProtocol 调用 Cluster 将多个服务提供者节点包装成一个节点,并返回一个 Invoker
- Invoker 创建完毕后,调用 ProxyFactory 为服务接口生成代理对象,返回提供者引用
相关文章:

Dubbo源码解析一服务暴露与发现
Dubbo 服务暴露与发现 1. Spring中自定义Schema1.1 案例使用1.2 dubbo中的相关对象 2. 服务暴露机制2.1 术语解释2.2 流程机制2.3 源码分析2.3.1 导出入口2.3.2 导出服务到本地2.3.3 导出服务到远程(重点)2.3.4 开启Netty服务2.3.5 服务注册2.3.6 总结 3. 服务发现3.1 服务发现…...

有哪些工具软件一旦用了就离不开?
💖前言 目前,随着科技的快速发展,电脑已经进入了许许多多人的生活 ,在平日的学习、工作和生活里,我们会用的各种各样的强大软件。市面上除了某些大公司开发在强大软件,还有各路大神开发具有某些功能的强大…...

ObjectARX如何判断点和多段线的关系
目录 1 基本思路2 相关知识点2.1 ECS坐标系概述2.2 其他点坐标转换接口2.3 如何获取多段线的顶点ECS坐标 3 实现例程3.1 接口实现3.2 测试代码 4 实现效果 在CAD的二次开发中,点和多段线的关系是一个非常重要且常见的问题,本文实现例程以张帆所著《Objec…...

四、DRF序列化器create方法与update方法
上一章: 二、Django REST Framework (DRF)序列化&反序列化&数据校验_做测试的喵酱的博客-CSDN博客 下一章: 五、DRF 模型序列化器ModelSerializer_做测试的喵酱的博客-CSDN博客 一、背景 1、创建请求,post,用户输入…...

洛谷P8792 最大公约数
[蓝桥杯 2022 国 A] 最大公约数 题目描述 给定一个数组,每次操作可以选择数组中任意两个相邻的元素 x , y x, y x,y 并将其中的一个元素替换为 gcd ( x , y ) \gcd(x, y) gcd(x,y),其中 gcd ( x , y ) \gcd(x, y) gcd(x,y) 表示 x x x 和 y…...

【SpringBoot集成Nacos+Dubbo】企业级项目集成微服务组件,实现RPC远程调用
文章目录 一、需求环境/版本 二、须知2.1、什么是RPC?2.2、什么是Dubbo?2.3、什么是Nacos? 三、普通的SpringBoot项目集成微服务组件方案(笔者给出两种)方案一(推荐)1、导入maven依赖࿰…...

MySQL主从同步(开GTID)
目录 一、搭建简单的主从同步 二、mysql删除主从(若没有配置过可以不用进行这一步) 1、停止slave服务器的主从同步 2、重置master服务 三、开启GTID 1、Master配置 2、Slave配置 一、搭建简单的主从同步 GTID原理:http://t.csdn.cn/g…...

打造精细化调研,这些产品榜上有名,你用了吗?
调查问卷是一种流行的数据收集工具,研究人员、营销人员和企业使用它来征求目标受众的反馈意见。调查问卷工具使创建、分发和分析调查问卷的过程变得更加简单和高效。想要做好一份调查问卷,选择一款好用的工具是少不了的。不过,在众多的问卷工…...
[golang gin框架] 37.ElasticSearch 全文搜索引擎的使用
一.全文搜索引擎 ElasticSearch 的介绍,以及安装配置前的准备工作 介绍 ElasticSearch 是一个基于 Lucene 的 搜索服务器,它提供了一个 分布式多用户能力的 全文搜索引擎,基于 RESTful web 接口,Elasticsearch 是用 Java 开发的,并作为 Apach…...

赋的几个发展阶段
赋,起源于战国,形成于汉代,是由楚辞衍化出来的,也继承了《诗经》讽刺的传统。关于诗和赋的区别,晋代文学家陆机在《文赋》里曾说: 诗缘情而绮靡,赋体物而浏亮。 也就是说,诗是用来抒发主观感情…...

Model-Free TD Control: Sarsa
import time import random # 相对于Q 效果会差一些 class Env():def __init__(self, length, height):# define the height and length of the mapself.length lengthself.height height# define the agents start positionself.x 0self.y 0def render(self, frames50):fo…...

CloudBase CMS的开发注意事项
引言 在进行基于云开发的微信小程序开发时为了减轻工作量打算用CloudBase CMS来减轻工作量,随后去了解并体验了CloudBase CMS的使用,总体来说还有些许问题没有解决,对减轻后台管理工作并没有起到很大的作用。 项目情景 使用CloudBase CMS来管…...

大佬联合署名!反对 ACL 设置匿名期!
夕小瑶科技说 原创 作者 | 智商掉了一地、Python 近日,自然语言处理领域的多位知名学者联合发起了一项反对 ACL 设置匿名期的联合署名行动,包括著名学者 William Wang 和 Yoav Goldberg 在内,还有Christopher Potts、Hal Daume、Luke Zettl…...

【JavaSE】Java基础语法(十四):Static
文章目录 概述特点与应用注意事项为什么一个静态方法中只能访问用static修饰的成员? 概述 Java中的static是一个修饰符(也可称关键字),可以用于修饰变量、方法和代码块。 特点与应用 static修饰的成员具有以下特点: 被类的所有对…...

1.Linux初识
在 Linux 系统中,sudo 是一个重要的命令,可以允许普通用户以管理员权限来运行特定的命令。通过 sudo 命令,普通用户可以暂时获取管理员权限,执行需要管理员身份才能执行的操作。 下面是一些关于 sudo 命令的用法: 以管…...

进程(二)
这一节我们写个MFC剪切板程序 1.下载相应的组件 工具->工具视图,因为之前已经下载过一部分了,这里如果创建MFC报错的话,就要把没下载的补上 此项目需要MFC库 解决方法 2.创建MFC程序 3.打开资源视图,直接在菜单栏顶部搜索…...

《消息队列高手课》课程笔记(二)
消息模型:主题和队列有什么区别? 两类消息模型 早期的消息队列,就是按照“队列”的数据结构来设计的。 生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是…...

以“智”提质丨信创呼叫
随着人工智能、大数据、云计算等新兴技术飞速发展,呼叫中心、全媒体智能客服等现已被广泛应用于多个行业领域。其中,呼叫中心作为政企对外服务的重要窗口,已从“传统电话营销”发展到“智能呼叫中心”阶段,以客户服务为核心&#…...

Pool与PG的说明以及Ceph的IO流程
Pool与PG的说明以及Ceph的IO流程 Pool与PG Ceph中的数据是以对象的形式存储在存储池(pool)中的。每个存储池都被划分为若干个存储组(PG),每个存储组同时也是一个数据分片(shard)。存储组是Ceph用来实现数据的分布式存储和高可用的重要组成部分。每个存储组包含若干…...

20230529_Hadoop_集群操作命令
HDFS_集群操作命令: 一、集群启停命令 # 启动Hadoop的HDFS进程start-dfs.sh# 关闭Hadoop的HDFS进程stop-dfs.sh# 单独关闭某一个进程hadoop-daemon.sh start[/stop] namenode[/datanode/secondarynamenode]二、HDFS文件系统的基本信息 数据的路径表达方式ÿ…...

边缘计算AI硬件智能分析网关V1版的接入流程与使用步骤
我们的AI边缘计算网关硬件——智能分析网关目前有两个版本:V1版与V2版,两个版本都能实现对监控视频的智能识别和分析,支持抓拍、记录、告警等,在AI算法的种类上和视频接入上,两个版本存在些许的区别。V1的基础算法有人…...

【redis】Stream、String 超详细介绍
文章目录 一、Stream1.1 写入数据XADD条目 ID 的格式 1.2 获取数据XRANGE 和 XREVRANGEXREAD 监听新条目非阻塞形式阻塞形式 1.3 消费者组XGROUP 创建消费者组XREADGROUP 通过消费者组消费XACK 确认消息消费者组示例 1.4 XPENDING 和 XCLAIM 认领 其他消费者 的待处理消息XPEND…...

算法基础学习笔记——⑫最小生成树\二分图\质数\约数
✨博主:命运之光 ✨专栏:算法基础学习 目录 ✨最小生成树 🍓朴素Prim 🍓Kruskal算法 ✨二分图 🍓匈牙利算法 ✨质数 🍓(1)质数的判定——试除法 🍓(2&…...

了解信号的传输方式、编码与调制、信道的极限容量
1.了解信号的传输方式、编码与调制、信道的极限容量 笔记来源: 湖科大教书匠:传输方式 声明:该学习笔记来自湖科大教书匠,笔记仅做学习参考 1.1 了解信号的传输方式 串行传输与并行传输 同步传输与异步传输 为什么需要收发双发…...

SpringBoot自动配置原理总结
1、我们需要从主启动类的SpringBootApplication注解开始分析: SpringBootApplication是一个复合注解,进入以后看到主要包括以下三个注解: SpringBootConfiguration EnableAutoConfiguration ComponentScan(excludeFilters { Filter(type …...

【LeetCode: 410. 分割数组的最大值 | 暴力递归=>记忆化搜索=>动态规划 】
🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…...

内核对象和两种同步
概念 Windows 中每个内核对象都只是一个内存块,它由操作系统内核分配,并只能由操作系统内核进 行访问 它的所有者:内核对象的所有者是操作系统内核,而非进程,也就是说当进程退出,内核对象不一定会销毁 法…...

水表远程监控系统有什么功能吗?
水表远程监控系统是通过远程传输水表数据,实现对水表的远程监控和管理的一种智能化系统。它主要具备以下功能: 1.远程抄表功能:通过远程传输技术,实现对水表的远程抄表和监控,无需人工上门抄表,节省人力成本…...

zabbix自定义监控
一、案例操作:自定义监控内容 案列:自定义监控客户端服务器登录的人数 需求:限制登录人数不超过 3 个,超过 3 个就发出报警信息 1、自定义监控内容的操作步骤 1.1 在客户端创建自定义 key 明确需要执行的 linux 命令 who | …...

【AUTOSAR】Com通讯栈配置说明(四)---- Nm模块
Nm模块 NmGlobalConfig NmGlobalConstants NmRxIndicationCallback: callback 函数 NmCycletimeMainFunction:Nm 主函数调用周期 NmDevErrorDetect: 是否支持DET NmVersionInfoApi: 是否支持获取版本信息api PduR模块 PduRBswModules PduRBswModuleRef:关联的BS…...