性能文章>你为什么吃不透Dubbo源码?这样看既不浪费时间又高效>

你为什么吃不透Dubbo源码?这样看既不浪费时间又高效原创

1年前
665948

本文基于Dubbo 2.7.5源码。

为什么这么说呢?

如果你是一个面试官,你招聘的岗位也不需要改造源码,应聘者也没有说自己熟悉dubbo源码,那面试的过程中也不用去深挖这些源码了,而应该更多的考察应聘者的软件设计思维,编码能力,工作经验是否匹配等。否则你招进来的可能是一个背诵八股文能力强,但是动手能力差的人,浪费了自己的招聘时间。

对于一个求职者,你的求职时间有限,应该是以尽快找到工作为目标,可以针对性的准备。详细研究这些源码细节,反而会吞噬你大量时间,耽误了你宝贵的求职时间。

如果你被迫加班划水,说明加班看技术也并不是你的本意,何不尝试改变一下工作环境呢?

看到这里的朋友,你一定就是万里挑一的那个靓仔,接下来我们开始正文。

首先我们来聊一个大家经常会碰到的问题:

1.为什么你吃不透Dubbo源码?

相关不少朋友会有这个经历:看dubbo源码,看到一半就看不下去了,然后过了几周就忘记之前自己看了啥,再后来,就再也不想继续看了。最后对外宣称:“我研究过Dubbo源码!”。

诚然,这样的状态,再背几道八股文面试题,应付面试足以。但是我相信阅读到这里的朋友绝对不想这样欺骗自己。

一般来说,以下几个原因会导致你阅读Dubbo源码不畅:

  • 对Dubbo SPI的机制不了解,导致在阅读的过程中,不知道要跟哪个类去看代码了;

  • 阅读顺序不对,没有先阅读服务导出和服务引入的代码,直接看服务调用的代码,导致不知道执行流程;

  • 以debug的方式阅读代码,导致跟着跟着就陷入到细枝末节里面去了,到最后不想再看了;

  • 本身对Dubbo很多功能都不了解,决定看源码才香,一上来就看代码,本末倒置,最终导致不知道代码是实现什么功能的…

对于Dubbo源码,首先,你需要先能在项目中引入Dubbo,以及能够使用基本功能,基于这个基础之上,正确的阅读顺序应该是这样的:

  • Dubbo SPI;

  • Dubbo服务导出

  • Dubbo服务引入;

  • Dubbo服务调用流程

最重要的就是在阅读过程中抓住主线流程。接下来我们就详细研究Dubbo源码。(获取高清源码流程图,在Java架构杂谈公众号回复“Dubbo”即可。)


Dubbo 3.0已经发布有一段时间了了,并且提供了强大的功能,如多语言支持,双向流,基于HTTP/2,放弃私有协议,优化服务发现模型,统一流量治理模型等等。这值得我们研究其应用场景和源码。不过,在此之前,我们先得对Dubb 2的源码有所了解,以方便更顺畅的阅读Dubbo 3.0新特性相关代码。

本文就是一篇带您深入了解Dubbo源码的文章,看完包懂,让你对Dubbo源码改造顺手捏来。如果你的工作并不需要改造dubbo源码,就没必要死记硬背这些源码的详细调用流程了,如果有改造,也不用死记硬背,先熟悉了流程,在改造dubbo源码的过程中就会更加熟悉了。否则,就变成了背八股文了。

2. Dubbo服务导出

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

本文会使用到以下的例子辅助解读Dubbo源码:

假设有如下系统:

  • 消费者:dubbo-consumer-app

  • 调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame

  • 提供者:dubbo-provider-app

如下图所示:


在SpringBoot项目中,我们要启用Dubbo,一般会在启动类中配置:

1@EnableDubbo(scanBasePackages = {"com.itzhai.dubbo.demo"})

@EnableDubbo注解:

1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Inherited
4@Documented
5@EnableDubboConfig
6@DubboComponentScan
7public @interface EnableDubbo {
8    ...
9}

其中最重要的两个注解为@EnableDubboConfig以及@DubboComponentScan

  • @EnableDubboConfig是用于解析生成DubboConfig配置类的;

  • @DubboComponentScan是用于扫描Dubbo服务以及Dubbo服务引用的。

2.1. 解析生成DubboConfig配置类

我们继续查看@EnableDubboConfig注解:

 1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Inherited
4@Documented
5@Import(DubboConfigConfigurationRegistrar.class)
6public @interface EnableDubboConfig {
7
8    /**
9     * It indicates whether binding to multiple Spring Beans.
10     *
11     * @return the default value is <code>false</code>
12     * @revised 2.5.9
13     */

14    boolean multiple() default true;
15
16}

这里引入了DubboConfigConfigurationRegistrar类,这个是ImportBeanDefinitionRegistrar接口的实现类。ImportBeanDefinitionRegistrar是Spring中用于注册BeanDefinition的扩展接口,实现了该接口的类被@Import注解引入之后,会触发其registerBeanDefinitions方法,然后生成BeanDefiniton对象,并注册到BeanDefinitionRegistry中,为实例化Bean做好准备。

下面看看DubboConfigConfigurationRegistrarregisterBeanDefinitions方法:

 1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
4            importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));
5
6    boolean multiple = attributes.getBoolean("multiple");
7
8    // Single Config Bindings
9    registerBeans(registry, DubboConfigConfiguration.Single.class);
10
11    if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193
12        registerBeans(registry, DubboConfigConfiguration.Multiple.class);
13    }
14}

其中multiple默认为true,所以默认的会进入到

1registerBeans(registry, DubboConfigConfiguration.Multiple.class);

这个方法中。跟踪registerBeans方法,可得知,是通过AnnotatedBeanDefinitionReader这个类来读取DubboConfigConfiguration.Multiple.class上面的注解的。DubboConfigConfiguration.Multiple类如下:

 1/**
2 * Multiple Dubbo {@link AbstractConfig Config} Bean Binding
3 */

4@EnableDubboConfigBindings({
5        @EnableDubboConfigBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true),
6        @EnableDubboConfigBinding(prefix = "dubbo.modules", type = ModuleConfig.class, multiple = true),
7        @EnableDubboConfigBinding(prefix = "dubbo.registries", type = RegistryConfig.class, multiple = true),
8        @EnableDubboConfigBinding(prefix = "dubbo.protocols", type = ProtocolConfig.class, multiple = true),
9        @EnableDubboConfigBinding(prefix = "dubbo.monitors", type = MonitorConfig.class, multiple = true),
10        @EnableDubboConfigBinding(prefix = "dubbo.providers", type = ProviderConfig.class, multiple = true),
11        @EnableDubboConfigBinding(prefix = "dubbo.consumers", type = ConsumerConfig.class, multiple = true),
12        @EnableDubboConfigBinding(prefix = "dubbo.config-centers", type = ConfigCenterBean.class, multiple = true),
13        @EnableDubboConfigBinding(prefix = "dubbo.metadata-reports", type = MetadataReportConfig.class, multiple = true),
14        @EnableDubboConfigBinding(prefix = "dubbo.metricses", type = MetricsConfig.class, multiple = true)
15})
16public static class Multiple {
17
18}

这里又用到了EnableDubboConfigBinding注解:

1@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4@Repeatable(EnableDubboConfigBindings.class)
5@Import(DubboConfigBindingRegistrar.class)
6public @interface EnableDubboConfigBinding {
7      ...
8}

可以发现,最终是通过DubboConfigBindingRegistrar这个类来处理Multiple上面的注解信息的,这个类也是一个ImportBeanDefinitionRegistrar类。接下来看看其registerBeanDefinitions方法:

 1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
4            importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBinding.class.getName()));
5
6    registerBeanDefinitions(attributes, registry);
7
8}
9
10protected void registerBeanDefinitions(AnnotationAttributes attributes, BeanDefinitionRegistry registry) {
11    String prefix = environment.resolvePlaceholders(attributes.getString("prefix"));
12
13    Class<? extends AbstractConfig> configClass = attributes.getClass("type");
14
15    boolean multiple = attributes.getBoolean("multiple");
16
17    registerDubboConfigBeans(prefix, configClass, multiple, registry);
18
19}

可以发现,最终会读取到每个@EnableDubboConfigBinding注解的prefix和type属性,传入registerDubboConfigBeans进行处理。

registerDubboConfigBeans内部会根据prefix去Spring的environment中对应的配置项,找到之后,实例化成对应type的bean,注册到Spring中。比如:

dubbo.application前缀的配置会生成一个ApplicationConfig的bean,dubbo.protocols的前缀,会生成ProtocolConfigd bean。

registerDubboConfigBeans大致的实现思路如下:

每个@EnableDubboConfigBinding注解都会解析得到一个Config的BeanDefinition注册到Spring中,同时注册一个对应的DubboConfigBindingBeanPostProcessor后置处理器,该处理器内部会利用Spring的DataBinder技术,结合properties文件,为Config Bean对应的属性进行赋值。

具体的registerDubboConfigBeans实现代码这里就不列出来了,感兴趣的自己跟踪阅读。

至此,Dubbo的Config类生成完毕。

2.2. 扫描Dubbo的@Service注解,生成ServiceBean

上面提到@EnableDubboConfig注解会引入DubboComponentScan@DubboComponentScan是用于扫描Dubbo服务以及Dubbo服务引用的,@DubboComponentScan注解如下:

1@Target(ElementType.TYPE)
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4@Import(DubboComponentScanRegistrar.class)
5public @interface DubboComponentScan {
6    ...
7}

