你为什么吃不透Dubbo源码?这样看既不浪费时间又高效原创
本文基于Dubbo 2.7.5源码。
为什么这么说呢?
如果你是一个面试官,你招聘的岗位也不需要改造源码,应聘者也没有说自己熟悉dubbo源码,那面试的过程中也不用去深挖这些源码了,而应该更多的考察应聘者的软件设计思维,编码能力,工作经验是否匹配等。否则你招进来的可能是一个背诵八股文能力强,但是动手能力差的人,浪费了自己的招聘时间。
对于一个求职者,你的求职时间有限,应该是以尽快找到工作为目标,可以针对性的准备。详细研究这些源码细节,反而会吞噬你大量时间,耽误了你宝贵的求职时间。
如果你被迫加班划水,说明加班看技术也并不是你的本意,何不尝试改变一下工作环境呢?
看到这里的朋友,你一定就是万里挑一的那个靓仔,接下来我们开始正文。
首先我们来聊一个大家经常会碰到的问题:
1.为什么你吃不透Dubbo源码?
相关不少朋友会有这个经历:看dubbo源码,看到一半就看不下去了,然后过了几周就忘记之前自己看了啥,再后来,就再也不想继续看了。最后对外宣称:“我研究过Dubbo源码!”。
诚然,这样的状态,再背几道八股文面试题,应付面试足以。但是我相信阅读到这里的朋友绝对不想这样欺骗自己。
一般来说,以下几个原因会导致你阅读Dubbo源码不畅:
-
对Dubbo SPI的机制不了解,导致在阅读的过程中,不知道要跟哪个类去看代码了;
-
阅读顺序不对,没有先阅读服务导出和服务引入的代码,直接看服务调用的代码,导致不知道执行流程;
-
以debug的方式阅读代码,导致跟着跟着就陷入到细枝末节里面去了,到最后不想再看了;
-
本身对Dubbo很多功能都不了解,决定看源码才香,一上来就看代码,本末倒置,最终导致不知道代码是实现什么功能的…
对于Dubbo源码,首先,你需要先能在项目中引入Dubbo,以及能够使用基本功能,基于这个基础之上,正确的阅读顺序应该是这样的:
-
Dubbo SPI;
-
Dubbo服务导出
-
Dubbo服务引入;
-
Dubbo服务调用流程
最重要的就是在阅读过程中抓住主线流程。接下来我们就详细研究Dubbo源码。(获取高清源码流程图,在Java架构杂谈
公众号回复“Dubbo”即可。)
Dubbo 3.0已经发布有一段时间了了,并且提供了强大的功能,如多语言支持,双向流,基于HTTP/2,放弃私有协议,优化服务发现模型,统一流量治理模型等等。这值得我们研究其应用场景和源码。不过,在此之前,我们先得对Dubb 2的源码有所了解,以方便更顺畅的阅读Dubbo 3.0新特性相关代码。
本文就是一篇带您深入了解Dubbo源码的文章,看完包懂,让你对Dubbo源码改造顺手捏来。如果你的工作并不需要改造dubbo源码,就没必要死记硬背这些源码的详细调用流程了,如果有改造,也不用死记硬背,先熟悉了流程,在改造dubbo源码的过程中就会更加熟悉了。否则,就变成了背八股文了。
2. Dubbo服务导出
在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在
Java架构杂谈
公众号发送SPI
获取文章链接。
本文会使用到以下的例子辅助解读Dubbo源码:
假设有如下系统:
-
消费者:dubbo-consumer-app
-
调用的接口:com.itzhai.dubbo.demo.DubboTestService.playGame
-
提供者:dubbo-provider-app
如下图所示:
在SpringBoot项目中,我们要启用Dubbo,一般会在启动类中配置:
1@EnableDubbo(scanBasePackages = {"com.itzhai.dubbo.demo"})
@EnableDubbo注解:
1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Inherited
4@Documented
5@EnableDubboConfig
6@DubboComponentScan
7public @interface EnableDubbo {
8 ...
9}
其中最重要的两个注解为@EnableDubboConfig
以及@DubboComponentScan
。
-
@EnableDubboConfig
是用于解析生成DubboConfig配置类的; -
@DubboComponentScan
是用于扫描Dubbo服务以及Dubbo服务引用的。
2.1. 解析生成DubboConfig配置类
我们继续查看@EnableDubboConfig注解:
1@Target({ElementType.TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Inherited
4@Documented
5@Import(DubboConfigConfigurationRegistrar.class)
6public @interface EnableDubboConfig {
7
8 /**
9 * It indicates whether binding to multiple Spring Beans.
10 *
11 * @return the default value is <code>false</code>
12 * @revised 2.5.9
13 */
14 boolean multiple() default true;
15
16}
这里引入了DubboConfigConfigurationRegistrar
类,这个是ImportBeanDefinitionRegistrar
接口的实现类。ImportBeanDefinitionRegistrar
是Spring中用于注册BeanDefinition的扩展接口,实现了该接口的类被@Import注解引入之后,会触发其registerBeanDefinitions
方法,然后生成BeanDefiniton对象,并注册到BeanDefinitionRegistry中,为实例化Bean做好准备。
下面看看DubboConfigConfigurationRegistrar
的registerBeanDefinitions
方法:
1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3 AnnotationAttributes attributes = AnnotationAttributes.fromMap(
4 importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));
5
6 boolean multiple = attributes.getBoolean("multiple");
7
8 // Single Config Bindings
9 registerBeans(registry, DubboConfigConfiguration.Single.class);
10
11 if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193
12 registerBeans(registry, DubboConfigConfiguration.Multiple.class);
13 }
14}
其中multiple默认为true,所以默认的会进入到
1registerBeans(registry, DubboConfigConfiguration.Multiple.class);
这个方法中。跟踪registerBeans方法,可得知,是通过AnnotatedBeanDefinitionReader
这个类来读取DubboConfigConfiguration.Multiple.class
上面的注解的。DubboConfigConfiguration.Multiple类如下:
1/**
2 * Multiple Dubbo {@link AbstractConfig Config} Bean Binding
3 */
4@EnableDubboConfigBindings({
5 @EnableDubboConfigBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true),
6 @EnableDubboConfigBinding(prefix = "dubbo.modules", type = ModuleConfig.class, multiple = true),
7 @EnableDubboConfigBinding(prefix = "dubbo.registries", type = RegistryConfig.class, multiple = true),
8 @EnableDubboConfigBinding(prefix = "dubbo.protocols", type = ProtocolConfig.class, multiple = true),
9 @EnableDubboConfigBinding(prefix = "dubbo.monitors", type = MonitorConfig.class, multiple = true),
10 @EnableDubboConfigBinding(prefix = "dubbo.providers", type = ProviderConfig.class, multiple = true),
11 @EnableDubboConfigBinding(prefix = "dubbo.consumers", type = ConsumerConfig.class, multiple = true),
12 @EnableDubboConfigBinding(prefix = "dubbo.config-centers", type = ConfigCenterBean.class, multiple = true),
13 @EnableDubboConfigBinding(prefix = "dubbo.metadata-reports", type = MetadataReportConfig.class, multiple = true),
14 @EnableDubboConfigBinding(prefix = "dubbo.metricses", type = MetricsConfig.class, multiple = true)
15})
16public static class Multiple {
17
18}
这里又用到了EnableDubboConfigBinding
注解:
1@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4@Repeatable(EnableDubboConfigBindings.class)
5@Import(DubboConfigBindingRegistrar.class)
6public @interface EnableDubboConfigBinding {
7 ...
8}
可以发现,最终是通过DubboConfigBindingRegistrar
这个类来处理Multiple上面的注解信息的,这个类也是一个ImportBeanDefinitionRegistrar
类。接下来看看其registerBeanDefinitions
方法:
1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3 AnnotationAttributes attributes = AnnotationAttributes.fromMap(
4 importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBinding.class.getName()));
5
6 registerBeanDefinitions(attributes, registry);
7
8}
9
10protected void registerBeanDefinitions(AnnotationAttributes attributes, BeanDefinitionRegistry registry) {
11 String prefix = environment.resolvePlaceholders(attributes.getString("prefix"));
12
13 Class<? extends AbstractConfig> configClass = attributes.getClass("type");
14
15 boolean multiple = attributes.getBoolean("multiple");
16
17 registerDubboConfigBeans(prefix, configClass, multiple, registry);
18
19}
可以发现,最终会读取到每个@EnableDubboConfigBinding注解的prefix和type属性,传入registerDubboConfigBeans
进行处理。
registerDubboConfigBeans内部会根据prefix去Spring的environment中对应的配置项,找到之后,实例化成对应type的bean,注册到Spring中。比如:
dubbo.application前缀的配置会生成一个ApplicationConfig的bean,dubbo.protocols的前缀,会生成ProtocolConfigd bean。
registerDubboConfigBeans
大致的实现思路如下:
每个@EnableDubboConfigBinding注解都会解析得到一个Config的BeanDefinition注册到Spring中,同时注册一个对应的DubboConfigBindingBeanPostProcessor后置处理器,该处理器内部会利用Spring的DataBinder技术,结合properties文件,为Config Bean对应的属性进行赋值。
具体的registerDubboConfigBeans
实现代码这里就不列出来了,感兴趣的自己跟踪阅读。
至此,Dubbo的Config类生成完毕。
2.2. 扫描Dubbo的@Service注解,生成ServiceBean
上面提到@EnableDubboConfig
注解会引入DubboComponentScan
。@DubboComponentScan
是用于扫描Dubbo服务以及Dubbo服务引用的,@DubboComponentScan
注解如下:
1@Target(ElementType.TYPE)
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4@Import(DubboComponentScanRegistrar.class)
5public @interface DubboComponentScan {
6 ...
7}
这里引入了DubboComponentScanRegistrar
,也是一个ImportBeanDefinitionRegistrar
接口实现类,继续看其registerBeanDefinitions
方法:
1@Override
2public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
3 // 获取DubboComponentScan注解配置的扫描路径
4 Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
5
6 // 处理服务导出相关逻辑
7 registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
8
9 // 处理服务引入相关逻辑
10 registerReferenceAnnotationBeanPostProcessor(registry);
11
12}
其中最主要的两个方法:
-
registerServiceAnnotationBeanPostProcessor
: -
注册ServiceAnnotationBeanPostProcessor(是一个BeanDefinitionRegistryPostProcessor接口实现类)
-
在Spring启动时会调用postProcessBeanDefinitionRegistry方法扫描添加了@Service注解了的类
-
然后生成BeanDefinition(会生成两个,一个普通的bean,一个ServiceBean),后续的Spring周期中会生成Bean
-
在ServiceBean中会监听ContextRefreshedEvent事件,一旦Spring启动完后,就会发出事件触发进行服务导出;
-
registerReferenceAnnotationBeanPostProcessor
: -
注册ReferenceAnnotationBeanPostProcessor(是一个AnnotationInjectedBeanPostProcessor抽象类子类);
-
在Spring启动时,会通过该类扫描服务引用@Reference的注入点,生成ReferenceBean,然后生成具体的代理类注入到对应的字段中。
本节我们看第一个方法,服务导出相关逻辑。
接下来继续跟踪ServiceAnnotationBeanPostProcessor
的postProcessBeanDefinitionRegistry
方法:
1@Override
2public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
3
4 Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
5
6 if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
7 registerServiceBeans(resolvedPackagesToScan, registry);
8 } else {
9 if (logger.isWarnEnabled()) {
10 logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
11 }
12 }
13
14}
关键方法为registerServiceBeans,该方法扫描并且注册ServiceBean,具体代码如下:
1private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
2
3 // 创建扫描类
4 DubboClassPathBeanDefinitionScanner scanner =
5 new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
6
7 BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
8
9 scanner.setBeanNameGenerator(beanNameGenerator);
10
11 // 设置需要扫描的注解,这里只会扫描Dubbo的@Service注解
12 scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
13
14 /**
15 * Add the compatibility for legacy Dubbo's @Service
16 *
17 * The issue : https://github.com/apache/dubbo/issues/4330
18 * @since 2.7.3
19 */
20 scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));
21
22 for (String packageToScan : packagesToScan) {
23
24 // Registers @Service Bean first
25 scanner.scan(packageToScan);
26
27 // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
28 Set<BeanDefinitionHolder> beanDefinitionHolders =
29 findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
30
31 if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
32
33 // 处理扫描到的BeanDefinition
34 for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
35 registerServiceBean(beanDefinitionHolder, registry, scanner);
36 }
37
38 if (logger.isInfoEnabled()) {
39 logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
40 beanDefinitionHolders +
41 " } were scanned under package[" + packageToScan + "]");
42 }
43
44 } else {
45
46 if (logger.isWarnEnabled()) {
47 logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
48 + packageToScan + "]");
49 }
50
51 }
52
53 }
54
55}
这里的逻辑比较简单,主要是通过DubboClassPathBeanDefinitionScanner
扫描器扫描标注有Dubbo的@Service注解的类,扫描得到相关的BeanDefinition,然后遍历处理每个BeanDefinition,处理方法:ServiceAnnotationBeanPostProcessor#registerServiceBean
。接下来看看ServiceAnnotationBeanPostProcessor#registerServiceBean
这个方法:
1private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
2 DubboClassPathBeanDefinitionScanner scanner) {
3 // 获取到服务实现类
4 Class<?> beanClass = resolveClass(beanDefinitionHolder);
5 // 获取服务实现类上面的@Service注解
6 Annotation service = findServiceAnnotation(beanClass);
7
8 /**
9 * The {@link AnnotationAttributes} of @Service annotation
10 */
11 // 获取@Service注解上的信息
12 AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
13
14 // 获取服务接口
15 Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
16 // 获取服务实现类bean名称
17 String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
18
19 // 生成服务的ServiceBeanDefinition
20 AbstractBeanDefinition serviceBeanDefinition =
21 buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
22
23 // 生成服务对应的Bean名称
24 String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
25
26 if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
27
28 // ServiceBean注册到Spring容器中,beanName的格式:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai
29 registry.registerBeanDefinition(beanName, serviceBeanDefinition);
30
31 if (logger.isInfoEnabled()) {
32 logger.info("The BeanDefinition[" + serviceBeanDefinition +
33 "] of ServiceBean has been registered with name : " + beanName);
34 }
35
36 } else {
37
38 if (logger.isWarnEnabled()) {
39 logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
40 "] of ServiceBean[ bean name : " + beanName +
41 "] was be found , Did @DubboComponentScan scan to same package in many times?");
42 }
43
44 }
45
46}
这段代码主要逻辑:
-
获取到服务实现类,以及服务实现类的注解信息,以及服务实现类的接口;
-
为服务实现类生成一个ServiceBean的Definition,ServiceBean的名称格式为:ServiceBean:org.apache.dubbo.demo.DemoService:1.0.1:itzhai;
-
把ServiceBeanDefinition注册到Spring容器中。
注意,在生成ServiceBean的Definition方法中,会把服务实现类的bean赋值给ServiceBean的ref属性:
1private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
2 AnnotationAttributes serviceAnnotationAttributes,
3 Class<?> interfaceClass,
4 String annotatedServiceBeanName) {
5 ...
6 addPropertyReference(builder, "ref", annotatedServiceBeanName);
7 ...
8}
基于ServiceBeanDefinition生成的bean结构如下图所示:
其中SpringBean即为服务实现类的Bean,ServiceBean为Dubbo服务的Bean,具体的服务导出逻辑就封装在ServiceBean中。
在Spring启动完成之后,会触发每个ServiceBean的onApplicationEvent方法,此方法会调用ServiceBean的export()方法,执行服务导出。
接下来就详细看看服务导出的实现逻辑。
2.3. ServiceBean服务导出流程
ServiceBean.export()方法如下:
1@Override
2public void export() {
3 super.export();
4 // 发布ServiceBeanExportedEvent事件
5 publishExportEvent();
6}
继续跟踪super.export()方法:
1public synchronized void export() {
2 // 读取配置
3 checkAndUpdateSubConfigs();
4
5 // 判断服务是否需要导出
6 if (!shouldExport()) {
7 return;
8 }
9
10 // 判断是否需要延迟发布
11 if (shouldDelay()) {
12 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
13 } else {
14 // 执行服务导出
15 doExport();
16 }
17}
这里有两个核心方法调用:
-
checkAndUpdateSubConfigs():读取配置;
-
doExport():执行服务导出;
接下来详细可靠这两个方法的逻辑。
2.3.1 读取配置
checkAndUpdateSubConfigs()方法的详情如下:
1public void checkAndUpdateSubConfigs() {
2 // Use default configs defined explicitly on global configs
3 // 补全ServiceConfig中的属性配置,ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取
4 completeCompoundConfigs();
5
6 // Config Center should always being started first.
7 // 从配置中心获取配置,包括应用配置和全局配置,保存到
8 // Environment的externalConfigurationMap和appExternalConfigurationMap中
9 // 然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig)
10 startConfigCenter();
11
12 checkDefault();
13
14 checkProtocol();
15
16 checkApplication();
17
18 // if protocol is not injvm checkRegistry
19 if (!isOnlyInJvm()) {
20 checkRegistry();
21 }
22
23 // 使用配置中心的数据覆盖刷新ServiceConfig
24 this.refresh();
25
26 // 如果配了metadataReportConfig,则刷新配置
27 checkMetadataReport();
28
29 if (StringUtils.isEmpty(interfaceName)) {
30 throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
31 }
32
33 // 如果当前服务的实现类是GenericService接口的实例
34 if (ref instanceof GenericService) {
35 interfaceClass = GenericService.class;
36 if (StringUtils.isEmpty(generic)) {
37 generic = Boolean.TRUE.toString();
38 }
39 } else {
40 // 加载接口
41 try {
42 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
43 .getContextClassLoader());
44 } catch (ClassNotFoundException e) {
45 throw new IllegalStateException(e.getMessage(), e);
46 }
47 // 刷新MethodConfig
48 checkInterfaceAndMethods(interfaceClass, methods);
49 checkRef();
50 generic = Boolean.FALSE.toString();
51 }
52 ...
53 checkStubAndLocal(interfaceClass);
54 checkMock(interfaceClass);
55}
主要逻辑如下:
-
completeCompoundConfigs()
: -
补全ServiceConfig中的属性配置;
-
针对ServiceConfig中值为空的属性,会从ProviderConfig、ModuleConfig、ApplicationConfig中获取;
-
其中ProviderConfig、ModuleConfig、ApplicationConfig这些配置类Bean,在上面已经解析生成;
-
startConfigCenter()
: -
从配置中心获取配置,包括应用配置和全局配置,保存到Environment的externalConfigurationMap和appExternalConfigurationMap中。这一步中,如果配置了ConfigCenter,则会会去连接配置中心,将当前应用的配置和全局配置都拉取到本地。
-
然后使用配置中心的配置覆盖刷新所有的Config类的属性(除了ServiceConfig);
-
配置优先级关系图如下:
-
详细的代码可以自己进一步阅读;
-
this.refresh()
:刷新ServiceConfig,在这一步,已经把远程配置中心的配置数据拉取下来,此时会对使用这些配置数据刷新本地的ServiceConfig从注解解析出来的配置。
下面看看重要的服务导出方法。
2.3.2 执行服务导出
ServiceConfig#doExport
方法如下:
1protected synchronized void doExport() {
2 if (unexported) {
3 throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
4 }
5 if (exported) {
6 return;
7 }
8 exported = true;
9
10 if (StringUtils.isEmpty(path)) {
11 path = interfaceName;
12 }
13 doExportUrls();
14}
继续查看ServiceConfig#doExportUrls
:
1private void doExportUrls() {
2 // 根据注册中心配置生成所有的注册中心URL
3 List<URL> registryURLs = loadRegistries(true);
4
5 for (ProtocolConfig protocolConfig : protocols) {
6 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
7 ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
8 ApplicationModel.initProviderModel(pathKey, providerModel);
9 // 服务导出的方法
10 doExportUrlsFor1Protocol(protocolConfig, registryURLs);
11 }
12}
重点的方法调用:ServiceConfig#doExportUrlsFor1Protocol
,继续查看该方法:
1/**
2 * @param protocolConfig 表示一个协议
3 * @param registryURLs 所有注册中心的URL
4 */
5private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
6 // 协议默认为dubbo
7 String name = protocolConfig.getName();
8 if (StringUtils.isEmpty(name)) {
9 name = DUBBO;
10 }
11
12 // 通过这个map收集服务url的参数
13 Map<String, String> map = new HashMap<String, String>();
14 map.put(SIDE_KEY, PROVIDER_SIDE);
15
16 appendRuntimeParameters(map);
17
18 // 监控相关参数
19 appendParameters(map, metrics);
20 // 应用相关参数
21 appendParameters(map, application);
22 ...
23
24 // 解析每个方法的参数配置,如:
25 // @Service(versions="1.0.1", grousp="itzhai", methods = {@Method(name = "playGame", timeout = 3000)})
26 // 解析出来之后,把这些参数拼接到Dubbo服务的导出URL上
27 if (CollectionUtils.isNotEmpty(methods)) {...}
28
29 if (ProtocolUtils.isGeneric(generic)) {...} else {...}
30
31 // token机制:https://dubbo.apache.org/en/docs/v2.7/user/examples/token-authorization/
32 if (!ConfigUtils.isEmpty(token)) {...}
33
34 // export service
35 String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
36 Integer port = this.findConfigedPorts(protocolConfig, name, map);
37 // 构造最终的要导出的服务url
38 URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
39 // 生成的URL:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1
40
41 // 扩展点,可以通过ConfiguratorFactory,对服务url再次进行配置
42 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
43 .hasExtension(url.getProtocol())) {
44 url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
45 .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
46 }
47
48 // scope可能取值:null, remote, local, none
49 String scope = url.getParameter(SCOPE_KEY);
50 // don't export when none is configured
51 if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
52 // export to local if the config is not remote (export to remote only when config is remote)
53 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
54 // scope != remote,则进行本地导出,url的protocol会改为injvm
55 exportLocal(url);
56 }
57 // export to remote if the config is not local (export to local only when config is local)
58 if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
59 // 远程导出逻辑
60 ...
61 }
62 }
63 this.urls.add(url);
64}
doExportUrlsFor1Protocol这个方法比较长,主要逻辑:
-
通过map收集服务url的参数;
-
解析单个方法的配置;
-
token机制参数处理;
-
构造要导出的服务的最终URL;
-
根据scope参数执行具体的服务导出逻辑;
根据不同的scope取值,走不同的逻辑:
我们先来看看远程导出逻辑。
1// scope != local, 则进行远程导出
2if (CollectionUtils.isNotEmpty(registryURLs)) {
3 // 如果配置了注册中心,则将服务注册到所有的注册中心
4 for (URL registryURL : registryURLs) {
5 //if protocol is only injvm ,not register
6 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
7 continue;
8 }
9
10 // 设置是否动态服务参数,用于设置对应的zookeeper上的节点是否临时节点
11 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
12
13 // 获取监控中心地址
14 URL monitorUrl = loadMonitor(registryURL);
15
16 // 设置监控中心地址
17 if (monitorUrl != null) {
18 url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
19 }
20
21 if (logger.isInfoEnabled()) {
22 if (url.getParameter(REGISTER_KEY, true)) {
23 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
24 } else {
25 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
26 }
27 }
28
29 // For providers, this is used to enable custom proxy to generate invoker,默认为javassist
30 String proxy = url.getParameter(PROXY_KEY);
31 if (StringUtils.isNotEmpty(proxy)) {
32 registryURL = registryURL.addParameter(PROXY_KEY, proxy);
33 }
34
35 // 生成ServiceBean对应服务的Invoker代理对象
36 // registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider-app&dubbo=2.0.2&export=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1&logger=log4j&pid=11035®istry=zookeeper&release=2.7.0&timeout=3000×tamp=1668952313811
37 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
38
39 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
40
41 // 通过特定协议来执行服务导出,得到Exporter对象,这里的协议为RegistryProtocol
42 Exporter<?> exporter = protocol.export(wrapperInvoker);
43 exporters.add(exporter);
44 }
45} else {
46 // 不用注册服务到注册中心,直接进行服务导出
47 if (logger.isInfoEnabled()) {
48 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
49 }
50
51 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
52 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
53
54 // 导出的核心方法
55 Exporter<?> exporter = protocol.export(wrapperInvoker);
56 exporters.add(exporter);
57}
58
59/**
60 * @since 2.7.0
61 * ServiceData Store
62 */
63MetadataReportService metadataReportService = null;
64if ((metadataReportService = getMetadataReportService()) != null) {
65 metadataReportService.publishProvider(url);
66}
如果配置了注册中心,首先会把服务URL注册到所有的注册中心,在这之前会通过ProxyFactory生成一个服务的代理类Invoker:
1Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
其中registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())
会在注册中心的地址后面拼接上服务的URL地址,拼接的结果如下:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
?application=dubbo-provider-app&dubbo=2.0.2&export
=dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1&logger=log4j&pid=11035®istry=zookeeper&release=2.7.0&timeout=3000×tamp=1668952313811
通过ProxyFactory创建如下代理类:
然后再把代理类包装成DelegateProviderMetaDataInvoker:
1DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)
接着执行Exporter<?> exporter = protocol.export(wrapperInvoker)
进行服务导出。
这里基于以上的url和Dubbo SPI机制,可以推断用到的是RegistryProtocol#exort
方法,代码如下:
1@Override
2public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
3 // 源url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx®istry=zookeeper&xxx
4 // 转换为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx
5 URL registryUrl = getRegistryUrl(originInvoker);
6 // 得到服务提供者url,表示服务提供者:dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?xxx
7 URL providerUrl = getProviderUrl(originInvoker);
8
9 // Subscribe the override data
10 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
11 // the same service. Because the subscribed is cached key with the name of the service, it causes the
12 // subscription information to cover.
13
14 // 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
15 // provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1
16 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
17
18 // 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
19 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
20 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
21
22
23 // 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
24 // dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1
25 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
26
27 // export invoker
28 // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了dd
29 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
30
31 // url to registry
32 final Registry registry = getRegistry(originInvoker);
33
34 // 简化providerUrl参数,得到存入到注册中心去的providerUrl
35 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
36
37 // 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
38 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
39 registryUrl, registeredProviderUrl);
40
41 //to judge if we need to delay publish
42 // 判断是否需要注册到注册中心
43 boolean register = providerUrl.getParameter(REGISTER_KEY, true);
44 if (register) {
45 // 注册简化后的服务URL到注册中心
46 register(registryUrl, registeredProviderUrl);
47 providerInvokerWrapper.setReg(true);
48 }
49
50 // 监听路径内容
51 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
52
53 exporter.setRegisterUrl(registeredProviderUrl);
54 exporter.setSubscribeUrl(overrideSubscribeUrl);
55 //Ensure that a new exporter instance is returned every time export
56 return new DestroyableExporter<>(exporter);
57}
这段代码主要处理以下逻辑:
-
getRegistryUrl(originInvoker)
:得到注册中心的url,这一步会把传进来的registry://xxx?xxx=xxx&registry=zookeeper
转换为:zookeeper://xxx?xxx=xxx
-
getProviderUrl(originInvoker)
:得到服务提供者的url,dubbo://xxx
; -
getRegistry(originInvoker)
:得到注册中心; -
exporter = doLocalExport(originInvoker, providerUrl)
:执行服务导出; -
register(registryUrl, registeredProviderUrl)
:把服务的URL注册到注册中心; -
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)
:监听zookeeper节点;
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())生成的URL为:
我们先来重点看看RegistryProtocol#doLocalExport
这个方法,这个方法是执行本地导出的。
2.3.2.1 服务本地导出
代码如下:
1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
2 String key = getCacheKey(originInvoker);
3
4 return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
5 Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
6 return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
7 });
8}
这里最关键的导出方法是:protocol.export(invokerDelegate)
,这里用到了Dubbo SPI加载具体实现,基于providerUrl,加载到的为DubboProtocol类,最终会调用到以下几个方法:
-
1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export:
1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3 if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4 return protocol.export(invoker);
5 }
6 return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
7}
这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。
执行完之后,生成的对象如下:
-
2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export:
1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3 if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4 return protocol.export(invoker);
5 }
6 return new ListenerExporterWrapper<T>(protocol.export(invoker),
7 // 加载ExporterListener接口扩展点,使用url和EXPORTER_LISTENER_KEY筛选到导出服务相关的扩展点
8 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
9 .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
10}
这里如果是dubbo://开头的url,则会包装ListenerExporterWrapper,给dubbo protocol对应的invoker添加监听器,用于处理导出结果,这里可以进行扩展。
-
3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export:
1@Override
2public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
3 URL url = invoker.getUrl();
4
5 // export service.
6 String key = serviceKey(url);
7 // 构造DubboExporter,用于服务导出
8 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
9 exporterMap.put(key, exporter);
10
11 //export an stub service for dispatching event
12 Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
13 Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
14 if (isStubSupportEvent && !isCallbackservice) {
15 String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
16 if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
17 if (logger.isWarnEnabled()) {
18 logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
19 "], has set stubproxy support event ,but no stub methods founded."));
20 }
21
22 } else {
23 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
24 }
25 }
26
27 // 启动Server
28 openServer(url);
29
30 // 一些特殊的序列化机制处理
31 optimizeSerialization(url);
32
33 return exporter;
34}
最终会创建一个DubboExporter对象,并放入到exporterMap中,后续执行服务的时候,会从这个map中检索到需要执行的服务,如下图所示:
执行完export方法之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
方法,这里会包装多一层ExporterChangeableWrapper
:
而执行完org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
之后,会返回到org.apache.dubbo.registry.integration.RegistryProtocol#export
这个方法,最后会多包装一层DestroyableExporter
:
最里面的代理类中包括了引用的服务实现类ZDubboTestServiceImpl,当通过Dubbo调用服务的时候,会从exporterMap中找到对应的DubboExporter,最终就会调用这个服务实现类的方法。
接下来的DubboProtocol#openServer
就是构建传输层链路与建立连接的了,下面详细看看。
构建传输层链路与启动服务
在Dubbo中,数据处理分为了数据交互层和数据传输层:
接下来看看这个连接是怎么建立的,跟踪DubboProtocol#openServer
方法:
1private void openServer(URL url) {
2 // find server. key = IP + port, 如:192.168.1.101:20880
3 String key = url.getAddress();
4 // client can export a service which's only for server to invoke
5 boolean isServer = url.getParameter(IS_SERVER_KEY, true);
6 if (isServer) {
7 ExchangeServer server = serverMap.get(key);
8 if (server == null) {
9 synchronized (this) {
10 server = serverMap.get(key);
11 if (server == null) {
12 serverMap.put(key, createServer(url));
13 }
14 }
15 } else {
16 // server supports reset, use together with override
17 server.reset(url);
18 }
19 }
20}
可以发现,这里会为每个 key(IP:port)启动一个服务:
继续跟踪,发现最终会调用到DubboProtocol#createServer
方法:
1private ExchangeServer createServer(URL url) {
2 url = URLBuilder.from(url)
3 // send readonly event when server closes, it's enabled by default
4 .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
5 // enable heartbeat by default
6 .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
7 .addParameter(CODEC_KEY, DubboCodec.NAME)
8 .build();
9
10 // 获取协议名称,如netty,netty,jetty等
11 String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
12
13 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
14 throw new RpcException("Unsupported server type: " + str + ", url: " + url);
15 }
16
17 ExchangeServer server;
18 try {
19 server = Exchangers.bind(url, requestHandler);
20 } catch (RemotingException e) {
21 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
22 }
23
24 // 校验客户端协议
25 str = url.getParameter(CLIENT_KEY);
26 if (str != null && str.length() > 0) {
27 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
28 if (!supportedTypes.contains(str)) {
29 throw new RpcException("Unsupported client type: " + str);
30 }
31 }
32
33 return server;
34}
这里最关键的是调用数据交换层的bind方法:
1server = Exchangers.bind(url, requestHandler);
继续跟踪Exchangers#bind
方法:
1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handler == null) {
6 throw new IllegalArgumentException("handler == null");
7 }
8 // codec 协议编码方式
9 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
10 return getExchanger(url).bind(url, handler);
11}
最终会调用getExchanger得到一个Exchanger实现去启动服务器,这里基于SPI扩展机制,会获取到HeaderExchanger:
1public static Exchanger getExchanger(URL url) {
2 String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
3 return getExchanger(type);
4}
5
6public static Exchanger getExchanger(String type) {
7 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
8}
接着会调用HeaderExchanger的bind方法
去启动服务器,继续跟踪此方法:
1@Override
2public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
3 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
4}
这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.bind来启动服务器,最后再包装成HeaderExchangeServer:
1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handlers == null || handlers.length == 0) {
6 throw new IllegalArgumentException("handlers == null");
7 }
8 ChannelHandler handler;
9 if (handlers.length == 1) {
10 handler = handlers[0];
11 } else {
12 handler = new ChannelHandlerDispatcher(handlers);
13 }
14
15 // 默认调用到NettyTransporter的bind方法
16 return getTransporter().bind(url, handler);
17}
18
19public static Transporter getTransporter() {
20 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
21}
最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:
1@Override
2public Server bind(URL url, ChannelHandler listener) throws RemotingException {
3 return new NettyServer(url, listener);
4}
继续调用new NettyServer:
1public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
2 // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
3 // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
4 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
5}
这里会调用wrapChannelHandler继续把handler包装多几层:
1protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
2 // 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
3 // 把handler包装成AllChannelHandler
4 // 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
5 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
6 .getAdaptiveExtension().dispatch(handler, url)));
7}
最终包装成如下几层:
MultiMessageHandler-->HeartbeatHandler-->AllChannelHandler
最终会调用到NettyServer的doOpen()方法
创建连接:
1@Override
2protected void doOpen() throws Throwable {
3 bootstrap = new ServerBootstrap();
4
5 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
6 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
7 new DefaultThreadFactory("NettyServerWorker", true));
8
9 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
10 channels = nettyServerHandler.getChannels();
11
12 bootstrap.group(bossGroup, workerGroup)
13 .channel(NioServerSocketChannel.class)
14 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
15 .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
16 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
17 .childHandler(new ChannelInitializer<NioSocketChannel>() {
18 @Override
19 protected void initChannel(NioSocketChannel ch) throws Exception {
20 // FIXME: should we use getTimeout()?
21 int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
22 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
23 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
24 .addLast("decoder", adapter.getDecoder())
25 .addLast("encoder", adapter.getEncoder())
26 .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
27 .addLast("handler", nettyServerHandler);
28 }
29 });
30 // bind
31 ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
32 channelFuture.syncUninterruptibly();
33 channel = channelFuture.channel();
34
35}
这里就是我们熟悉的Netty服务器开启的代码了。至此,服务导出的主线流程执行完毕。
传输层构建的链路如下:
可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。
2.3.2.2 服务导出到注册中心
RegistryProtocol.exort()
方法在执行完本地导出之后,会继续把服务URL注册到注册中心:
1register(registryUrl, registeredProviderUrl);
继续调用以下代码:
1public void register(URL registryUrl, URL registeredProviderUrl) {
2 Registry registry = registryFactory.getRegistry(registryUrl);
3 registry.register(registeredProviderUrl);
4}
这里底层会基于SPI的IoC,创建具体的注册中心,如ZookeeperRegistry。
最后通过register方法,把提供者的URL注册到注册中心。
2.3.3 服务导出相关URL订阅
最后,我们梳理下服务导出需要订阅到的URL。
相关代码在RegistryProtocol#exort
中,如下:
1 // 得到overrideSubscribeUrl,表示需要监听的服务以及监听的类型为configurators,这个是老版本的动态配置监听url,生成的URL格式:providerUrl协议改为provider,再加上category=configurators&check=false
2 // provider://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1
3 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
4
5 // 创建overrideSubscribeUrl的监听器OverrideListener,用于监听overrideSubscribeUrl的变化
6 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
7 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
8
9
10 // 使用providerConfigurationListener和serviceConfigurationListener重写providerUrl
11 // dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&bind.ip=192.168.0.106&bind.port=20800&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=11035&release=2.7.0&revision=1.0.1&side=provider×tamp=1668952315016&version=1.0.1
12 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
13
14 ...
15
16 // url to registry
17 final Registry registry = getRegistry(originInvoker);
18
19 // 简化providerUrl参数,得到存入到注册中心去的providerUrl
20 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
21
22 // 把服务提供者Invoker、注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
23 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
24 registryUrl, registeredProviderUrl);
25
26 ...
27
28 // 监听旧版本的动态配置
29 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
30
31 ...
主要逻辑:
-
先生成老版本的动态配置监听URL:overrideSubscribeUrl;
-
创建OverrideListener用于处理监听到的overrideSubscribeUrl变更事件;
-
执行overrideUrlWithConfig(),处理以下任务:
-
通过 ProviderConfigurationListener 监听应用级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
-
通过 ServiceConfigurationListener 监听服务级别的动态配置,同时从配置中心获取动态配置,重写providerUrl;
-
执行registry.subscribe()监听旧版本的动态配置。
2.3.3.1 动态配置变更监听处理
监听到动态配置变更之后,最终会调用到这个方法:
org.apache.dubbo.registry.integration.RegistryProtocol.OverrideListener#doOverrideIfNecessary
1public synchronized void doOverrideIfNecessary() {
2 final Invoker<?> invoker;
3 if (originInvoker instanceof InvokerDelegate) {
4 invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
5 } else {
6 invoker = originInvoker;
7 }
8 //The origin invoker
9 URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
10 String key = getCacheKey(originInvoker);
11
12 ExporterChangeableWrapper<?> exporter = bounds.get(key);
13 if (exporter == null) {
14 logger.warn(new IllegalStateException("error state, exporter should not be null"));
15 return;
16 }
17
18 //The current, may have been merged many times
19 URL currentUrl = exporter.getInvoker().getUrl();
20
21 //Merged with this configuration
22 URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
23
24 newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
25 newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
26 .getConfigurators(), newUrl);
27
28 if (!currentUrl.equals(newUrl)) {
29 // 重新导出
30 RegistryProtocol.this.reExport(originInvoker, newUrl);
31 logger.info("exported provider url changed, origin url: " + originUrl +
32 ", old export url: " + currentUrl + ", new export url: " + newUrl);
33 }
34}
最终,如果判断到providerUrl有变更,则执行重新导出方法reExport():
1public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
2
3 // update local exporter
4 ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
5
6 // update registry
7 URL registryUrl = getRegistryUrl(originInvoker);
8 final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);
9
10 //decide if we need to re-publish
11 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
12 ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
13
14 /**
15 * Only if the new url going to Registry is different with the previous one should we do unregister and register.
16 */
17 if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
18 unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
19 register(registryUrl, registeredProviderUrl);
20 newProviderInvokerWrapper.setReg(true);
21 }
22
23 exporter.setRegisterUrl(registeredProviderUrl);
24}
这里代码不继续跟进了,主要逻辑:
-
如果newInvokerUrl有变更,则调用doChangeLocalExport()重新执行本地导出,这里又会继续执行DubboProtocol#export方法:
-
会重新生成一个InvokerDelegate方法,把newInvokerUrl保存在新生成的对象中;
-
注意,如果是重新导出,并不会关掉Netty服务重新开启,相关代码:
-
1private void openServer(URL url) {
2 // find server. key = IP + port, 如:192.168.1.101:20880
3 String key = url.getAddress();
4 // client can export a service which's only for server to invoke
5 boolean isServer = url.getParameter(IS_SERVER_KEY, true);
6 if (isServer) {
7 ExchangeServer server = serverMap.get(key);
8 if (server == null) {
9 synchronized (this) {
10 server = serverMap.get(key);
11 if (server == null) {
12 serverMap.put(key, createServer(url));
13 }
14 }
15 } else {
16 // server supports reset, use together with override
17 server.reset(url);
18 }
19 }
20} -
可以发现,最终调用到了
server.reset(url)
,该方法内部会检测心跳间隔时间有没有变更,如果有变更,则取消原来的心跳任务,根据新的心跳时间重新创建任务; -
如果providerUrl有变更,则删除掉配置中心原来的url,重新注册新的providerUrl。
订阅的URL如下所示:
至此,服务导出主线逻辑梳理完成。
2.4. 服务导出总结
服务导出主要做的事情:
-
解析配置生成Dubbo的Config配置类;
-
扫描Dubbo服务@Service注解,生成ServiceBean;
-
执行ServiceBean的export方法执行服务导出
-
先获取到各种配置,包括配置中心的配置,最后用这些配置覆盖刷新ServiceBean的配置;
-
基于以上配置信息,构造导出服务的URL;
-
执行本地导出,主要是用到Protocol的export方法,基于Dubbo的SPI机制,以及URL的参数,调用各种扩展去封装Invoker,Invoker最内部是一个代理类,包含了实际服务的引用;最后为不同的端口分别启动Netty服务,构造传输层的链路;
-
服务导出到注册中心,即把Invoker对应的URL注册到注册中心;
-
服务导出相关URL订阅,当感知到动态配置变更的时候,更新providerURL,最终执行RegistryProtocol的reExport方法。
2. Dubbo服务引入
在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在
Java架构杂谈
公众号发送SPI
获取文章链接。
2.1. 识别@Reference注入点
首先,应用会注册一个ReferenceAnnotationBeanPostProcessor
用于处理@Reference
注解。
1public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements
2 ApplicationContextAware, ApplicationListener {
3 ...
4}
该类继承AnnotationInjectedBeanPostProcessor
,实现了自定义注解注入,这里的自定义注解为Dubbo的@Reference注解,大家也可以基于该抽象类实现自己的注入注解。
在AnnotationInjectedBeanPostProcessor
中,会寻找所有被@Reference
注解标注的属性或者方法,作为注入点:
1@Override
2public PropertyValues postProcessPropertyValues(
3 PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
4
5 // 通过自定义注解寻找需要注入的属性(对于ReferenceAnnotationBeanPostProcessor类既是@Reference注解标注的属性)
6 InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
7 try {
8 metadata.inject(bean, beanName, pvs);
9 } catch (BeanCreationException ex) {
10 throw ex;
11 } catch (Throwable ex) {
12 throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
13 + " dependencies is failed", ex);
14 }
15 return pvs;
16}
findInjectionMetadata方法会找到所有需要注入的字段(AnnotatedFieldElement)和方法(AnnotatedMethodElement),包装成AnnotatedInjectionMetadata返回。
最终调用metadata.inject(bean, beanName, pvs)
进行依赖注入,该方法根据FieldElement的具体子类调用其inject方法,来完成属性或者方法的注入,这里就是Spring框架的依赖注入代码了:
1 public void inject(Object target, String beanName, PropertyValues pvs) throws Throwable {
2 Collection<InjectedElement> elementsToIterate =
3 (this.checkedElements != null ? this.checkedElements : this.injectedElements);
4 if (!elementsToIterate.isEmpty()) {
5 boolean debug = logger.isDebugEnabled();
6 for (InjectedElement element : elementsToIterate) {
7 if (debug) {
8 logger.debug("Processing injected element of bean '" + beanName + "': " + element);
9 }
10 element.inject(target, beanName, pvs);
11 }
12 }
13 }
而dubbo继承了Spring的InjectionMetadata.InjectedElement
,我们挑一个AnnotatedFieldElement
来看其具体实现:
1@Override
2protected void
3inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
4
5 Class<?> injectedType = field.getType();
6
7 // 获取需要注入的对象
8 Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
9
10 // 通过反射把对象注入到字段中
11 ReflectionUtils.makeAccessible(field);
12
13 field.set(bean, injectedObject);
14
15}
关键是getInjectedObject
方法,该方法获取到了需要依赖注入的值,最终通过反射注入属性。
2.2. 生成服务引入的对象
下面看看getInjectedObject
是怎么获取到注入的属性的值的,该方法最终会调用到org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean这个方法:
1@Override
2protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
3 InjectionMetadata.InjectedElement injectedElement) throws Exception {
4
5 /**
6 * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
7 */
8 // referencedBeanName即为当前所引入服务对应的ServiceBean的beanName, 命名规则:ServiceBean:interfaceClassName:version:group
9 String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
10
11 /**
12 * The name of bean that is declared by {@link Reference @Reference} annotation injection
13 */
14 // 根据@Reference注解的所有信息以及属性接口类型,生成referenceBeanName
15 String referenceBeanName = getReferenceBeanName(attributes, injectedType);
16
17 // 获取一个referenceBean对象,会先判断referenceBeanCache是否存在ReferenceBean,如果不存在则创建一个
18 ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
19
20 // 把referenceBean注册到Spring容器中,该方法内部逻辑:
21 // 如果Spring容器中存在referencedBeanName对应的ServiceBean,说明引用的是本地服务,则获取到该ServiceBean的ref属性,命名一个referenceBeanName别名
22 // 如果Spring容器中不存在referencedBeanName对应的ServiceBean,则说明是远程服务,则注册刚刚创建的referenceBean到Spring容器中,名称为:referenceBeanName
23 registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
24
25 cacheInjectedReferenceBean(referenceBean, injectedElement);
26
27 // 创建一个代理对象,属性注入的就是这个代理对象,内部会继续调用referenceBean.get()
28 return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
29}
接着看getOrCreateProxy方法:
1private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
2 if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
3 return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
4 wrapInvocationHandler(referenceBeanName, referenceBean));
5 } else { // ReferenceBean should be initialized and get immediately
6 return referenceBean.get();
7 }
8}
最终都会调用到referenceBean.get()方法获取实例,ReferenceBean是ReferenceConfig的子类,最终会调用到ReferenceConfig的init()方法进行初始化,此方法最终会调用org.apache.dubbo.config.ReferenceConfig#createProxy
方法来创建一个代理对象赋值给ReferenceBean的ref属性,该方法处理的逻辑分支比较多,我们这里假设只配置了一个注册中心的情况下,代码会走到这里来:
1if (urls.size() == 1) {
2 invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
3} else {
4 ...
urls.get(0)为注册中心的url,最终REF_PROTOCOL.refer会调用到以下几个方法:
-
1、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer:
1 @Override
2 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3 if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
4 return protocol.refer(type, url);
5 }
6 return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
7 }
这里主要用于添加dubbo protocol对应的invoker的过滤器链,自己实现的Filter扩展类就是在这里加载的。
-
2、Protocol包装类:org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer:
1@Override
2public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3 if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // dubbo://
4 return protocol.refer(type, url);
5 }
6 return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
7 Collections.unmodifiableList(
8 ExtensionLoader.getExtensionLoader(InvokerListener.class)
9 .getActivateExtension(url, INVOKER_LISTENER_KEY)));
10}
这里如果是dubbo://开头的url,则会包装ListenerInvokerWrapper,给dubbo protocol对应的invoker添加监听器,用于处理结果,这里可以进行扩展。
-
3、Dubbo SPI调用到的Protocol类:org.apache.dubbo.registry.integration.RegistryProtocol#refer:
1@Override
2@SuppressWarnings("unchecked")
3public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
4 // url由 registry:// 改为 zookeeper://
5 url = URLBuilder.from(url)
6 .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
7 .removeParameter(REGISTRY_KEY)
8 .build();
9
10 // 这里基于Dubbo SPI的Adaptive机制,拿到注册中心实现,ZookeeperRegistry
11 // url.protocol="zookeeper"
12 Registry registry = registryFactory.getRegistry(url);
13
14 if (RegistryService.class.equals(type)) {
15 return proxyFactory.getInvoker((T) registry, type, url);
16 }
17
18 // qs,queryString, 消费者引入服务时所配置的参数
19 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
20
21 String group = qs.get(GROUP_KEY);
22 if (group != null && group.length() > 0) {
23 if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
24 return doRefer(getMergeableCluster(), registry, type, url);
25 }
26 }
27
28 // 继续调用doRefer
29 return doRefer(cluster, registry, type, url);
30}
2.2.1 创建动态服务目录
继续调用RegistryProtocol#doRefer
:
1private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
2 // 创建RegistryDirectory动态服务目录,每个服务都对应一个RegistryDirectory
3 // url为注册中心地址
4 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
5 directory.setRegistry(registry);
6 directory.setProtocol(protocol);
7
8 // 被引入的服务配置的参数
9 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
10
11 // 消费者url
12 // consumer://xxxx/com.itzhai.dubbo.demo.DubboTestService?application=duboo-consumer-app&...
13 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
14 if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
15 directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
16 // 注册消费者url到注册中心(url经过简化处理)
17 registry.register(directory.getRegisteredConsumerUrl());
18 }
19
20 // 构造路由链
21 directory.buildRouterChain(subscribeUrl);
22
23 // 订阅注册中心配置变更,这里会主动拉取配置,初始化好所有的提供者的DubboInvoker
24 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
25 PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
26
27 // 利用传进来的cluster,join得到invoker:MockClusterInvoker --> FailoverClusterInvoker
28 Invoker invoker = cluster.join(directory);
29 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
30 return invoker;
31}
其中
1Invoker invoker = cluster.join(directory);
用到了Dubbo SPI,会依次执行MockClusterWrapper, FailoverCluster,最终生成MockClusterInvoker和FailoverClusterInvoker,代码跟踪到这里,我们可以得出以下对象结构:
2.2.2 构造路由链
1private RouterChain(URL url) {
2 // 获取RouterFactory接口扩展实现类:
3 // 0 = {MockRouterFactory@3505}
4 // 1 = {TagRouterFactory@3506} 标签路由
5 // 2 = {AppRouterFactory@3507} 应用条件路由
6 // 3 = {ServiceRouterFactory@3508} 服务条件路由
7 List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
8 .getActivateExtension(url, (String[]) null);
9
10 // 基于url,使用RouterFactory生成各个类型的Router
11 List<Router> routers = extensionFactories.stream()
12 .map(factory -> factory.getRouter(url))
13 .collect(Collectors.toList());
14
15 // 通过priority对routers进行排序
16 initWithRouters(routers);
17}
其中:
-
AppRouterFactory:
1@Activate(order = 200)
2public class AppRouterFactory implements RouterFactory {
3 public static final String NAME = "app";
4
5 private volatile Router router;
6
7 @Override
8 public Router getRouter(URL url) {
9 if (router != null) {
10 return router;
11 }
12 synchronized (this) {
13 if (router == null) {
14 router = createRouter(url);
15 }
16 }
17 return router;
18 }
19
20 private Router createRouter(URL url) {
21 return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url);
22 }
23}
最终AppRouter的构造函数中,会绑定一个监听器,监听路径:/dubbo/config/dubbo/dubbo-consumer-app.condition-router
同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成AppRouter,具体代码大家可以自行详细阅读。
-
ServiceRouterFactory:
与AppRouterFactory类似,最终ServiceRouter构造函数会绑定一个监听器,监听路径:/dubbo/config/dubbo/com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai.condition-router
同时会主动从配置中心获取当前服务对应的路径的路由配置,根据配置生成ServiceRouter,具体代码大家可以自行详细阅读。
-
TagRouterFactory.getRouter:
TagRouter与上面两个Router有所不同,因为TagRouter监听的路径为:/dubbo/config/dubbo/dubbo-provider-app.tag-router
,是需要读取提供者配置的tag-router的,所以只有在获取到提供者的应用名之后,才可以读取配置,在TagRouterFactory.getRouter()方法中,并不会监听和拉取配置,只是先初始化好一个空的TagRouter,等到服务提供者的服务url从配置中心拉取到之后,获取到提供者的名称之后,才会拉取tag-router配置,并且构造TagRouter。TODO 触发点
最终构造如下路由链:
到这里构建的对象结构如下:
2.2.3 监听节点
上面最核心的方法就是directory.subscribe(),下面详细看看这个方法(RegistryDirectory#subscribe
):
1public void subscribe(URL url) {
2 setConsumerUrl(url);
3 // 监听应用动态配置 /dubbo/config/dubbo/dubbo-consumer-app.configurators
4 CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
5 // 监听所引入的服务的动态配置 /dubbo/config/dubbo/com.itzhai.DemoTest.configurators
6 serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
7 // 监听其他的节点
8 registry.subscribe(url, this);
9}
这里的registry为:ZookeeperRegistry extends FailbackRegistry,而RegistryDirectory也是一个NotifyListener:
1public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener
2
继续跟踪 registry.subscribe(url, this),最终会调用:org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe,此方法我们先关注订阅单个服务的逻辑:
1} else {
2 // 订阅单个服务
3 List<URL> urls = new ArrayList<>();
4 // toCategoriesPath(url) 获取到要监听的zk路径:
5 // 0 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers"
6 // 1 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators"
7 // 2 = "/dubbo/com.itzhai.dubbo.demo.DubboTestService/routers"
8 for (String path : toCategoriesPath(url)) {
9 // 根据监听地址获取listeners,没有则先生成一个
10 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
11 if (listeners == null) {
12 zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
13 listeners = zkListeners.get(url);
14 }
15
16 // listener为传入的参数RegistryDirectory
17 // 通过NotifyListener获取ChildListener,为ZookeeperRegistry
18 ChildListener zkListener = listeners.get(listener);
19 if (zkListener == null) {
20 // 初始化监听器
21 listeners.putIfAbsent(listener, (parentPath, currentChilds)
22 -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))
23 );
24 zkListener = listeners.get(listener);
25 }
26 // 为当前处理的path创建zk节点
27 zkClient.create(path, false);
28
29 // 添加节点变更监听,同时获取到节点下的内容
30 List<String> children = zkClient.addChildListener(path, zkListener);
31 if (children != null) {
32 urls.addAll(toUrlsWithEmpty(url, path, children));
33 }
34 }
35 // 调用监听器方法处理获取到的节点内容,初始化DubboInvoker
36 notify(url, listener, urls);
37}
以上代码主要会监听以下三个路径:
-
/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers
-
/dubbo/com.itzhai.dubbo.demo.DubboTestService/configurators
-
/dubbo/com.itzhai.dubbo.demo.DubboTestService/routers
完整的监听节点和处理逻辑如下图:
2.2.4 初始化DubboInvoker
在监听的同时,会拉取监听节点的数据。
然后调用notify(url, listener, urls)处理拉取到的数据。这里我们重点观察/dubbo/com.itzhai.dubbo.demo.DubboTestService/providers
的处理流程,调用以下代码监听该节点:
1List<String> children = zkClient.addChildListener(path, zkListener);
同时返回的children(经过URLDecode解码处理),为引入的服务的所有提供者的url:
-
0 = "
dubbo://192.168.0.106:20810/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider×tamp=1668588840665&version=1.0.1" -
1 = "
dubbo://192.168.0.106:20800/com.itzhai.dubbo.demo.DubboTestService
?anyhost=true&application=dubbo-provider-app&bean.name=ServiceBean:com.itzhai.dubbo.demo.DubboTestService:1.0.1:itzhai&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=itzhai&interface=com.itzhai.dubbo.demo.DubboTestService&logger=log4j&methods=playGame&pid=40846&release=2.7.0&revision=1.0.1&side=provider×tamp=1668588840598&version=1.0.1"
接着会调用org.apache.dubbo.registry.support.FailbackRegistry#notify
方法,跟踪notify方法,最终会调用到这里:AbstractRegistry#notify(URL, NotifyListener, List<URL>
),关键的处理逻辑如下:
1// 先把urls转成result这个Map
2Map<String, List<URL>> result = new HashMap<>();
3for (URL u : urls) {
4 if (UrlUtils.isMatch(url, u)) {
5 String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
6 List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
7 categoryList.add(u);
8 }
9}
10if (result.size() == 0) {
11 return;
12}
13Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
14// 遍历处理map
15for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
16 String category = entry.getKey();
17 List<URL> categoryList = entry.getValue();
18 categoryNotified.put(category, categoryList);
19 listener.notify(categoryList);
20 // We will update our cache file after each notification.
21 // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
22 saveProperties(url);
23}
这里会先把传进来的urls转换为Map:
接着遍历map进行处理,遍历的过程中,调用listener.notify(categoryList)进行处理,这里的listener为RegistryDirectory,categoryList即为两个服务提供者的URL,最终会调用到org.apache.dubbo.registry.integration.RegistryDirectory#notify
:
1public synchronized void notify(List<URL> urls) {
2 Map<String, List<URL>> categoryUrls = urls.stream()
3 .filter(Objects::nonNull)
4 .filter(this::isValidCategory)
5 .filter(this::isNotCompatibleFor26x)
6 .collect(Collectors.groupingBy(url -> {
7 if (UrlUtils.isConfigurator(url)) {
8 return CONFIGURATORS_CATEGORY;
9 } else if (UrlUtils.isRoute(url)) {
10 return ROUTERS_CATEGORY;
11 } else if (UrlUtils.isProvider(url)) {
12 return PROVIDERS_CATEGORY;
13 }
14 return "";
15 }));
16
17 // 获取动态配置的URL,然后生成configurators,赋值给configurators
18 List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
19 this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
20
21 // 获取老版本路由的URL,然后生成Router,并调用addRouters添加到路由链中
22 List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
23 toRouters(routerURLs).ifPresent(this::addRouters);
24
25 // 获取服务提供者的URL,进一步处理,生成DubboInvoker
26 List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
27 refreshOverrideAndInvoker(providerURLs);
28}
这里主要是根据传入的url,获取到需要的信息进行加工,最后存储到RegistryDirectory中:
-
获取动态配置URL,并且生成configurators;
-
获取老版本的路由URL,生成Router,添加到路由链中;
-
获取服务提供者的URL,进一步处理,生成DubboInvoker。
以上这些结果最终都存储到RegistryDirectory中。
我们这里重点跟踪生成DubboInvoker的逻辑,当传入的urls为服务提供者的URL时,会执行以下两行代码:
1// 获取服务提供者的URL,进一步处理,生成DubboInvoker
2List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
3refreshOverrideAndInvoker(providerURLs);
生成DubboInvoker的核心方法是refreshOverrideAndInvoker:
1private void refreshOverrideAndInvoker(List<URL> urls) {
2 // mock zookeeper://xxx?mock=return null
3 // 这里会拿到应用动态配置,重写dubbo服务URL,得到最终的URL
4 overrideDirectoryUrl();
5 // 生成DubboInvoker的方法
6 refreshInvoker(urls);
7}
最关键的就是这个refreshInvoker方法,跟踪这个方法,其中最核心的一行代码:
1Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
继续跟踪该方法,最终会调用到这行代码生成invoker:
1invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
这里基于Dubbo SPI,最终调用到的是DubboProtocol.refer方法,同时,由于在dubbo-rpci-api模块中,配置了Dubbo SPI的Wrapper类:
1filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
2listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
3mock=org.apache.dubbo.rpc.support.MockProtocol
所以,这里的执行顺序是:
-
ProtocolListenerWrapper.refer:该Wrapper类中判断到URL如果是dubbo://开通的,会给DubboProtocol对应的invoker添加监听器,包装成
ListenerInvokerWrapper
类,用于处理refer结果,这里可以进行扩展,标签路由器就是在这里设置了监听器,处理完refer之后,进行初始化的; -
ProtocolFilterWrapper.refer:这个Wrapper会给dubbo protocol的url对应的invoker添加Filter过滤器,
我们给dubbo添加的自定义过滤器就是在这里集成进来的
; -
DubboProtocol.refer:生成DubboInvoker的类。
其中DubboProtocol.refer会调用到父类AbstractProtocol的refer方法
:
1@Override
2public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
3 // 包装一个异步转同步的Invoker
4 // type是接口
5 // url是服务地址
6 return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
7}
所以,最终生成DubboInvoker的就是DubboProtocol#protocolBindingRefer
这个方法了:
1@Override
2public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
3
4 optimizeSerialization(url);
5
6 // create rpc invoker.
7 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
8 invokers.add(invoker);
9
10 return invoker;
11}
可以发现在DubboInvoker中维护了和服务提供者的socket连接。
2.3. 构建传输层链路与建立连接
在Dubbo中,数据处理分为了数据交互层和数据传输层:
接下来看看这个连接是怎么建立的,跟踪getClients方法,发现最终会调用到DubboProtocol#initClient
方法:
1private ExchangeClient initClient(URL url) {
2
3 // client type setting.
4 String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
5
6 url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
7 // enable heartbeat by default
8 url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
9
10 // BIO is not allowed since it has severe performance issue.
11 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12 throw new RpcException("Unsupported client type: " + str + "," +
13 " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
14 }
15
16 ExchangeClient client;
17 try {
18 // connection should be lazy
19 if (url.getParameter(LAZY_CONNECT_KEY, false)) {
20 client = new LazyConnectExchangeClient(url, requestHandler);
21
22 } else {
23 client = Exchangers.connect(url, requestHandler);
24 }
25
26 } catch (RemotingException e) {
27 throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
28 }
29
30 return client;
31}
最核心的一行代码为:
1client = Exchangers.connect(url, requestHandler);
继续跟踪该方法:
1public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handler == null) {
6 throw new IllegalArgumentException("handler == null");
7 }
8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
9 return getExchanger(url).connect(url, handler);
10}
最终会调用getExchanger得到一个Exchanger实现去创建连接,这里基于SPI扩展机制,会获取到HeaderExchanger:
1public static Exchanger getExchanger(URL url) {
2 String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
3 return getExchanger(type);
4}
5
6public static Exchanger getExchanger(String type) {
7 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
8}
接着会调用HeaderExchanger的connect方法去创建连接,继续跟踪此方法:
1@Override
2public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
3 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
4}
这里会把传进来的handler(ExchangeHandlerAdapter)包装两层:HeaderExchangeHandler,DecodeHandler,然后使用Transporters.connect来创建Client连接对象:
1public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 ChannelHandler handler;
6 if (handlers == null || handlers.length == 0) {
7 handler = new ChannelHandlerAdapter();
8 } else if (handlers.length == 1) {
9 handler = handlers[0];
10 } else {
11 handler = new ChannelHandlerDispatcher(handlers);
12 }
13 return getTransporter().connect(url, handler);
14}
15
16public static Transporter getTransporter() {
17 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
18}
最后再包装成HeaderExchangeClient,HeaderExchangeClient中会把Client对象包装成HeaderExchangeChannel对象,同时开启重连任务和心跳检测任务:
1public HeaderExchangeClient(Client client, boolean startTimer) {
2 Assert.notNull(client, "Client can't be null");
3 this.client = client;
4 this.channel = new HeaderExchangeChannel(client);
5
6 if (startTimer) {
7 URL url = client.getUrl();
8 startReconnectTask(url);
9 startHeartBeatTask(url);
10 }
11}
在Transporters.connect方法中最终基于Dubbo SPI机制,获取到传输层的实现类去connect,默认的传输层实现类为:NettyTransporter:
1@Override
2public Client connect(URL url, ChannelHandler listener) throws RemotingException {
3 return new NettyClient(url, listener);
4}
继续调用new NettyClient:
1public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
2 // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
3 // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
4 super(url, wrapChannelHandler(url, handler));
5}
这里会调用wrapChannelHandler继续把handler包装多几层:
1protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
2 // 先通过调用ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
3 // 把handler包装成AllChannelHandler
4 // 然后继续把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
5 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
6 .getAdaptiveExtension().dispatch(handler, url)));
7}
最终包装成如下几层:
MultiMessageHandler-->HeartbeatHandler-->AllChannelHandler
最终会调用到NettyClient的doOpen()方法创建连接:
1@Override
2protected void doOpen() throws Throwable {
3 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
4 bootstrap = new Bootstrap();
5 bootstrap.group(nioEventLoopGroup)
6 .option(ChannelOption.SO_KEEPALIVE, true)
7 .option(ChannelOption.TCP_NODELAY, true)
8 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
9 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
10 .channel(NioSocketChannel.class);
11
12 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
13 bootstrap.handler(new ChannelInitializer() {
14
15 @Override
16 protected void initChannel(Channel ch) throws Exception {
17 int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
18 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
19 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
20 .addLast("decoder", adapter.getDecoder())
21 .addLast("encoder", adapter.getEncoder())
22 .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
23 .addLast("handler", nettyClientHandler);
24 String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
25 if(socksProxyHost != null) {
26 int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
27 Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
28 ch.pipeline().addFirst(socks5ProxyHandler);
29 }
30 }
31 });
32}
这里就是我们熟悉的Netty创建客户端连接的代码了。至此,服务引入的主线流程执行完毕。
传输层构建的链路如下:
可以发现,基于Dubbo SPI机制,数据传输层可以灵活切换到Netty、Mina等不同的网络框架。
最终主线流程构造的完整处理链路如下:
在服务调用章节,会为你详细介绍每个类的作用。在这之前,你需要了解服务引用都创建了什么样的东西。
2.4. 服务引入总结
服务引入流程主要做如下事情:
-
扫描项目识别@Reference注入点;
-
获取需要注入的对象:
-
获取到@Reference注解的信息,封装成ReferenceBean;
-
最终执行ReferenceBean的refer方法执行服务引入:
-
为引入的服务创建动态服务目录;
-
构造路由链,存储到动态服务目录中;
-
监听动态配置节点,以及服务提供者节点,并且先从注册中心拉取节点配置;
-
获取到服务提供者的URL之后,最终包装成DubboInvoker;
-
为DubboInvoker包装异步转同步类,以及过滤器链处理类;
-
最后为该DubboInvoker构造传输层链路,以及创建底层网络连接;
-
通过反射进行注入。
可以发现,Dubbo的整个服务引入流程还是挺复杂的,很多的SPI扩展机制,增加了扩展性,但是跟踪代码的时候也会更难跟踪。
整个服务引入的流程,就是构造以上的请求链路,以及创建好socket连接。
3. Dubbo服务调用
在阅读Dubbo源码之前,一定要先整明白Dubbo的SPI机制是什么,能够了解是怎么实现的就最好了。为此,Java架构杂谈输出了一篇Dubbo SPI机制的原理和源码解读文章,感兴趣的朋友可以先阅读了解。在
Java架构杂谈
公众号发送SPI
获取文章链接。
3.1. 消费端调用请求
基于对服务引入源码的分析,我们得出了如下的对象处理链:
我们可以得到消费端调用请求流程,如下图所示:
红色箭头方向为主流程,接下来详细介绍每个步骤执行的逻辑。
3.1.1 MockClusterInvoker
这层主要用于处理mock,以及系统降级相关逻辑。
相关功能参考官网:本地伪装[2]。
3.1.2 RegistryAwareClusterInvoker
如果有两个注册中心,则有这个Invoker,处理注册中心对应的Invoker,校验注册中心是否可用。
3.1.3 FailoverClusterInvoker
服务容错相关逻辑。
3.1.3.1 RouterChain
服务路由,获取到所有的服务提供者,并且通过路由链进行路由。
3.1.3.2 LoadBalance
使用负载均衡策略选择一个Invoker,最终得到的是一个InvokerWrapper。接着调用其invoke方法。
3.1.4 ListenerInvokerWrapper
接着会调用到ListenerInvokerWrapper的invoke方法。
3.1.5 CallbackRegistrationInvoker
接着执行Filter相关逻辑。
3.1.5.1 ConsumerContextFilter
设置Rpc的Context上下文属性。
3.1.5.2 FutureFilter
实现事件通知相关功能,可以借此完成一些通知功能,在调用方法之前,调用方法之后,出现异常时,会触发对应的事件。
3.1.5.3 MonitorFilter
实现Dubbo监控相关逻辑。
3.1.6 AsyncToSyncInvoker
异步转同步处理逻辑。
3.1.6.1 同步转异步逻辑
代码逻辑如下:
1@Override
2public Result invoke(Invocation invocation) throws RpcException {
3 Result asyncResult = invoker.invoke(invocation);
4 try {
5 // 如果invocation指定为同步调用,则执行异步转同步逻辑
6 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
7 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
8 }
9 ...
10 return asyncResult;
11}
此方法最终会返回AsyncRpcResult
异步结果,异步转同步逻辑主要就是在这里调用以下方法一直等待处理结果:
1asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
继续跟进invoker.invoke(invocation)
看看内部是如何进行异步调用的,定位到DubboInvoker#doInvoke
方法:
1@Override
2protected Result doInvoke(final Invocation invocation) throws Throwable {
3 ...
4 try {
5 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
6 int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
7 if (isOneway) {
8 ...
9 } else {
10 // 创建一个AsyncRpcResult,承载执行结果
11 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
12 // 异步调用 client的request方法,返回CompletableFuture对象
13 CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
14 // AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法
15 asyncRpcResult.subscribeTo(responseFuture);
16 // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
17 FutureContext.getContext().setCompatibleFuture(responseFuture);
18 return asyncRpcResult;
19 }
20 ...
21}
这里创建了一个AsyncRpcResult
对象,然后调用currentClient.request(inv, timeout)
方法返回一个CompletableFuture对象,最后调用asyncRpcResult.subscribeTo(responseFuture)
让AsyncRpcResult对象订阅responseFuture,当responseFuture完成之后,会调用AsyncRpcResult中的方法。
最后跟进看看currentClient.request(inv, timeout)
的逻辑,最终会调用到HeaderExchangeChannel
的request方法:
1@Override
2public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
3 if (closed) {
4 throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
5 }
6 // create request.
7 Request req = new Request();
8 req.setVersion(Version.getProtocolVersion());
9 req.setTwoWay(true);
10 req.setData(request);
11 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
12 try {
13 channel.send(req);
14 } catch (RemotingException e) {
15 future.cancel();
16 throw e;
17 }
18 return future;
19}
可以发现,这里最终创建了一个DefaultFuture
对象,并且返回,在创建DefaultFuture
对象的逻辑中,会把请求id跟DefaultFuture的关系,以及请求id跟channel的关系保存到map中:
1private DefaultFuture(Channel channel, Request request, int timeout) {
2 this.channel = channel;
3 this.request = request;
4 this.id = request.getId();
5 this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
6 // put into waiting map.
7 FUTURES.put(id, this);
8 CHANNELS.put(id, channel);
9}
并且启动一个定时任务,用于判断调用是否超时:
1private static void timeoutCheck(DefaultFuture future) {
2 TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
3 future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
4}
当判断到超时之后,直接返回超时的响应结果。
3.1.6.2 异步响应结果处理
最后我们看看底层获取到响应结果之后,是如何依次通知到 DefaultFuture --> CompletableFuture --> AsyncRpcResult的。
HeaderExchangeHandler#received
方法调用HeaderExchangeHandler#handleResponse
,然后继续调用DefaultFuture#received(Channel, Response)
,最后调用到会调DefaultFuture#received(Channel, Response, boolean)
方法,接收数据,获取到异步结果:
1public static void received(Channel channel, Response response, boolean timeout) {
2 try {
3 // 获取到responseId,根据responseId从map中获取到DefaultFuture对象
4 DefaultFuture future = FUTURES.remove(response.getId());
5 if (future != null) {
6 Timeout t = future.timeoutCheckTask;
7 if (!timeout) {
8 // decrease Time
9 t.cancel();
10 }
11 // 最终完成DefaultFuture对象
12 future.doReceived(response);
13 } else {
14 logger.warn("The timeout response finally returned at "
15 + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
16 + ", response " + response
17 + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
18 + " -> " + channel.getRemoteAddress()));
19 }
20 } finally {
21 CHANNELS.remove(response.getId());
22 }
23}
最终根据响应id从map中找到DefaultFuture,并且把响应结果赋值给它,最终触发执行AsyncRpcResult的相关方法,最后触发了以下代码从阻塞处返回:
1asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
即完成了把异步转换为同步的处理工作。
3.1.7 DubboInvoker
会调用其org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke方法,完成具体的方法调用。
此方法会从可用的clients中选择一个,然后执行异步调用,相关代码上面已经说明过了。
3.1.8 HeaderExchangeClient
调用ExchangeClient数据交换层的方法发送数据,该方法最终会调用到数据传输层的具体方法区发送数据,如org.apache.dubbo.remoting.transport.netty4.NettyChannel#send。
3.2. 提供端处理请求
基于服务导出的源码分析,我们得到了如下的对象处理链:
以及传输层链路:
为此,我们可以得到如下的提供端的请求处理流程:
接下来,我们就看看每一步都是做什么事情的。
3.2.1 NettyServerHandler#channelRead
接收数据,传给下层handler继续处理。
3.2.2 MultiMessageHandler#received
如果接收到的消息是MultiMessage,则进行转换遍历处理。
3.2.3 HeartbeatHandler#received
记录读取数据时间戳;
判断是否心跳请求,如果是心跳请求,则直接响应心跳请求,会把requestId响应回去,用户区分是哪个请求
。
如果是心跳响应,则不用处理,直接返回。
否则继续往下执行。
3.2.4 AllChannelHandler#received
跟Dubbo的线程模型
[3]有关。
如果Dispatcher使用的是all策略(默认策略),则会执行到这里,最终会把请求封装成ChannelEventRunnalbe交给线程池处理,IO线程的任务到此结束。
如果Dubbo Dispatcher使用的是direct策略,则不会执行这个handler。
3.2.5 DecodeHandler#received
如果传入的消息实现了Decodeable接口,则调用接口的decode实现进行decode,提供了一种自定义decode的扩展机制。
3.2.6 HeaderExchangeHandler#handleRequest
判断是否双向通信。
构造Response对象,最终调用ExchangeHandlerAdapter的reply方法,返回一个CompletionStage:
1CompletionStage<Object> future = handler.reply(channel, msg);
拿到CompletionStage对象后:
-
如果是同步调用,则直接拿到结果,并发送到channel中去
-
如果是异步调用,则监听直到拿到服务执行结果,然后发送到channel中去
最终把结果封装为AppResponse对象,包括异常也封装成该对象(HeaderExchangeHandler#handleRequest):
1...
2Object msg = req.getData();
3try {
4 CompletionStage<Object> future = handler.reply(channel, msg);
5
6 future.whenComplete((appResult, t) -> {
7 try {
8 if (t == null) {
9 res.setStatus(Response.OK);
10 res.setResult(appResult);
11 } else {
12 res.setStatus(Response.SERVICE_ERROR);
13 res.setErrorMessage(StringUtils.toString(t));
14 }
15 channel.send(res);
16 } catch (RemotingException e) {
17 logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
18 } finally {
19 // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
20 }
21 });
22} catch (Throwable e) {
23 res.setStatus(Response.SERVICE_ERROR);
24 res.setErrorMessage(StringUtils.toString(e));
25 channel.send(res);
26}
可以发现,最终获取到异步调用的结果之后,再写到Channel中。
底层会封装结果为CompletableFuture:
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
1Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
2
3
4CompletableFuture<Object> future = wrapWithFuture(value, invocation);
5
6
7AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
8
9
10future.whenComplete((obj, t) -> {
11
12 AppResponse result = new AppResponse();
13 if (t != null) {
14 if (t instanceof CompletionException) {
15 result.setException(t.getCause());
16 } else {
17 result.setException(t);
18 }
19 } else {
20 result.setValue(obj);
21 }
22 asyncRpcResult.complete(result);
23});
24
25return asyncRpcResult;
3.2.7 ExchangeHandlerAdapter#reply
根据请求对象拿到一个Invoker,最终执行Invoker得到结果。
实际调用的方法可以分为两种情况:
-
同步执行:
-
1public String test() {...}
-
异步执行:
-
java<br />public CompletableFuture test() {…}<br />
最终该方法返回一个CompletionStage对象,给到上层处理。
底层执行方法的时候,不管是同步还是异步返回,都统一转换为CompletableFuture:
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture
1private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
2 if (RpcContext.getContext().isAsyncStarted()) {
3 return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
4 } else if (value instanceof CompletableFuture) {
5 return (CompletableFuture<Object>) value;
6 }
7 return CompletableFuture.completedFuture(value);
8}
可以发现,为了实现异步调用的功能,Dubbo做了一些比较绕的对象转换逻辑,如下图所示:
梳理清楚了这个处理流程,也就很容易理解为啥可以实现跨服务的异步调用了。
3.2.8 DubboExporter
3.2.8.1 CallbackRegistrationInvoker
执行过滤器链。
EchoFilter
处理Dubbo的回声检测。回声检测相关说明参考官方文档:回声测试[1]
如果判断到是$echo方法,则直接构造一个结果返回:
1@Override
2public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
3 if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
4 return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
5 }
6 return invoker.invoke(inv);
7}
ClassLoaderFilter
用于设置类加载器。
GenericFilter
用于处理泛化服务。
ContextFilter
设置Dubbo ThreadLocal(RpcContext)相关的属性。
TraceFilter
处理dubbo trace命令相关的Filter。
TimeoutFilter
用于处理服务端的超时。
MonitorFilter
实现监控相关功能。
ExceptionFIlter
处理服务异常信息,转换成消费端能够识别的异常。
3.2.8.2 DelegateProviderMetaDataInvoker
继续委托给下层进行处理。
3.2.8.3 AbstractProxyInvoker
通过AbstractProxyInvoker执行最终的服务的方法,不管底层是同步调用还是异步调用,统一包装成异步RPC结果AsyncRpcResult返回。
至此,Dubbo源码解析完成,完整服务导出和引入的源码主线流程图如下:
服务调用处理流程图如下:
获取高清大图,在Java架构杂谈
公众号回复“Dubbo”即可。
References
[1]: 回声测试. Retrieved from https://dubbo.apache.org/zh/docs/advanced/echo-service/
[2]: 本地伪装. Retrieved from https://dubbo.apache.org/zh/docs/advanced/local-mock/
[3]: 线程模型. Retrieved from https://dubbo.apache.org/zh/docs/v2.7/user/examples/thread-model/