这里引入了DubboComponentScanRegistrar,也是一个ImportBeanDefinitionRegistrar接口实现类,继续看其registerBeanDefinitions方法:

 1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3    // 获取DubboComponentScan注解配置的扫描路径
4    Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
5
6    // 处理服务导出相关逻辑
7    registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
8
9    // 处理服务引入相关逻辑
10    registerReferenceAnnotationBeanPostProcessor(registry);
11
12}

其中最主要的两个方法:

  • registerServiceAnnotationBeanPostProcessor

  • 注册ServiceAnnotationBeanPostProcessor(是一个BeanDefinitionRegistryPostProcessor接口实现类)

  • 在Spring启动时会调用postProcessBeanDefinitionRegistry方法扫描添加了@Service注解了的类

  • 然后生成BeanDefinition(会生成两个,一个普通的bean,一个ServiceBean),后续的Spring周期中会生成Bean

  • 在ServiceBean中会监听ContextRefreshedEvent事件,一旦Spring启动完后,就会发出事件触发进行服务导出;

  • registerReferenceAnnotationBeanPostProcessor

  • 注册ReferenceAnnotationBeanPostProcessor(是一个AnnotationInjectedBeanPostProcessor抽象类子类);

  • 在Spring启动时,会通过该类扫描服务引用@Reference的注入点,生成ReferenceBean,然后生成具体的代理类注入到对应的字段中。

本节我们看第一个方法,服务导出相关逻辑。

接下来继续跟踪ServiceAnnotationBeanPostProcessorpostProcessBeanDefinitionRegistry方法:

 1@Override
2public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
3
4    Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
5
6    if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
7        registerServiceBeans(resolvedPackagesToScan, registry);
8    } else {
9        if (logger.isWarnEnabled()) {
10            logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
11        }
12    }
13
14}

关键方法为registerServiceBeans,该方法扫描并且注册ServiceBean,具体代码如下:

 1private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
2
3    // 创建扫描类
4    DubboClassPathBeanDefinitionScanner scanner =
5            new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
6
7    BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
8
9    scanner.setBeanNameGenerator(beanNameGenerator);
10
11    // 设置需要扫描的注解,这里只会扫描Dubbo的@Service注解
12    scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
13
14    /**
15     * Add the compatibility for legacy Dubbo's @Service
16     *
17     * The issue : https://github.com/apache/dubbo/issues/4330
18     * @since 2.7.3
19     */

20    scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));
21
22    for (String packageToScan : packagesToScan) {
23
24        // Registers @Service Bean first
25        scanner.scan(packageToScan);
26
27        // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
28        Set<BeanDefinitionHolder> beanDefinitionHolders =
29                findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
30
31        if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
32
33            // 处理扫描到的BeanDefinition
34            for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
35                registerServiceBean(beanDefinitionHolder, registry, scanner);
36            }
37
38            if (logger.isInfoEnabled()) {
39                logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
40                        beanDefinitionHolders +
41                        " } were scanned under package[" + packageToScan + "]");
42            }
43
44        } else {
45
46            if (logger.isWarnEnabled()) {
47                logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
48                        + packageToScan + "]");
49            }
50
51        }
52
53    }
54
55}

这里的逻辑比较简单,主要是通过DubboClassPathBeanDefinitionScanner扫描器扫描标注有Dubbo的@Service注解的类,扫描得到相关的BeanDefinition,然后遍历处理每个BeanDefinition,处理方法:ServiceAnnotationBeanPostProcessor#registerServiceBean。接下来看看ServiceAnnotationBeanPostProcessor#registerServiceBean这个方法:

 1private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
2                                 DubboClassPathBeanDefinitionScanner scanner)
 
{
3    // 获取到服务实现类
4    Class<?> beanClass = resolveClass(beanDefinitionHolder);
5    // 获取服务实现类上面的@Service注解
6    Annotation service = findServiceAnnotation(beanClass);
7
8    /**
9     * The {@link AnnotationAttributes} of @Service annotation
10     */

11    // 获取@Service注解上的信息
12    AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, falsefalse);
13
14    // 获取服务接口
15    Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
16    // 获取服务实现类bean名称
17    String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
18
19    // 生成服务的ServiceBeanDefinition
20    AbstractBeanDefinition serviceBeanDefinition =
21            buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
22
23    // 生成服务对应的Bean名称
24    String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
25
26    if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
27
28        // ServiceBean注册到Spring容器中,beanName的格式:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai
29        registry.registerBeanDefinition(beanName, serviceBeanDefinition);
30
31        if (logger.isInfoEnabled()) {
32            logger.info("The BeanDefinition[" + serviceBeanDefinition +
33                    "] of ServiceBean has been registered with name : " + beanName);
34        }
35
36    } else {
37
38        if (logger.isWarnEnabled()) {
39            logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
40                    "] of ServiceBean[ bean name : " + beanName +
41                    "] was be found , Did @DubboComponentScan scan to same package in many times?");
42        }
43
44    }
45
46}

这段代码主要逻辑:

  • 获取到服务实现类,以及服务实现类的注解信息,以及服务实现类的接口;

  • 为服务实现类生成一个ServiceBean的Definition,ServiceBean的名称格式为:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai;

  • 把ServiceBeanDefinition注册到Spring容器中。

注意,在生成ServiceBean的Definition方法中,会把服务实现类的bean赋值给ServiceBean的ref属性:

1private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
2                                                          AnnotationAttributes serviceAnnotationAttributes,
3                                                          Class<?> interfaceClass,
4                                                          String annotatedServiceBeanName)
 
{
5    ...
6    addPropertyReference(builder, "ref", annotatedServiceBeanName);
7    ...
8}

基于ServiceBeanDefinition生成的bean结构如下图所示:

其中SpringBean即为服务实现类的Bean,ServiceBean为Dubbo服务的Bean,具体的服务导出逻辑就封装在ServiceBean中。

在Spring启动完成之后,会触发每个ServiceBean的onApplicationEvent方法,此方法会调用ServiceBean的export()方法,执行服务导出。

接下来就详细看看服务导出的实现逻辑。

2.3. ServiceBean服务导出流程

ServiceBean.export()方法如下:

1@Override
2public void export() {
3    super.export();
4    // 发布ServiceBeanExportedEvent事件
5    publishExportEvent();
6}

继续跟踪super.export()方法:

 1public synchronized void export() {
2    // 读取配置
3    checkAndUpdateSubConfigs();
4
5    // 判断服务是否需要导出
6    if (!shouldExport()) {
7        return;
8    }
9
10    // 判断是否需要延迟发布
11    if (shouldDelay()) {
12        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
13    } else {
14        // 执行服务导出
15        doExport();
16    }
17}

这里有两个核心方法调用:

  • checkAndUpdateSubConfigs():读取配置;

  • doExport():执行服务导出;

接下来详细可靠这两个方法的逻辑。

2.3.1 读取配置

checkAndUpdateSubConfigs()方法的详情如下:

 1public void checkAndUpdateSubConfigs() {
2    // Use default configs defined explicitly on global configs
3    // 补全ServiceConfig中的属性配置,ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取
4    completeCompoundConfigs();
5
6    // Config Center should always being started first.
7    // 从配置中心获取配置,包括应用配置和全局配置,保存到
8    // Environment的externalConfigurationMap和appExternalConfigurationMap中
9    // 然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig)
10    startConfigCenter();
11
12    checkDefault();
13
14    checkProtocol();
15
16    checkApplication();
17
18    // if protocol is not injvm checkRegistry
19    if (!isOnlyInJvm()) {
20        checkRegistry();
21    }
22
23    // 使用配置中心的数据覆盖刷新ServiceConfig
24    this.refresh();
25
26    // 如果配了metadataReportConfig,则刷新配置
27    checkMetadataReport();
28
29    if (StringUtils.isEmpty(interfaceName)) {
30        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
31    }
32
33    // 如果当前服务的实现类是GenericService接口的实例
34    if (ref instanceof GenericService) {
35        interfaceClass = GenericService.class;
36        if (StringUtils.isEmpty(generic)) {
37            generic = Boolean.TRUE.toString();
38        }
39    } else {
40        // 加载接口
41        try {
42            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
43                    .getContextClassLoader());
44        } catch (ClassNotFoundException e) {
45            throw new IllegalStateException(e.getMessage(), e);
46        }
47        // 刷新MethodConfig
48        checkInterfaceAndMethods(interfaceClass, methods);
49        checkRef();
50        generic = Boolean.FALSE.toString();
51    }
52    ...
53    checkStubAndLocal(interfaceClass);
54    checkMock(interfaceClass);
55}

主要逻辑如下:

  • completeCompoundConfigs()

  • 补全ServiceConfig中的属性配置;

  • 针对ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取;

  • 其中ProviderConfig、ModuleConfig、ApplicationConfig这些配置类Bean,在上面已经解析生成;

  • startConfigCenter()

  • 从配置中心获取配置,包括应用配置和全局配置,保存到Environment的externalConfigurationMap和appExternalConfigurationMap中。这一步中,如果配置了ConfigCenter,则会会去连接配置中心,将当前应用的配置和全局配置都拉取到本地。

  • 然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig);

  • 配置优先级关系图如下:

  • 详细的代码可以自己进一步阅读;

  • this.refresh():刷新ServiceConfig,在这一步,已经把远程配置中心的配置数据拉取下来,此时会对使用这些配置数据刷新本地的ServiceConfig从注解解析出来的配置。

下面看看重要的服务导出方法。

2.3.2 执行服务导出

ServiceConfig#doExport方法如下:

 1protected synchronized void doExport() {
2    if (unexported) {
3        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
4    }
5    if (exported) {
6        return;
7    }
8    exported = true;
9
10    if (StringUtils.isEmpty(path)) {
11        path = interfaceName;
12    }
13    doExportUrls();
14}

继续查看ServiceConfig#doExportUrls

 1private void doExportUrls() {
2    // 根据注册中心配置生成所有的注册中心URL
3    List<URL> registryURLs = loadRegistries(true);
4
5    for (ProtocolConfig protocolConfig : protocols) {
6        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
7        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
8        ApplicationModel.initProviderModel(pathKey, providerModel);
9        // 服务导出的方法
10        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
11    }
12}

重点的方法调用:ServiceConfig#doExportUrlsFor1Protocol,继续查看该方法:

 1/**
2 * @param protocolConfig 表示一个协议
3 * @param registryURLs 所有注册中心的URL
4 */

5private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
6    // 协议默认为dubbo
7    String name = protocolConfig.getName();
8    if (StringUtils.isEmpty(name)) {
9        name = DUBBO;
10    }
11
12    // 通过这个map收集服务url的参数
13    Map<String, String> map = new HashMap<String, String>();
14    map.put(SIDE_KEY, PROVIDER_SIDE);
15
16    appendRuntimeParameters(map);
17
18    // 监控相关参数
19    appendParameters(map, metrics);
20    // 应用相关参数
21    appendParameters(map, application);
22    ...
23
24    // 解析每个方法的参数配置,如:
25    // @Service(versions="1.0.1", grousp="itzhai", methods = {@Method(name = "playGame", timeout = 3000)})
26    // 解析出来之后,把这些参数拼接到Dubbo服务的导出URL上
27    if (CollectionUtils.isNotEmpty(methods)) {...}
28
29    if (ProtocolUtils.isGeneric(generic)) {...} else {...}
30
31    // token机制:https://dubbo.apache.org/en/docs/v2.7/user/examples/token-authorization/
32    if (!ConfigUtils.isEmpty(token)) {...}
33
34    // export service
35    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
36    Integer port = this.findConfigedPorts(protocolConfig, name, map);
37    // 构造最终的要导出的服务url
38    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
39    // 生成的URL:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
40
41    // 扩展点,可以通过ConfiguratorFactory,对服务url再次进行配置
42    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
43            .hasExtension(url.getProtocol())) {
44        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
45                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
46    }
47
48      // scope可能取值:null, remote, local, none
49    String scope = url.getParameter(SCOPE_KEY); 
50    // don't export when none is configured
51    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
52        // export to local if the config is not remote (export to remote only when config is remote)
53        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
54            // scope != remote,则进行本地导出,url的protocol会改为injvm
55            exportLocal(url);
56        }
57        // export to remote if the config is not local (export to local only when config is local)
58        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
59            // 远程导出逻辑
60              ...
61        }
62    }
63    this.urls.add(url);
64}

doExportUrlsFor1Protocol这个方法比较长,主要逻辑:

  • 通过map收集服务url的参数;

  • 解析单个方法的配置;

  • token机制参数处理;

  • 构造要导出的服务的最终URL;

  • 根据scope参数执行具体的服务导出逻辑;

根据不同的scope取值,走不同的逻辑:

我们先来看看远程导出逻辑。

 1// scope != local, 则进行远程导出
2if (CollectionUtils.isNotEmpty(registryURLs)) {
3  // 如果配置了注册中心,则将服务注册到所有的注册中心
4  for (URL registryURL : registryURLs) {
5    //if protocol is only injvm ,not register
6    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
7      continue;
8    }
9
10    // 设置是否动态服务参数,用于设置对应的zookeeper上的节点是否临时节点
11    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
12
13    // 获取监控中心地址
14    URL monitorUrl = loadMonitor(registryURL);
15
16    // 设置监控中心地址
17    if (monitorUrl != null) {
18      url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
19    }
20
21    if (logger.isInfoEnabled()) {
22      if (url.getParameter(REGISTER_KEY, true)) {
23        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
24      } else {
25        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
26      }
27    }
28
29    // For providers, this is used to enable custom proxy to generate invoker,默认为javassist
30    String proxy = url.getParameter(PROXY_KEY);
31    if (StringUtils.isNotEmpty(proxy)) {
32      registryURL = registryURL.addParameter(PROXY_KEY, proxy);
33    }
34
35    // 生成ServiceBean对应服务的Invoker代理对象
36    // registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider-app&dubbo=2.0.2&export=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1&logger=log4j&pid=11035&registry=zookeeper&release=2.7.0&timeout=3000&timestamp=1668952313811
37    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
38
39    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
40
41    // 通过特定协议来执行服务导出,得到Exporter对象,这里的协议为RegistryProtocol
42    Exporter<?> exporter = protocol.export(wrapperInvoker);
43    exporters.add(exporter);
44  }
45else {
46  // 不用注册服务到注册中心,直接进行服务导出
47  if (logger.isInfoEnabled()) {
48    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
49  }
50
51  Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
52  DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
53
54  // 导出的核心方法
55  Exporter<?> exporter = protocol.export(wrapperInvoker);
56  exporters.add(exporter);
57}
58
59/**
60 * @since 2.7.0
61 * ServiceData Store
62 */

63MetadataReportService metadataReportService = null;
64if ((metadataReportService = getMetadataReportService()) != null) {
65  metadataReportService.publishProvider(url);
66}

如果配置了注册中心,首先会把服务URL注册到所有的注册中心,在这之前会通过ProxyFactory生成一个服务的代理类Invoker:

1Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

其中registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())会在注册中心的地址后面拼接上服务的URL地址,拼接的结果如下:

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider-app&dubbo=2.0.2&export=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1&logger=log4j&pid=11035&registry=zookeeper&release=2.7.0&timeout=3000&timestamp=1668952313811

通过ProxyFactory创建如下代理类:

然后再把代理类包装成DelegateProviderMetaDataInvoker:

1DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)

接着执行Exporter<?> exporter = protocol.export(wrapperInvoker)进行服务导出。

这里基于以上的url和Dubbo SPI机制,可以推断用到的是RegistryProtocol#exort方法,代码如下:

 1@Override
2public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
3    // 源url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx&registry=zookeeper&xxx
4    // 转换为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx
5    URL registryUrl = getRegistryUrl(originInvoker);
6    // 得到服务提供者url,表示服务提供者:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?xxx
7    URL providerUrl = getProviderUrl(originInvoker);
8
9    // Subscribe the override data
10    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
11    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
12    //  subscription information to cover.
13
14    // 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
15    // provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
16    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
17
18    // 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
19    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
20    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
21
22
23    // 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
24    // dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
25    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
26
27    // export invoker
28    // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了dd
29    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
30
31    // url to registry
32    final Registry registry = getRegistry(originInvoker);
33
34    // 简化providerUrl参数,得到存入到注册中心去的providerUrl
35    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
36
37    // 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
38    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
39            registryUrl, registeredProviderUrl);
40
41    //to judge if we need to delay publish
42    // 判断是否需要注册到注册中心
43    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
44    if (register) {
45        // 注册简化后的服务URL到注册中心
46        register(registryUrl, registeredProviderUrl);
47        providerInvokerWrapper.setReg(true);
48    }
49
50    // 监听路径内容
51    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
52
53    exporter.setRegisterUrl(registeredProviderUrl);
54    exporter.setSubscribeUrl(overrideSubscribeUrl);
55    //Ensure that a new exporter instance is returned every time export
56    return new DestroyableExporter<>(exporter);
57}

这段代码主要处理以下逻辑:

  • getRegistryUrl(originInvoker):得到注册中心的url,这一步会把传进来的registry://xxx?xxx=xxx&amp;registry=zookeeper转换为:zookeeper://xxx?xxx=xxx

  • getProviderUrl(originInvoker):得到服务提供者的url,dubbo://xxx

  • getRegistry(originInvoker):得到注册中心;

  • exporter = doLocalExport(originInvoker, providerUrl):执行服务导出;

  • register(registryUrl, registeredProviderUrl):把服务的URL注册到注册中心;

  • registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener):监听zookeeper节点;

registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:

我们先来重点看看RegistryProtocol#doLocalExport这个方法,这个方法是执行本地导出的。

2.3.2.1 服务本地导出

代码如下:

1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
2    String key = getCacheKey(originInvoker);
3
4    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
5        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
6        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
7    });
8}

这里最关键的导出方法是:protocol.export(invokerDelegate),这里用到了Dubbo SPI加载具体实现,基于providerUrl,加载到的为DubboProtocol类,最终会调用到以下几个方法:

  • 1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export:

1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4        return protocol.export(invoker);
5    }
6    return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
7}

这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。

执行完之后,生成的对象如下:

  • 2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export:

 1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4        return protocol.export(invoker);
5    }
6    return new ListenerExporterWrapper<T>(protocol.export(invoker),
7            // 加载ExporterListener接口扩展点,使用url和EXPORTER_LISTENER_KEY筛选到导出服务相关的扩展点
8            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
9                    .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
10}

这里如果是dubbo://开头的url,则会包装ListenerExporterWrapper,给dubbo protocol对应的invoker添加监听器,用于处理导出结果,这里可以进行扩展。

  • 3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export:

 1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3    URL url = invoker.getUrl();
4
5    // export service.
6    String key = serviceKey(url);
7    // 构造DubboExporter,用于服务导出
8    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
9    exporterMap.put(key, exporter);
10
11    //export an stub service for dispatching event
12    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
13    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
14    if (isStubSupportEvent && !isCallbackservice) {
15        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
16        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
17            if (logger.isWarnEnabled()) {
18                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
19                        "], has set stubproxy support event ,but no stub methods founded."));
20            }
21
22        } else {
23            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
24        }
25    }
26
27    // 启动Server
28    openServer(url);
29
30    // 一些特殊的序列化机制处理
31    optimizeSerialization(url);
32
33    return exporter;
34}

最终会创建一个DubboExporter对象,并放入到exporterMap中,后续执行服务的时候,会从这个map中检索到需要执行的服务,如下图所示:

执行完export方法之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport方法,这里会包装多一层ExporterChangeableWrapper

而执行完org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#export这个方法,最后会多包装一层DestroyableExporter

最里面的代理类中包括了引用的服务实现类ZDubboTestServiceImpl,当通过Dubbo调用服务的时候,会从exporterMap中找到对应的DubboExporter,最终就会调用这个服务实现类的方法。

接下来的DubboProtocol#openServer就是构建传输层链路与建立连接的了,下面详细看看。

构建传输层链路与启动服务

在Dubbo中,数据处理分为了数据交互层和数据传输层:

接下来看看这个连接是怎么建立的,跟踪DubboProtocol#openServer方法:

 1private void openServer(URL url) {
2    // find server. key = IP + port, 如:192.168.1.101:20880
3    String key = url.getAddress();
4    // client can export a service which's only for server to invoke
5    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
6    if (isServer) {
7        ExchangeServer server = serverMap.get(key);
8        if (server == null) {
9            synchronized (this) {
10                server = serverMap.get(key);
11                if (server == null) {
12                    serverMap.put(key, createServer(url));
13                }
14            }
15        } else {
16            // server supports reset, use together with override
17            server.reset(url);
18        }
19    }
20}

可以发现,这里会为每个 key(IP:port)启动一个服务:

继续跟踪,发现最终会调用到DubboProtocol#createServer方法:

 1private ExchangeServer createServer(URL url) {
2    url = URLBuilder.from(url)
3            // send readonly event when server closes, it's enabled by default
4            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
5            // enable heartbeat by default
6            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
7            .addParameter(CODEC_KEY, DubboCodec.NAME)
8            .build();
9
10    // 获取协议名称,如netty,netty,jetty等
11    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
12
13    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
14        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
15    }
16
17    ExchangeServer server;
18    try {
19        server = Exchangers.bind(url, requestHandler);
20    } catch (RemotingException e) {
21        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
22    }
23
24    // 校验客户端协议
25    str = url.getParameter(CLIENT_KEY);
26    if (str != null && str.length() > 0) {
27        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
28        if (!supportedTypes.contains(str)) {
29            throw new RpcException("Unsupported client type: " + str);
30        }
31    }
32
33    return server;
34}

这里最关键的是调用数据交换层的bind方法:

1server = Exchangers.bind(url, requestHandler);

继续跟踪Exchangers#bind方法:

 1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2    if (url == null) {
3        throw new IllegalArgumentException("url == null");
4    }
5    if (handler == null) {
6        throw new IllegalArgumentException("handler == null");
7    }
8    // codec 协议编码方式
9    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
10    return getExchanger(url).bind(url, handler);
11}

最终会调用getExchanger得到一个Exchanger实现去启动服务器,这里基于SPI扩展机制,会获取到HeaderExchanger:

1public static Exchanger getExchanger(URL url) {
2    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
3    return getExchanger(type);
4}
5
6public static Exchanger getExchanger(String type) {
7    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
8}

接着会调用HeaderExchanger的bind方法去启动服务器,继续跟踪此方法:

1@Override
2public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
3    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
4}

这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.bind来启动服务器,最后再包装成HeaderExchangeServer:

 1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2    if (url == null) {
3        throw new IllegalArgumentException("url == null");
4    }
5    if (handlers == null || handlers.length == 0) {
6        throw new IllegalArgumentException("handlers == null");
7    }
8    ChannelHandler handler;
9    if (handlers.length == 1) {
10        handler = handlers[0];
11    } else {
12        handler = new ChannelHandlerDispatcher(handlers);
13    }
14
15    // 默认调用到NettyTransporter的bind方法
16    return getTransporter().bind(url, handler);
17}
18
19public static Transporter getTransporter() {
20    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
21}

最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:

1@Override
2public Server bind(URL url, ChannelHandler listener) throws RemotingException {
3    return new NettyServer(url, listener);
4}

继续调用new NettyServer:

1public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
2    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
3    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
4    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
5}

这里会调用wrapChannelHandler继续把handler包装多几层:

1protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
2    // 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
3    // 把handler包装成AllChannelHandler
4    // 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
5    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
6            .getAdaptiveExtension().dispatch(handler, url)));
7}

最终包装成如下几层:

MultiMessageHandler-->HeartbeatHandler-->AllChannelHandler

最终会调用到NettyServer的doOpen()方法创建连接:

 1@Override
2protected void doOpen() throws Throwable {
3    bootstrap = new ServerBootstrap();
4
5    bossGroup = new NioEventLoopGroup(1new DefaultThreadFactory("NettyServerBoss"true));
6    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
7            new DefaultThreadFactory("NettyServerWorker"true));
8
9    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
10    channels = nettyServerHandler.getChannels();
11
12    bootstrap.group(bossGroup, workerGroup)
13            .channel(NioServerSocketChannel.class)
14            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
15            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
16            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
17            .childHandler(new ChannelInitializer<NioSocketChannel>() {
18                @Override
19                protected void initChannel(NioSocketChannel ch) throws Exception {
20                    // FIXME: should we use getTimeout()?
21                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
22                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
23                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
24                            .addLast("decoder", adapter.getDecoder())
25                            .addLast("encoder", adapter.getEncoder())
26                            .addLast("server-idle-handler"new IdleStateHandler(00, idleTimeout, MILLISECONDS))
27                            .addLast("handler", nettyServerHandler);
28                }
29            });
30    // bind
31    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
32    channelFuture.syncUninterruptibly();
33    channel = channelFuture.channel();
34
35}

这里就是我们熟悉的Netty服务器开启的代码了。至此,服务导出的主线流程执行完毕。

传输层构建的链路如下:

可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。

2.3.2.2 服务导出到注册中心

RegistryProtocol.exort()方法在执行完本地导出之后,会继续把服务URL注册到注册中心:

1register(registryUrl, registeredProviderUrl);

继续调用以下代码:

1public void register(URL registryUrl, URL registeredProviderUrl) {
2    Registry registry = registryFactory.getRegistry(registryUrl);
3    registry.register(registeredProviderUrl);
4}

这里底层会基于SPI的IoC,创建具体的注册中心,如ZookeeperRegistry。

最后通过register方法,把提供者的URL注册到注册中心。

2.3.3 服务导出相关URL订阅

最后,我们梳理下服务导出需要订阅到的URL。

相关代码在RegistryProtocol#exort中,如下:

 1    // 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
2    // provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
3    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
4
5    // 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
6    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
7    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
8
9
10    // 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
11    // dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668952315016&version=1.0.1
12    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
13
14    ...
15
16    // url to registry
17    final Registry registry = getRegistry(originInvoker);
18
19    // 简化providerUrl参数,得到存入到注册中心去的providerUrl
20    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
21
22    // 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
23    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
24            registryUrl, registeredProviderUrl);
25
26    ...
27
28    // 监听旧版本的动态配置
29    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
30
31    ...

主要逻辑:

  • 先生成老版本的动态配置监听URL:overrideSubscribeUrl;

  • 创建OverrideListener用于处理监听到的overrideSubscribeUrl变更事件;

  • 执行overrideUrlWithConfig(),处理以下任务:

  • 通过 ProviderConfigurationListener 监听应用级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;

  • 通过 ServiceConfigurationListener 监听服务级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;

  • 执行registry.subscribe()监听旧版本的动态配置。

2.3.3.1 动态配置变更监听处理

监听到动态配置变更之后,最终会调用到这个方法:

org.apache.dubbo.registry.integration.RegistryProtocol.OverrideListener#doOverrideIfNecessary

 1public synchronized void doOverrideIfNecessary() {
2    final Invoker<?> invoker;
3    if (originInvoker instanceof InvokerDelegate) {
4        invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
5    } else {
6        invoker = originInvoker;
7    }
8    //The origin invoker
9    URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
10    String key = getCacheKey(originInvoker);
11
12    ExporterChangeableWrapper<?> exporter = bounds.get(key);
13    if (exporter == null) {
14        logger.warn(new IllegalStateException("error state, exporter should not be null"));
15        return;
16    }
17
18    //The current, may have been merged many times
19    URL currentUrl = exporter.getInvoker().getUrl();
20
21    //Merged with this configuration
22    URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
23
24    newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
25    newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
26            .getConfigurators(), newUrl);
27
28    if (!currentUrl.equals(newUrl)) {
29        // 重新导出
30        RegistryProtocol.this.reExport(originInvoker, newUrl);
31        logger.info("exported provider url changed, origin url: " + originUrl +
32                ", old export url: " + currentUrl + ", new export url: " + newUrl);
33    }
34}

最终,如果判断到providerUrl有变更,则执行重新导出方法reExport():

 1public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
2
3    // update local exporter
4    ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
5
6    // update registry
7    URL registryUrl = getRegistryUrl(originInvoker);
8    final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);
9
10    //decide if we need to re-publish
11    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
12    ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
13
14    /**
15     * Only if the new url going to Registry is different with the previous one should we do unregister and register.
16     */

17    if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
18        unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
19        register(registryUrl, registeredProviderUrl);
20        newProviderInvokerWrapper.setReg(true);
21    }
22
23    exporter.setRegisterUrl(registeredProviderUrl);
24}

这里代码不继续跟进了,主要逻辑:

  • 如果newInvokerUrl有变更,则调用doChangeLocalExport()重新执行本地导出,这里又会继续执行DubboProtocol#export方法:

  • 会重新生成一个InvokerDelegate方法,把newInvokerUrl保存在新生成的对象中;

  • 注意,如果是重新导出,并不会关掉Netty服务重新开启,相关代码:

    •  1private void openServer(URL url) {
      2  // find server. key = IP + port, 如:192.168.1.101:20880
      3  String key = url.getAddress();
      4  // client can export a service which's only for server to invoke
      5  boolean isServer = url.getParameter(IS_SERVER_KEY, true);
      6  if (isServer) {
      7      ExchangeServer server = serverMap.get(key);
      8      if (server == null) {
      9          synchronized (this) {
      10              server = serverMap.get(key);
      11              if (server == null) {
      12                  serverMap.put(key, createServer(url));
      13              }
      14          }
      15      } else {
      16          // server supports reset, use together with override
      17          server.reset(url);
      18      }
      19  }
      20}
    • 可以发现,最终调用到了server.reset(url),该方法内部会检测心跳间隔时间有没有变更,如果有变更,则取消原来的心跳任务,根据新的心跳时间重新创建任务;

  • 如果providerUrl有变更,则删除掉配置中心原来的url,重新注册新的providerUrl。

订阅的URL如下所示:

至此,服务导出主线逻辑梳理完成。

2.4. 服务导出总结

服务导出主要做的事情:

  • 解析配置生成Dubbo的Config配置类;

  • 扫描Dubbo服务@Service注解,生成ServiceBean;

  • 执行ServiceBean的export方法执行服务导出

  • 先获取到各种配置,包括配置中心的配置,最后用这些配置覆盖刷新ServiceBean的配置;

  • 基于以上配置信息,构造导出服务的URL;

  • 执行本地导出,主要是用到Protocol的export方法,基于Dubbo的SPI机制,以及URL的参数,调用各种扩展去封装Invoker,Invoker最内部是一个代理类,包含了实际服务的引用;最后为不同的端口分别启动Netty服务,构造传输层的链路;

  • 服务导出到注册中心,即把Invoker对应的URL注册到注册中心;

  • 服务导出相关URL订阅,当感知到动态配置变更的时候,更新providerURL,最终执行RegistryProtocol的reExport方法。

2. Dubbo服务引入

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

2.1. 识别@Reference注入点

首先,应用会注册一个ReferenceAnnotationBeanPostProcessor用于处理@Reference注解。

1public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements
2        ApplicationContextAwareApplicationListener 
{
3  ...
4}

该类继承AnnotationInjectedBeanPostProcessor,实现了自定义注解注入,这里的自定义注解为Dubbo的@Reference注解,大家也可以基于该抽象类实现自己的注入注解。

AnnotationInjectedBeanPostProcessor中,会寻找所有被@Reference注解标注的属性或者方法,作为注入点:

 1@Override
2public PropertyValues postProcessPropertyValues(
3        PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName)
 throws BeanCreationException 
{
4
5    // 通过自定义注解寻找需要注入的属性(对于ReferenceAnnotationBeanPostProcessor类既是@Reference注解标注的属性)
6    InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
7    try {
8        metadata.inject(bean, beanName, pvs);
9    } catch (BeanCreationException ex) {
10        throw ex;
11    } catch (Throwable ex) {
12        throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
13                + " dependencies is failed", ex);
14    }
15    return pvs;
16}

findInjectionMetadata方法会找到所有需要注入的字段(AnnotatedFieldElement)和方法(AnnotatedMethodElement),包装成AnnotatedInjectionMetadata返回。

最终调用metadata.inject(bean, beanName, pvs)进行依赖注入,该方法根据FieldElement的具体子类调用其inject方法,来完成属性或者方法的注入,这里就是Spring框架的依赖注入代码了:

 1    public void inject(Object target, String beanName, PropertyValues pvs) throws Throwable {
2        Collection<InjectedElement> elementsToIterate =
3                (this.checkedElements != null ? this.checkedElements : this.injectedElements);
4        if (!elementsToIterate.isEmpty()) {
5            boolean debug = logger.isDebugEnabled();
6            for (InjectedElement element : elementsToIterate) {
7                if (debug) {
8                    logger.debug("Processing injected element of bean '" + beanName + "': " + element);
9                }
10                element.inject(target, beanName, pvs);
11            }
12        }
13    }

而dubbo继承了Spring的InjectionMetadata.InjectedElement,我们挑一个AnnotatedFieldElement来看其具体实现:

 1@Override
2protected void
3inject(Object bean, String beanName, PropertyValues pvs) throws Throwable 
{
4
5    Class<?> injectedType = field.getType();
6
7    // 获取需要注入的对象
8    Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
9
10      // 通过反射把对象注入到字段中
11    ReflectionUtils.makeAccessible(field);
12
13    field.set(bean, injectedObject);
14
15}

关键是getInjectedObject方法,该方法获取到了需要依赖注入的值,最终通过反射注入属性。

2.2. 生成服务引入的对象

下面看看getInjectedObject是怎么获取到注入的属性的值的,该方法最终会调用到org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean这个方法:

 1@Override
2protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
3                                   InjectionMetadata.InjectedElement injectedElement)
 throws Exception 
{
4
5    /**
6     * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
7     */

8    // referencedBeanName即为当前所引入服务对应的ServiceBean的beanName, 命名规则:ServiceBean:interfaceClassName:version:group
9    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
10
11    /**
12     * The name of bean that is declared by {@link Reference @Reference} annotation injection
13     */

14    // 根据@Reference注解的所有信息以及属性接口类型,生成referenceBeanName
15    String referenceBeanName = getReferenceBeanName(attributes, injectedType);
16
17    // 获取一个referenceBean对象,会先判断referenceBeanCache是否存在ReferenceBean,如果不存在则创建一个
18    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
19
20    // 把referenceBean注册到Spring容器中,该方法内部逻辑:
21      // 如果Spring容器中存在referencedBeanName对应的ServiceBean,说明引用的是本地服务,则获取到该ServiceBean的ref属性,命名一个referenceBeanName别名
22      // 如果Spring容器中不存在referencedBeanName对应的ServiceBean,则说明是远程服务,则注册刚刚创建的referenceBean到Spring容器中,名称为:referenceBeanName
23    registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
24
25    cacheInjectedReferenceBean(referenceBean, injectedElement);
26
27    // 创建一个代理对象,属性注入的就是这个代理对象,内部会继续调用referenceBean.get()
28    return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
29}

接着看getOrCreateProxy方法:

1private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
2    if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
3        return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
4                wrapInvocationHandler(referenceBeanName, referenceBean));
5    } else {                                    // ReferenceBean should be initialized and get immediately
6        return referenceBean.get();
7    }
8}

最终都会调用到referenceBean.get()方法获取实例,ReferenceBean是ReferenceConfig的子类,最终会调用到ReferenceConfig的init()方法进行初始化,此方法最终会调用org.apache.dubbo.config.ReferenceConfig#createProxy方法来创建一个代理对象赋值给ReferenceBean的ref属性,该方法处理的逻辑分支比较多,我们这里假设只配置了一个注册中心的情况下,代码会走到这里来:

1if (urls.size() == 1) {
2    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
3else {
4    ...

urls.get(0)为注册中心的url,最终REF_PROTOCOL.refer会调用到以下几个方法:

  • 1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer:

1    @Override
2    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {  // dubbo://
4            return protocol.refer(type, url);
5        }
6        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
7    }

这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。

  • 2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer:

 1@Override
2public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {  // dubbo://
4        return protocol.refer(type, url);
5    }
6    return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
7            Collections.unmodifiableList(
8                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
9                            .getActivateExtension(url, INVOKER_LISTENER_KEY)));
10}

这里如果是dubbo://开头的url,则会包装ListenerInvokerWrapper,给dubbo protocol对应的invoker添加监听器,用于处理结果,这里可以进行扩展。

  • 3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.registry.integration.RegistryProtocol#refer:

 1@Override
2@SuppressWarnings("unchecked")
3public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
4    // url由 registry:// 改为 zookeeper://
5    url = URLBuilder.from(url)
6            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
7            .removeParameter(REGISTRY_KEY)
8            .build();
9
10    // 这里基于Dubbo SPI的Adaptive机制,拿到注册中心实现,ZookeeperRegistry
11      // url.protocol="zookeeper"
12    Registry registry = registryFactory.getRegistry(url);
13
14    if (RegistryService.class.equals(type)) {
15        return proxyFactory.getInvoker((T) registry, type, url);
16    }
17
18    // qs,queryString, 消费者引入服务时所配置的参数
19    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
20
21    String group = qs.get(GROUP_KEY);
22    if (group != null && group.length() > 0) {
23        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
24            return doRefer(getMergeableCluster(), registry, type, url);
25        }
26    }
27
28    // 继续调用doRefer
29    return doRefer(cluster, registry, type, url);
30}

2.2.1 创建动态服务目录

继续调用RegistryProtocol#doRefer

 1private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
2    // 创建RegistryDirectory动态服务目录,每个服务都对应一个RegistryDirectory
3      // url为注册中心地址
4    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
5    directory.setRegistry(registry);
6    directory.setProtocol(protocol);
7
8    // 被引入的服务配置的参数
9    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
10
11    // 消费者url
12    // consumer://xxxx/com.itzhai.dubbo.demo.DubboTestService?application=duboo-consumer-app&...
13    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
14    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
15        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
16        // 注册消费者url到注册中心(url经过简化处理)
17        registry.register(directory.getRegisteredConsumerUrl());
18    }
19
20    // 构造路由链
21    directory.buildRouterChain(subscribeUrl);
22
23    // 订阅注册中心配置变更,这里会主动拉取配置,初始化好所有的提供者的DubboInvoker
24    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
25            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
26
27    // 利用传进来的cluster,join得到invoker:MockClusterInvoker --> FailoverClusterInvoker
28    Invoker invoker = cluster.join(directory);
29    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
30    return invoker;
31}

其中

1Invoker invoker = cluster.join(directory);

用到了Dubbo SPI,会依次执行MockClusterWrapper, FailoverCluster,最终生成MockClusterInvoker和FailoverClusterInvoker,代码跟踪到这里,我们可以得出以下对象结构:

2.2.2 构造路由链

 1private RouterChain(URL url) {
2    // 获取RouterFactory接口扩展实现类:
3    // 0 = {MockRouterFactory@3505}
4    // 1 = {TagRouterFactory@3506}      标签路由
5    // 2 = {AppRouterFactory@3507}      应用条件路由
6    // 3 = {ServiceRouterFactory@3508}  服务条件路由
7    List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
8            .getActivateExtension(url, (String[]) null);
9
10    // 基于url,使用RouterFactory生成各个类型的Router
11    List<Router> routers = extensionFactories.stream()
12            .map(factory -> factory.getRouter(url))
13            .collect(Collectors.toList());
14
15    // 通过priority对routers进行排序
16    initWithRouters(routers);
17}

其中:

  • AppRouterFactory:

 1@Activate(order = 200)
2public class AppRouterFactory implements RouterFactory {
3    public static final String NAME = "app";
4
5    private volatile Router router;
6
7    @Override
8    public Router getRouter(URL url) {
9        if (router != null) {
10            return router;
11        }
12        synchronized (this) {
13            if (router == null) {
14                router = createRouter(url);
15            }
16        }
17        return router;
18    }
19
20    private Router createRouter(URL url) {
21        return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url);
22    }
23}

最终AppRouter的构造函数中,会绑定一个监听器,监听路径:/dubbo/config/dubbo/dubbo-consumer-app.condition-router

同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成AppRouter,具体代码大家可以自行详细阅读。

  • ServiceRouterFactory:

与AppRouterFactory类似,最终ServiceRouter构造函数会绑定一个监听器,监听路径:/dubbo/config/dubbo/com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai.condition-router

同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成ServiceRouter,具体代码大家可以自行详细阅读。

  • TagRouterFactory.getRouter:

TagRouter与上面两个Router有所不同,因为TagRouter监听的路径为:/dubbo/config/dubbo/dubbo-provider-app.tag-router,是需要读取提供者配置的tag-router的,所以只有在获取到提供者的应用名之后,才可以读取配置,在TagRouterFactory.getRouter()方法中,并不会监听和拉取配置,只是先初始化好一个空的TagRouter,等到服务提供者的服务url从配置中心拉取到之后,获取到提供者的名称之后,才会拉取tag-router配置,并且构造TagRouter。TODO 触发点

最终构造如下路由链:

到这里构建的对象结构如下:

2.2.3 监听节点

上面最核心的方法就是directory.subscribe(),下面详细看看这个方法(RegistryDirectory#subscribe):

1public void subscribe(URL url) {
2    setConsumerUrl(url);
3    // 监听应用动态配置 /dubbo/config/dubbo/dubbo-consumer-app.configurators
4    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
5    // 监听所引入的服务的动态配置 /dubbo/config/dubbo/com.itzhai.DemoTest.configurators
6    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
7    // 监听其他的节点
8    registry.subscribe(url, this);
9}

这里的registry为:ZookeeperRegistry extends FailbackRegistry,而RegistryDirectory也是一个NotifyListener:

1public class RegistryDirectory<Textends AbstractDirectory<Timplements NotifyListener
2

继续跟踪 registry.subscribe(url, this),最终会调用:org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe,此方法我们先关注订阅单个服务的逻辑:

 1else {
2    // 订阅单个服务
3    List<URL> urls = new ArrayList<>();
4    // toCategoriesPath(url) 获取到要监听的zk路径:
5    // 0 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers"
6    // 1 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators"
7    // 2 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/routers"
8    for (String path : toCategoriesPath(url)) {
9        // 根据监听地址获取listeners,没有则先生成一个
10        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
11        if (listeners == null) {
12            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
13            listeners = zkListeners.get(url);
14        }
15
16        // listener为传入的参数RegistryDirectory
17        // 通过NotifyListener获取ChildListener,为ZookeeperRegistry
18        ChildListener zkListener = listeners.get(listener);
19        if (zkListener == null) {
20            // 初始化监听器
21            listeners.putIfAbsent(listener, (parentPath, currentChilds)
22                    -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))
23            );
24            zkListener = listeners.get(listener);
25        }
26        // 为当前处理的path创建zk节点
27        zkClient.create(path, false);
28
29        // 添加节点变更监听,同时获取到节点下的内容
30        List<String> children = zkClient.addChildListener(path, zkListener);
31        if (children != null) {
32            urls.addAll(toUrlsWithEmpty(url, path, children));
33        }
34    }
35    // 调用监听器方法处理获取到的节点内容,初始化DubboInvoker
36    notify(url, listener, urls);
37}

以上代码主要会监听以下三个路径:

  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/providers

  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators

  • /dubbo/com.itzhai.dubbo.demo.DubboTestService/routers

完整的监听节点和处理逻辑如下图:

2.2.4 初始化DubboInvoker

在监听的同时,会拉取监听节点的数据。

然后调用notify(url, listener, urls)处理拉取到的数据。这里我们重点观察/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers的处理流程,调用以下代码监听该节点:

1List<String> children = zkClient.addChildListener(path, zkListener);

同时返回的children(经过URLDecode解码处理),为引入的服务的所有提供者的url:

  • 0 = "dubbo://192.168.0.106:20810/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668588840665&version=1.0.1"

  • 1 = "dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider&timestamp=1668588840598&version=1.0.1"

接着会调用org.apache.dubbo.registry.support.FailbackRegistry#notify方法,跟踪notify方法,最终会调用到这里:AbstractRegistry#notify(URL, NotifyListener, List<URL>),关键的处理逻辑如下:

 1// 先把urls转成result这个Map
2Map<String, List<URL>> result = new HashMap<>();
3for (URL u : urls) {
4    if (UrlUtils.isMatch(url, u)) {
5        String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
6        List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
7        categoryList.add(u);
8    }
9}
10if (result.size() == 0) {
11    return;
12}
13Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
14// 遍历处理map
15for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
16    String category = entry.getKey();
17    List<URL> categoryList = entry.getValue();
18    categoryNotified.put(category, categoryList);
19    listener.notify(categoryList);
20    // We will update our cache file after each notification.
21    // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
22    saveProperties(url);
23}

这里会先把传进来的urls转换为Map:

image-20221116170957418

接着遍历map进行处理,遍历的过程中,调用listener.notify(categoryList)进行处理,这里的listener为RegistryDirectory,categoryList即为两个服务提供者的URL,最终会调用到org.apache.dubbo.registry.integration.RegistryDirectory#notify

 1public synchronized void notify(List<URL> urls) {
2    Map<String, List<URL>> categoryUrls = urls.stream()
3            .filter(Objects::nonNull)
4            .filter(this::isValidCategory)
5            .filter(this::isNotCompatibleFor26x)
6            .collect(Collectors.groupingBy(url -> {
7                if (UrlUtils.isConfigurator(url)) {
8                    return CONFIGURATORS_CATEGORY;
9                } else if (UrlUtils.isRoute(url)) {
10                    return ROUTERS_CATEGORY;
11                } else if (UrlUtils.isProvider(url)) {
12                    return PROVIDERS_CATEGORY;
13                }
14                return "";
15            }));
16
17    // 获取动态配置的URL,然后生成configurators,赋值给configurators
18    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
19    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
20
21    // 获取老版本路由的URL,然后生成Router,并调用addRouters添加到路由链中
22    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
23    toRouters(routerURLs).ifPresent(this::addRouters);
24
25    // 获取服务提供者的URL,进一步处理,生成DubboInvoker
26    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
27    refreshOverrideAndInvoker(providerURLs);
28}

这里主要是根据传入的url,获取到需要的信息进行加工,最后存储到RegistryDirectory中:

  • 获取动态配置URL,并且生成configurators;

  • 获取老版本的路由URL,生成Router,添加到路由链中;

  • 获取服务提供者的URL,进一步处理,生成DubboInvoker。

以上这些结果最终都存储到RegistryDirectory中。

我们这里重点跟踪生成DubboInvoker的逻辑,当传入的urls为服务提供者的URL时,会执行以下两行代码:

1// 获取服务提供者的URL,进一步处理,生成DubboInvoker
2List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
3refreshOverrideAndInvoker(providerURLs);

生成DubboInvoker的核心方法是refreshOverrideAndInvoker:

1private void refreshOverrideAndInvoker(List<URL> urls) {
2    // mock zookeeper://xxx?mock=return null
3    // 这里会拿到应用动态配置,重写dubbo服务URL,得到最终的URL
4    overrideDirectoryUrl();
5    // 生成DubboInvoker的方法
6    refreshInvoker(urls);
7}

最关键的就是这个refreshInvoker方法,跟踪这个方法,其中最核心的一行代码:

1Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

继续跟踪该方法,最终会调用到这行代码生成invoker:

1invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);

这里基于Dubbo SPI,最终调用到的是DubboProtocol.refer方法,同时,由于在dubbo-rpci-api模块中,配置了Dubbo SPI的Wrapper类:

1filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
2listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
3mock=org.apache.dubbo.rpc.support.MockProtocol

所以,这里的执行顺序是:

  • ProtocolListenerWrapper.refer:该Wrapper类中判断到URL如果是dubbo://开通的,会给DubboProtocol对应的invoker添加监听器,包装成ListenerInvokerWrapper类,用于处理refer结果,这里可以进行扩展,标签路由器就是在这里设置了监听器,处理完refer之后,进行初始化的

  • ProtocolFilterWrapper.refer:这个Wrapper会给dubbo protocol的url对应的invoker添加Filter过滤器,我们给dubbo添加的自定义过滤器就是在这里集成进来的

  • DubboProtocol.refer:生成DubboInvoker的类。

其中DubboProtocol.refer会调用到父类AbstractProtocol的refer方法

1@Override
2public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3    // 包装一个异步转同步的Invoker
4      // type是接口
5    // url是服务地址
6    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
7}

所以,最终生成DubboInvoker的就是DubboProtocol#protocolBindingRefer这个方法了:

 1@Override
2public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
3
4    optimizeSerialization(url);
5
6        // create rpc invoker.
7    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
8    invokers.add(invoker);
9
10    return invoker;
11}

可以发现在DubboInvoker中维护了和服务提供者的socket连接。

2.3. 构建传输层链路与建立连接

在Dubbo中,数据处理分为了数据交互层和数据传输层:

接下来看看这个连接是怎么建立的,跟踪getClients方法,发现最终会调用到DubboProtocol#initClient方法:

 1private ExchangeClient initClient(URL url) {
2
3    // client type setting.
4    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
5
6    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
7    // enable heartbeat by default
8    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
9
10    // BIO is not allowed since it has severe performance issue.
11    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12        throw new RpcException("Unsupported client type: " + str + "," +
13                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
14    }
15
16    ExchangeClient client;
17    try {
18        // connection should be lazy
19        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
20            client = new LazyConnectExchangeClient(url, requestHandler);
21
22        } else {
23            client = Exchangers.connect(url, requestHandler);
24        }
25
26    } catch (RemotingException e) {
27        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
28    }
29
30    return client;
31}

最核心的一行代码为:

1client = Exchangers.connect(url, requestHandler);

继续跟踪该方法:

 1public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
2    if (url == null) {
3        throw new IllegalArgumentException("url == null");
4    }
5    if (handler == null) {
6        throw new IllegalArgumentException("handler == null");
7    }
8    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
9    return getExchanger(url).connect(url, handler);
10}

最终会调用getExchanger得到一个Exchanger实现去创建连接,这里基于SPI扩展机制,会获取到HeaderExchanger:

1public static Exchanger getExchanger(URL url) {
2    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
3    return getExchanger(type);
4}
5
6public static Exchanger getExchanger(String type) {
7    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
8}

接着会调用HeaderExchanger的connect方法去创建连接,继续跟踪此方法:

1@Override
2public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
3    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
4}

这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.connect来创建Client连接对象:

 1public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
2    if (url == null) {
3        throw new IllegalArgumentException("url == null");
4    }
5    ChannelHandler handler;
6    if (handlers == null || handlers.length == 0) {
7        handler = new ChannelHandlerAdapter();
8    } else if (handlers.length == 1) {
9        handler = handlers[0];
10    } else {
11        handler = new ChannelHandlerDispatcher(handlers);
12    }
13    return getTransporter().connect(url, handler);
14}
15
16public static Transporter getTransporter() {
17    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
18}

最后再包装成HeaderExchangeClient,HeaderExchangeClient中会把Client对象包装成HeaderExchangeChannel对象,同时开启重连任务和心跳检测任务:

 1public HeaderExchangeClient(Client client, boolean startTimer) {
2    Assert.notNull(client, "Client can't be null");
3    this.client = client;
4    this.channel = new HeaderExchangeChannel(client);
5
6    if (startTimer) {
7        URL url = client.getUrl();
8        startReconnectTask(url);
9        startHeartBeatTask(url);
10    }
11}

在Transporters.connect方法中最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:

1@Override
2public Client connect(URL url, ChannelHandler listener) throws RemotingException {
3    return new NettyClient(url, listener);
4}

继续调用new NettyClient:

1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
2    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
3    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
4    super(url, wrapChannelHandler(url, handler));
5}

这里会调用wrapChannelHandler继续把handler包装多几层:

1protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
2    // 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
3    // 把handler包装成AllChannelHandler
4    // 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
5    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
6            .getAdaptiveExtension().dispatch(handler, url)));
7}

最终包装成如下几层:

MultiMessageHandler-->HeartbeatHandler-->AllChannelHandler

最终会调用到NettyClient的doOpen()方法创建连接:

 1@Override
2protected void doOpen() throws Throwable {
3    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
4    bootstrap = new Bootstrap();
5    bootstrap.group(nioEventLoopGroup)
6            .option(ChannelOption.SO_KEEPALIVE, true)
7            .option(ChannelOption.TCP_NODELAY, true)
8            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
9            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
10            .channel(NioSocketChannel.class);
11
12    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
13    bootstrap.handler(new ChannelInitializer() {
14
15        @Override
16        protected void initChannel(Channel ch) throws Exception {
17            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
18            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
19            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
20                    .addLast("decoder", adapter.getDecoder())
21                    .addLast("encoder", adapter.getEncoder())
22                    .addLast("client-idle-handler"new IdleStateHandler(heartbeatInterval, 00, MILLISECONDS))
23                    .addLast("handler", nettyClientHandler);
24            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
25            if(socksProxyHost != null) {
26                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
27                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
28                ch.pipeline().addFirst(socks5ProxyHandler);
29            }
30        }
31    });
32}

这里就是我们熟悉的Netty创建客户端连接的代码了。至此,服务引入的主线流程执行完毕。

传输层构建的链路如下:

可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。

最终主线流程构造的完整处理链路如下:

在服务调用章节,会为你详细介绍每个类的作用。在这之前,你需要了解服务引用都创建了什么样的东西。

2.4. 服务引入总结

服务引入流程主要做如下事情:

  • 扫描项目识别@Reference注入点;

  • 获取需要注入的对象:

  • 获取到@Reference注解的信息,封装成ReferenceBean;

  • 最终执行ReferenceBean的refer方法执行服务引入:

    • 为引入的服务创建动态服务目录;

    • 构造路由链,存储到动态服务目录中;

    • 监听动态配置节点,以及服务提供者节点,并且先从注册中心拉取节点配置;

    • 获取到服务提供者的URL之后,最终包装成DubboInvoker;

    • 为DubboInvoker包装异步转同步类,以及过滤器链处理类;

    • 最后为该DubboInvoker构造传输层链路,以及创建底层网络连接;

  • 通过反射进行注入。


可以发现,Dubbo的整个服务引入流程还是挺复杂的,很多的SPI扩展机制,增加了扩展性,但是跟踪代码的时候也会更难跟踪。

整个服务引入的流程,就是构造以上的请求链路,以及创建好socket连接。

3. Dubbo服务调用

在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在Java架构杂谈公众号发送SPI获取文章链接。

3.1. 消费端调用请求

基于对服务引入源码的分析,我们得出了如下的对象处理链:

我们可以得到消费端调用请求流程,如下图所示:

红色箭头方向为主流程,接下来详细介绍每个步骤执行的逻辑。

3.1.1 MockClusterInvoker

这层主要用于处理mock,以及系统降级相关逻辑。

相关功能参考官网:本地伪装[2]

3.1.2 RegistryAwareClusterInvoker

如果有两个注册中心,则有这个Invoker,处理注册中心对应的Invoker,校验注册中心是否可用。

3.1.3 FailoverClusterInvoker

服务容错相关逻辑。

3.1.3.1 RouterChain

服务路由,获取到所有的服务提供者,并且通过路由链进行路由。

3.1.3.2 LoadBalance

使用负载均衡策略选择一个Invoker,最终得到的是一个InvokerWrapper。接着调用其invoke方法。

3.1.4 ListenerInvokerWrapper

接着会调用到ListenerInvokerWrapper的invoke方法。

3.1.5 CallbackRegistrationInvoker

接着执行Filter相关逻辑。

3.1.5.1 ConsumerContextFilter

设置Rpc的Context上下文属性。

3.1.5.2 FutureFilter

实现事件通知相关功能,可以借此完成一些通知功能,在调用方法之前,调用方法之后,出现异常时,会触发对应的事件。

3.1.5.3 MonitorFilter

实现Dubbo监控相关逻辑。

3.1.6 AsyncToSyncInvoker

异步转同步处理逻辑。

3.1.6.1 同步转异步逻辑

代码逻辑如下:

 1@Override
2public Result invoke(Invocation invocation) throws RpcException {
3    Result asyncResult = invoker.invoke(invocation); 
4    try {
5        // 如果invocation指定为同步调用,则执行异步转同步逻辑
6        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
7            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
8        }
9    ...
10    return asyncResult;
11}

此方法最终会返回AsyncRpcResult异步结果,异步转同步逻辑主要就是在这里调用以下方法一直等待处理结果:

1asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

继续跟进invoker.invoke(invocation)看看内部是如何进行异步调用的,定位到DubboInvoker#doInvoke方法:

 1@Override
2protected Result doInvoke(final Invocation invocation) throws Throwable {
3    ...
4    try {
5        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
6        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
7        if (isOneway) {
8            ...
9        } else {
10              // 创建一个AsyncRpcResult,承载执行结果
11            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
12              // 异步调用 client的request方法,返回CompletableFuture对象
13            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
14            // AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法
15            asyncRpcResult.subscribeTo(responseFuture);
16            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
17            FutureContext.getContext().setCompatibleFuture(responseFuture);
18            return asyncRpcResult;
19        }
20    ...
21}

这里创建了一个AsyncRpcResult对象,然后调用currentClient.request(inv, timeout)方法返回一个CompletableFuture对象,最后调用asyncRpcResult.subscribeTo(responseFuture)让AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法。

最后跟进看看currentClient.request(inv, timeout)的逻辑,最终会调用到HeaderExchangeChannel的request方法:

 1@Override
2public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
3    if (closed) {
4        throw new RemotingException(this.getLocalAddress(), null"Failed to send request " + request + ", cause: The channel " + this + " is closed!");
5    }
6    // create request.
7    Request req = new Request();
8    req.setVersion(Version.getProtocolVersion());
9    req.setTwoWay(true);
10    req.setData(request);
11    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
12    try {
13        channel.send(req);
14    } catch (RemotingException e) {
15        future.cancel();
16        throw e;
17    }
18    return future;
19}

可以发现,这里最终创建了一个DefaultFuture对象,并且返回,在创建DefaultFuture对象的逻辑中,会把请求id跟DefaultFuture的关系,以及请求id跟channel的关系保存到map中:

1private DefaultFuture(Channel channel, Request request, int timeout) {
2    this.channel = channel;
3    this.request = request;
4    this.id = request.getId();
5    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
6    // put into waiting map.
7    FUTURES.put(id, this);
8    CHANNELS.put(id, channel);
9}

并且启动一个定时任务,用于判断调用是否超时:

1private static void timeoutCheck(DefaultFuture future) {
2    TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
3    future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
4}

当判断到超时之后,直接返回超时的响应结果。

3.1.6.2 异步响应结果处理

最后我们看看底层获取到响应结果之后,是如何依次通知到 DefaultFuture --> CompletableFuture --> AsyncRpcResult的。

HeaderExchangeHandler#received方法调用HeaderExchangeHandler#handleResponse,然后继续调用DefaultFuture#received(Channel, Response),最后调用到会调DefaultFuture#received(Channel, Response, boolean)方法,接收数据,获取到异步结果:

 1public static void received(Channel channel, Response response, boolean timeout) {
2    try {
3        // 获取到responseId,根据responseId从map中获取到DefaultFuture对象
4        DefaultFuture future = FUTURES.remove(response.getId());
5        if (future != null) {
6            Timeout t = future.timeoutCheckTask;
7            if (!timeout) {
8                // decrease Time
9                t.cancel();
10            }
11            // 最终完成DefaultFuture对象
12            future.doReceived(response);
13        } else {
14            logger.warn("The timeout response finally returned at "
15                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
16                    + ", response " + response
17                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
18                    + " -> " + channel.getRemoteAddress()));
19        }
20    } finally {
21        CHANNELS.remove(response.getId());
22    }
23}

最终根据响应id从map中找到DefaultFuture,并且把响应结果赋值给它,最终触发执行AsyncRpcResult的相关方法,最后触发了以下代码从阻塞处返回:

1asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

即完成了把异步转换为同步的处理工作。

3.1.7 DubboInvoker

会调用其org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke方法,完成具体的方法调用。

此方法会从可用的clients中选择一个,然后执行异步调用,相关代码上面已经说明过了。

3.1.8 HeaderExchangeClient

调用ExchangeClient数据交换层的方法发送数据,该方法最终会调用到数据传输层的具体方法区发送数据,如org.apache.dubbo.remoting.transport.netty4.NettyChannel#send。

3.2. 提供端处理请求

基于服务导出的源码分析,我们得到了如下的对象处理链:

以及传输层链路:

为此,我们可以得到如下的提供端的请求处理流程:

接下来,我们就看看每一步都是做什么事情的。

3.2.1 NettyServerHandler#channelRead

接收数据,传给下层handler继续处理。

3.2.2 MultiMessageHandler#received

如果接收到的消息是MultiMessage,则进行转换遍历处理。

3.2.3 HeartbeatHandler#received

记录读取数据时间戳;

判断是否心跳请求,如果是心跳请求,则直接响应心跳请求,会把requestId响应回去,用户区分是哪个请求

如果是心跳响应,则不用处理,直接返回。

否则继续往下执行。

3.2.4 AllChannelHandler#received

跟Dubbo的线程模型[3]有关。

如果Dispatcher使用的是all策略(默认策略),则会执行到这里,最终会把请求封装成ChannelEventRunnalbe交给线程池处理,IO线程的任务到此结束。

如果Dubbo Dispatcher使用的是direct策略,则不会执行这个handler。

3.2.5 DecodeHandler#received

如果传入的消息实现了Decodeable接口,则调用接口的decode实现进行decode,提供了一种自定义decode的扩展机制。

3.2.6 HeaderExchangeHandler#handleRequest

判断是否双向通信。

构造Response对象,最终调用ExchangeHandlerAdapter的reply方法,返回一个CompletionStage:

1CompletionStage<Object> future = handler.reply(channel, msg);

拿到CompletionStage对象后:

  • 如果是同步调用,则直接拿到结果,并发送到channel中去

  • 如果是异步调用,则监听直到拿到服务执行结果,然后发送到channel中去

最终把结果封装为AppResponse对象,包括异常也封装成该对象(HeaderExchangeHandler#handleRequest):

 1...
2Object msg = req.getData();
3try {
4    CompletionStage<Object> future = handler.reply(channel, msg);
5
6    future.whenComplete((appResult, t) -> {
7        try {
8            if (t == null) {
9                res.setStatus(Response.OK);
10                res.setResult(appResult);
11            } else {
12                res.setStatus(Response.SERVICE_ERROR);
13                res.setErrorMessage(StringUtils.toString(t));
14            }
15            channel.send(res);
16        } catch (RemotingException e) {
17            logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
18        } finally {
19            // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
20        }
21    });
22catch (Throwable e) {
23    res.setStatus(Response.SERVICE_ERROR);
24    res.setErrorMessage(StringUtils.toString(e));
25    channel.send(res);
26}

可以发现,最终获取到异步调用的结果之后,再写到Channel中。

底层会封装结果为CompletableFuture:

org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

 1Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
2
3
4CompletableFuture<Object> future = wrapWithFuture(value, invocation);
5
6
7AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
8
9
10future.whenComplete((obj, t) -> {
11
12    AppResponse result = new AppResponse();
13    if (t != null) {
14        if (t instanceof CompletionException) {
15            result.setException(t.getCause());
16        } else {
17            result.setException(t);
18        }
19    } else {
20        result.setValue(obj);
21    }
22    asyncRpcResult.complete(result);
23});
24
25return asyncRpcResult;

3.2.7 ExchangeHandlerAdapter#reply

根据请求对象拿到一个Invoker,最终执行Invoker得到结果。

实际调用的方法可以分为两种情况:

  • 同步执行:

  • 1public String test() {...}
  • 异步执行:

  • java<br />public CompletableFuture test() {…}<br />

最终该方法返回一个CompletionStage对象,给到上层处理。

底层执行方法的时候,不管是同步还是异步返回,都统一转换为CompletableFuture:

org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture

1private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
2    if (RpcContext.getContext().isAsyncStarted()) {
3        return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
4    } else if (value instanceof CompletableFuture) {
5        return (CompletableFuture<Object>) value;
6    }
7    return CompletableFuture.completedFuture(value);
8}

可以发现,为了实现异步调用的功能,Dubbo做了一些比较绕的对象转换逻辑,如下图所示:

梳理清楚了这个处理流程,也就很容易理解为啥可以实现跨服务的异步调用了。

3.2.8 DubboExporter

3.2.8.1 CallbackRegistrationInvoker

执行过滤器链。

EchoFilter

处理Dubbo的回声检测。回声检测相关说明参考官方文档:回声测试[1]

如果判断到是$echo方法,则直接构造一个结果返回:

1@Override
2public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
3    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
4        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
5    }
6    return invoker.invoke(inv);
7}
ClassLoaderFilter

用于设置类加载器。

GenericFilter

用于处理泛化服务。

ContextFilter

设置Dubbo ThreadLocal(RpcContext)相关的属性。

TraceFilter

处理dubbo trace命令相关的Filter。

TimeoutFilter

用于处理服务端的超时。

MonitorFilter

实现监控相关功能。

ExceptionFIlter

处理服务异常信息,转换成消费端能够识别的异常。

3.2.8.2 DelegateProviderMetaDataInvoker

继续委托给下层进行处理。

3.2.8.3 AbstractProxyInvoker

通过AbstractProxyInvoker执行最终的服务的方法,不管底层是同步调用还是异步调用,统一包装成异步RPC结果AsyncRpcResult返回。


至此,Dubbo源码解析完成,完整服务导出和引入的源码主线流程图如下:

服务调用处理流程图如下:

获取高清大图,在Java架构杂谈公众号回复“Dubbo”即可。

References

[1]: 回声测试. Retrieved from https://dubbo.apache.org/zh/docs/advanced/echo-service/

[2]: 本地伪装. Retrieved from https://dubbo.apache.org/zh/docs/advanced/local-mock/

[3]: 线程模型. Retrieved from https://dubbo.apache.org/zh/docs/v2.7/user/examples/thread-model/

 

点赞收藏
分类:标签:
arthinking
请先登录,查看4条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步
8
4