性能文章>源码分析Dubbo服务提供者启动流程>

源码分析Dubbo服务提供者启动流程原创

2年前
301944

本节将详细分析Dubbo服务提供者的启动流程,请带着如下几个疑问进行本节的阅读,因为这几个问题将是接下来几篇文章分析的重点内容。

  • 什么时候建立与注册中心的连接。
  • 服务提供者什么时候向注册中心注册服务。
  • 服务提供者与注册中心的心跳机制。

从上文中我们得知,服务提供者启动的核心入口为ServiceBean,本节将从源码级别详细剖析ServcieBean的实现原理,即Dubbo服务提供者的启动流程,ServiceBean的继承层次如图所示,dubbo:service标签的所有属性都被封装在此类图结构中。
image1.png

源码分析ServiceBean

ServiceBean#afterPropertiesSet

1if (getProvider() == null) {  // @1
2    Map<String, ProviderConfig> provide
3             ConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class,   false, false); // @2
4              // ......  具体解析代码省略。
5    }
6}

Step1:如果provider为空,说明dubbo:service标签未设置provider属性,如果一个dubbo:provider标签,则取该实例,如果存在多个dubbo:provider配置则provider属性不能为空,否则抛出异常:“Duplicate provider configs”。

ServiceBean#afterPropertiesSet

1 if (getApplication() == null
2         && (getProvider() == null || getProvider().getApplication() == null)) {
3       Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
4                  ApplicationConfig.class, false, false);
5        // ...省略
6 }

Step2:如果application为空,则尝试从BeanFactory中查询dubbo:application实例,如果存在多个dubbo:application配置,则抛出异常:“Duplicate application configs”。

Step3:如果ServiceBean的module为空,则尝试从BeanFactory中查询dubbo:module实例,如果存在多个dubbo:module,则抛出异常:"Duplicate module configs: "。

Step4:尝试从BeanFactory中加载所有的注册中心,注意ServiceBean的List< RegistryConfig> registries属性,为注册中心集合。

Step5:尝试从BeanFacotry中加载一个监控中心,填充ServiceBean的MonitorConfig monitor属性,如果存在多个dubbo:monitor配置,则抛出"Duplicate monitor configs: "。

Step6:尝试从BeanFactory中加载所有的协议,注意:ServiceBean的List< ProtocolConfig> protocols是一个集合,也即一个服务可以通过多种协议暴露给消费者。

ServiceBean#afterPropertiesSet

1if (getPath() == null || getPath().length() == 0) {
2       if (beanName != null && beanName.length() > 0 && getInterface() != null && getInterface().length() > 0  && beanName.startsWith(getInterface())) {
3                setPath(beanName);
4       }
5 }
Step7:设置ServiceBean的path属性,path属性存放的是dubbo:service的beanName(dubbo:service id)。

ServiceBean#afterPropertiesSet

1if (!isDelay()) {
2     export();
3}

Step8:如果为启用延迟暴露机制,则调用export暴露服务。首先看一下isDelay的实现,然后重点分析export的实现原理(服务暴露的整个实现原理)。

ServiceBean#isDelay

1private boolean isDelay() {
2        Integer delay = getDelay();
3        ProviderConfig provider = getProvider();
4        if (delay == null && provider != null) {
5            delay = provider.getDelay();
6        }
7        return supportedApplicationListener && (delay == null || delay == -1);
8    }

如果有设置dubbo:service或dubbo:provider的属性delay,或配置delay为-1,都表示启用延迟机制,单位为毫秒,设置为-1,表示等到Spring容器初始化后再暴露服务。从这里也可以看出,Dubbo暴露服务的处理入口为ServiceBean#export->ServiceConfig#export。

源码分析ServiceConfig#export暴露服务

调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export

 1public synchronized void export() {
 2        if (provider != null) {
 3            if (export == null) {
 4                export = provider.getExport();
 5            }
 6            if (delay == null) {
 7                delay = provider.getDelay();
 8            }
 9        }
10        if (export != null && !export) {   // @1
11            return;
12        }
13
14        if (delay != null && delay > 0) {    // @2
15            delayExportExecutor.schedule(new Runnable() {
16                @Override
17                public void run() {
18                    doExport();
19                }
20            }, delay, TimeUnit.MILLISECONDS);
21        } else {
22            doExport();    //@3
23        }
24    }

代码@1:判断是否暴露服务,由dubbo:service export="true|false"来指定。
代码@2:如果启用了delay机制,如果delay大于0,表示延迟多少毫秒后暴露服务,使用ScheduledExecutorService延迟调度,最终调用doExport方法。
代码@3:执行具体的暴露逻辑doExport,需要大家留意:delay=-1的处理逻辑(基于Spring事件机制触发)。

源码分析ServiceConfig#doExport暴露服务

调用链:ServiceBean#afterPropertiesSet调用ServiceConfig#export->ServiceConfig#doExport

ServiceConfig#checkDefault

1private void checkDefault() {
2        if (provider == null) {
3            provider = new ProviderConfig();
4        }
5        appendProperties(provider);
6 }

Step1:如果dubbo:servce标签也就是ServiceBean的provider属性为空,调用appendProperties方法,填充默认属性,其具体加载顺序:

从系统属性加载对应参数值,参数键:dubbo.provider.属性名,可通过System.getProperty获取。

加载属性配置文件的值。属性配置文件,可通过系统属性:dubbo.properties.file,如果该值未配置,则默认取dubbo.properties属性配置文件。

ServiceConfig#doExport

 1if (ref instanceof GenericService) {
 2      interfaceClass = GenericService.class;
 3      if (StringUtils.isEmpty(generic)) {
 4           generic = Boolean.TRUE.toString();
 5      }
 6 } else {
 7      try {
 8            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
 9                        .getContextClassLoader());
10       } catch (ClassNotFoundException e) {
11            throw new IllegalStateException(e.getMessage(), e);
12       }
13       checkInterfaceAndMethods(interfaceClass, methods);
14       checkRef();
15       generic = Boolean.FALSE.toString();
16 }

Step2:校验ref与interface属性。如果ref是GenericService,则为dubbo的泛化实现,然后验证interface接口与ref引用的类型是否一致。

ServiceConfig#doExport

 1if (local != null) {
 2      if ("true".equals(local)) {
 3            local = interfaceName + "Local";
 4      }
 5      Class<?> localClass;
 6      try {
 7             localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
 8       } catch (ClassNotFoundException e) {
 9            throw new IllegalStateException(e.getMessage(), e);
10       }
11      if (!interfaceClass.isAssignableFrom(localClass)) {
12           throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
13       }
14 }

Step3:dubbo:service local机制,已经废弃,被stub属性所替换。

Step4:处理本地存根Stub(图片来源于Dubbo官方文档)。

image2.jpeg
ServiceConfig#doExport

1checkApplication();
2checkRegistry();
3checkProtocol();
4appendProperties(this);

Step5:校验ServiceBean的application、registry、protocol是否为空,并从系统属性(优先)、资源文件中填充其属性。
系统属性、资源文件属性的配置如下:

  • application dubbo.application.属性名,例如 dubbo.application.name
  • registry dubbo.registry.属性名,例如 dubbo.registry.address
  • protocol dubbo.protocol.属性名,例如 dubbo.protocol.port
  • service dubbo.service.属性名,例如 dubbo.service.stub

ServiceConfig#doExport

1checkStubAndMock(interfaceClass);

Step6:校验stub、mock类的合理性,是否是interface的实现类。

ServiceConfig#doExport

1doExportUrls();

Step7:执行doExportUrls()方法暴露服务,接下来会重点分析该方法。

ServiceConfig#doExport

1ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
2ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);

Step8:将服务提供者信息注册到ApplicationModel实例中。

源码分析ServiceConfig#doExportUrls暴露服务具体实现逻辑

调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export>ServiceConfig#doExport

1private void doExportUrls() {
2        List<URL> registryURLs = loadRegistries(true);  // @1
3        for (ProtocolConfig protocolConfig : protocols) {
4            doExportUrlsFor1Protocol(protocolConfig, registryURLs);    // @2
5        }
6 }

代码@1:首先遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),然后将地址封装成URL对象,关于注册中心的所有配置属性,最终转换成url的属性(?属性名=属性值),loadRegistries(true),参数的意思:true,代表服务提供者,false:代表服务消费者,如果是服务提供者,则检测注册中心的配置,如果配置了register=“false”,则忽略该地址,如果是服务消费者,并配置了subscribe="false"则表示不从该注册中心订阅服务,故也不返回,一个注册中心URL示例:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=7072&qos.port=22222&registry=zookeeper&timestamp=1527308268041

代码@2:然后遍历配置的所有协议,根据每个协议,向注册中心暴露服务,接下来重点分析doExportUrlsFor1Protocol方法的实现细节。

源码分析doExportUrlsFor1Protocol

调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export>ServiceConfig#doExport->ServiceConfig#doExportUrlsFor1Protocol

ServiceConfig#doExportUrlsFor1Protocol

 1String name = protocolConfig.getName();
 2if (name == null || name.length() == 0) {
 3     name = "dubbo";
 4}
 5Map<String, String> map = new HashMap<String, String>();
 6map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
 7map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
 8map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
 9if (ConfigUtils.getPid() > 0) {
10    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
11}
12appendParameters(map, application);
13appendParameters(map, module);
14appendParameters(map, provider, Constants.DEFAULT_KEY);
15appendParameters(map, protocolConfig);
16appendParameters(map, this);

Step1:用Map存储该协议的所有配置参数,包括协议名称、dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供Dubbo:service的属性。

ServiceConfig#doExportUrlsFor1Protocol

 1if (methods != null && !methods.isEmpty()) {
 2            for (MethodConfig method : methods) {
 3                appendParameters(map, method, method.getName());
 4                String retryKey = method.getName() + ".retry";
 5                if (map.containsKey(retryKey)) {
 6                    String retryValue = map.remove(retryKey);
 7                    if ("false".equals(retryValue)) {
 8                        map.put(method.getName() + ".retries", "0");
 9                    }
10                }
11                List<ArgumentConfig> arguments = method.getArguments();
12                if (arguments != null && !arguments.isEmpty()) {
13                    for (ArgumentConfig argument : arguments) {
14                        // convert argument type
15                        if (argument.getType() != null && argument.getType().length() > 0) {
16                            Method[] methods = interfaceClass.getMethods();
17                            // visit all methods
18                            if (methods != null && methods.length > 0) {
19                                for (int i = 0; i < methods.length; i++) {
20                                    String methodName = methods[i].getName();
21                                    // target the method, and get its signature
22                                    if (methodName.equals(method.getName())) {
23                                        Class<?>[] argtypes = methods[i].getParameterTypes();
24                                        // one callback in the method
25                                        if (argument.getIndex() != -1) {
26                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
27                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
28                                            } else {
29                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + 
30                                                      argument.getType());
31                                            }
32                                        } else {
33                                            // multiple callbacks in the method
34                                            for (int j = 0; j < argtypes.length; j++) {
35                                                Class<?> argclazz = argtypes[j];
36                                                if (argclazz.getName().equals(argument.getType())) {
37                                                    appendParameters(map, argument, method.getName() + "." + j);
38                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
39                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", 
40                                                             type:" + argument.getType());
41                                                    }
42                                                }
43                                            }
44                                        }
45                                    }
46                                }
47                            }
48                        } else if (argument.getIndex() != -1) {
49                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
50                        } else {
51                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
52                        }
53
54                    }
55                }
56            } // end of methods for
57        }

Step2:如果dubbo:service有dubbo:method子标签,则dubbo:method以及其子标签的配置属性,都存入到Map中,属性名称加上对应的方法名作为前缀。dubbo:method的子标签dubbo:argument,其键为方法名.参数序号。

ServiceConfig#doExportUrlsFor1Protocol

 1if (ProtocolUtils.isGeneric(generic)) {
 2      map.put(Constants.GENERIC_KEY, generic);
 3      map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
 4 } else {
 5      String revision = Version.getVersion(interfaceClass, version);
 6      if (revision != null && revision.length() > 0) {
 7          map.put("revision", revision);
 8      }
 9      String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
10      if (methods.length == 0) {
11           logger.warn("NO method found in service interface " + interfaceClass.getName());
12           map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
13      } else {
14           map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
15      }
16}

Step3:添加methods键值对,存放dubbo:service的所有方法名,多个方法名用,隔开,如果是泛化实现,填充genric=true,methods为"*";

ServiceConfig#doExportUrlsFor1Protocol

1if (!ConfigUtils.isEmpty(token)) {
2      if (ConfigUtils.isDefault(token)) {
3            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
4       } else {
5            map.put(Constants.TOKEN_KEY, token);
6       }
7}

Step4:根据是否开启令牌机制,如果开启,设置token键,值为静态值或uuid。

ServiceConfig#doExportUrlsFor1Protocol

1if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
2       protocolConfig.setRegister(false);
3       map.put("notify", "false");
4}

Step5:如果协议为本地协议(injvm),则设置protocolConfig#register属性为false,表示不向注册中心注册服务,在map中存储键为notify,值为false,表示当注册中心监听到服务提供者发送变化(服务提供者增加、服务提供者减少等事件时不通知。

ServiceConfig#doExportUrlsFor1Protocol

1// export service
2String contextPath = protocolConfig.getContextpath();
3if ((contextPath == null || contextPath.length() == 0) && provider != null) {
4     contextPath = provider.getContextpath();
5}
Step6:设置协议的contextPath,如果未配置,默认为/interfacename

ServiceConfig#doExportUrlsFor1Protocol

1String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
2Integer port = this.findConfigedPorts(protocolConfig, name, map);

Step7:解析服务提供者的IP地址与端口。
服务IP地址解析顺序:(序号越小越优先)

  • 系统环境变量,变量名:DUBBO_DUBBO_IP_TO_BIND
  • 系统属性,变量名:DUBBO_DUBBO_IP_TO_BIND
  • 系统环境变量,变量名:DUBBO_IP_TO_BIND
  • 系统属性,变量名:DUBBO_IP_TO_BIND
  • dubbo:protocol 标签的host属性 --》 dubbo:provider 标签的host属性
  • 默认网卡IP地址,通过InetAddress.getLocalHost().getHostAddress()获取,如果IP地址不符合要求,继续下一个匹配。
    判断IP地址是否符合要求的标准是:

1   public static boolean isInvalidLocalHost(String host) {
2        return host == null
3                || host.length() == 0
4                || host.equalsIgnoreCase("localhost")
5                || host.equals("0.0.0.0")
6                || (LOCAL_IP_PATTERN.matcher(host).matches());
7      }

选择第一个可用网卡,其实现方式是建立socket,连接注册中心,获取socket的IP地址。其代码:

 1Socket socket = new Socket();
 2        try {
 3              SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
 4              socket.connect(addr, 1000);
 5              hostToBind = socket.getLocalAddress().getHostAddress();
 6              break;
 7         } finally {
 8              try {
 9                      socket.close();
10              } catch (Throwable e) {
11              }
12        }

服务提供者端口解析顺序:(序号越小越优先)

  • 系统环境变量,变量名:DUBBO_DUBBO_PORT_TO_BIND
  • 系统属性,变量名:DUBBO_DUBBO_PORT_TO_BIND
  • 系统环境变量,变量名:DUBBO_PORT_TO_BIND
  • 系统属性,变量名DUBBO_PORT_TO_BIND
  • dubbo:protocol标签port属性、dubbo:provider标签的port属性。
  • 随机选择一个端口。
    ServiceConfig#doExportUrlsFor1Protocol
1URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

Step8:根据协议名称、协议host、协议端口、contextPath、相关配置属性(application、module、provider、protocolConfig、service及其子标签)构建服务提供者URI。
URL运行效果图:

image3.jpeg

以dubbo协议为例,展示最终服务提供者的URL信息如下:

dubbo://192.168.56.1:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.56.1&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5916&qos.port=22222&side=provider&timestamp=1527168070857

ServiceConfig#doExportUrlsFor1Protocol

1String scope = url.getParameter(Constants.SCOPE_KEY);
2// don't export when none is configured
3if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
4     // 接口暴露实现逻辑
5}

Step9:获取dubbo:service标签的scope属性,其可选值为none(不暴露)、local(本地)、remote(远程),如果配置为none,则不暴露。默认为local。

ServiceConfig#doExportUrlsFor1Protocol

 1// export to local if the config is not remote (export to remote only when config is remote)
 2if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {     // @1
 3       exportLocal(url);
 4}
 5// export to remote if the config is not local (export to local only when config is local)
 6if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {     // @2
 7        if (logger.isInfoEnabled()) {
 8              logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
 9         }
10        if (registryURLs != null && !registryURLs.isEmpty()) {   // @3
11              for (URL registryURL : registryURLs) {
12                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));  // @4
13                    URL monitorUrl = loadMonitor(registryURL);       // @5
14                    if (monitorUrl != null) {
15                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());  
16                    }
17                    if (logger.isInfoEnabled()) {
18                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
19                    }
20                   Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));  // @6
21                   DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);    
22                   Exporter<?> exporter = protocol.export(wrapperInvoker);    // @7
23                   exporters.add(exporter);
24               }
25         } else {
26               Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
27               DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
28               Exporter<?> exporter = protocol.export(wrapperInvoker);
29               exporters.add(exporter);
30         }
31}

**Step10:**根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一。
代码@1:如果scope不为remote,则先在本地暴露(injvm):,具体暴露服务的具体实现,将在remote 模式中详细分析。
代码@2:如果scope不为local,则将服务暴露在远程。
代码@3:remote方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。
代码@4:如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
代码@5:根据注册中心url(注册中心url),构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加monitor,其值为监控中心url(已编码)。

  • 如果dubbo spring xml配置文件中没有配置监控中心(dubbo:monitor),如果从系统属性-Ddubbo.monitor.address,-Ddubbo.monitor.protocol构建MonitorConfig对象,否则从dubbo的properties配置文件中寻找这个两个参数,如果没有配置,则返回null。
  • 如果有配置,则追加相关参数,dubbo:monitor标签只有两个属性:address、protocol,其次会追加interface(MonitorService)、协议等。
    代码@6:通过动态代理机制创建Invoker,dubbo的远程调用实现类。

Dubbo远程调用器如何构建,这里不详细深入,重点关注WrapperInvoker的url为:
image4.jpeg

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215&pid=6328&qos.port=22222&registry=zookeeper&timestamp=1527255510202,

这里有两个重点值得关注:

  1. path属性:com.alibaba.dubbo.registry.RegistryService,注册中心也类似于服务提供者。
  2. export属性:值为服务提供者的URL,为什么需要关注这个URL呢?请看代码@7,protocol属性为Protocol$Adaptive,Dubbo在加载组件实现类时采用SPI(插件机制,有关于插件机制,在该专题后续文章将重点分析),在这里我们只需要知道,根据URL冒号之前的协议名将会调用相应的方法。

其映射关系(列出与服务启动相关协议实现类):
image5.png

dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol       //文件位于dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
2registry=com.alibaba.dubbo.registry.integration.RegistryProtocol   //文件位于dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol

代码@7:根据代码@6的分析,将调用RegistryProtocol#export方法。

源码分析RegistryProtocol#export方法

调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export->ServiceConfig#doExport->ServiceConfig#doExportUrlsFor1Protocol->RegistryProtocol#export

RegistryProtocol#export

 1@Override
 2    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 3        //export invoker
 4        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);   // @1
 5
 6        URL registryUrl = getRegistryUrl(originInvoker);       // @2
 7
 8        //registry provider
 9        final Registry registry = getRegistry(originInvoker);                          // @3
10        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);     // @4start
11
12        //to judge to delay publish whether or not
13        boolean register = registedProviderUrl.getParameter("register", true);  
14
15        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
16
17        if (register) {  
18            register(registryUrl, registedProviderUrl);      // @4 end
19            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
20        }
21
22        // Subscribe the override data
23        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
24        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);          // @5 start
25        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
26        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
27        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);            // @5 end
28        //Ensure that a new exporter instance is returned every time export
29        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
30    }

代码@1:启动服务提供者服务,监听指定端口,准备服务消费者的请求,这里其实就是从WrapperInvoker中的url(注册中心url)中提取export属性,描述服务提供者的url,然后启动服务提供者。

image6.jpeg
从上图中,可以看出,将调用DubboProtocol#export完成dubbo服务的启动,利用netty构建一个微型服务端,监听端口,准备接受服务消费者的网络请求,本节旨在梳理其启动流程,具体实现细节,将在后续章节中详解,这里我们只要知道,< dubbo:protocol name=“dubbo” port=“20880” />,会再此次监听该端口,然后将dubbo:service的服务handler加入到命令处理器中,当有消息消费者连接该端口时,通过网络解包,将需要调用的服务和参数等信息解析处理后,转交给对应的服务实现类处理即可。

代码@2:获取真实注册中心的URL,例如zookeeper注册中心的

URL:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882&pid=10252&qos.port=22222&timestamp=1527263060867

代码@3:根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:ZookeeperRegistry
代码@4:获取服务提供者URL中的register属性,如果为true,则调用注册中心的ZookeeperRegistry#register方法向注册中心注册服务(实际由其父类FailbackRegistry实现)。
代码@5:服务提供者向注册中心订阅自己,主要是为了服务提供者URL发送变化后重新暴露服务,当然,会将dubbo:reference的check属性设置为false。

到这里就对文章开头提到的问题1,问题2做了一个解答,其与注册中心的心跳机制等将在后续章节中详细分析。

文字看起来可能不是很直观,现整理一下Dubbo服务提供者启动流程图如下:

image7.jpeg

RegistryProtocol#export中调用doLocalExport方法,根据服务暴露协议建立网络通讯服务器,在特定端口建立监听,监听来自消息消费端服务的请求。

RegistryProtocol#doLocalExport:


 1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
 2        String key = getCacheKey(originInvoker);
 3        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
 4        if (exporter == null) {
 5            synchronized (bounds) {
 6                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
 7                if (exporter == null) {
 8                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));   // @1
 9                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);    // @2
10                    bounds.put(key, exporter);
11                }
12            }
13        }
14        return exporter;
15    }

代码@1:如果服务提供者以dubbo协议暴露服务,getProviderUrl(originInvoker)返回的URL将以dubbo://开头。
代码@2:根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。

源码分析DubboProtocol#export

 1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
 2        URL url = invoker.getUrl();     // @1
 3        // export service.
 4        String key = serviceKey(url);      // @2
 5        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
 6        exporterMap.put(key, exporter);
 7
 8        //export an stub service for dispatching event
 9        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);    //@3  start
10        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);                                                  
11        if (isStubSupportEvent && !isCallbackservice) {                                                                                                                        
12            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
13            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
14                if (logger.isWarnEnabled()) {
15                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
16                            "], has set stubproxy support event ,but no stub methods founded."));
17                }
18            } else {
19                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);                                                                      
20            }
21        }   // @3 end
22
23        openServer(url);   // @4
24        optimizeSerialization(url);  // @5
25        return exporter;                
26    }

代码@1:获取服务提供者URL,以协议名称,这里是dubbo://开头。
代码@2:从服务提供者URL中获取服务名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。
代码@3:是否将转发事件导出成stub。
代码@4:根据url打开服务,下面将详细分析其实现。
代码@5:根据url优化器序列化方式。

源码分析DubboProtocol#openServer

 1private void openServer(URL url) {
 2        // find server.
 3        String key = url.getAddress();    // @1
 4        //client can export a service which's only for server to invoke
 5        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
 6        if (isServer) {
 7            ExchangeServer server = serverMap.get(key);           // @2
 8            if (server == null) {
 9                serverMap.put(key, createServer(url));                    //@3
10            } else {
11                // server supports reset, use together with override
12                server.reset(url);                                                       //@4
13            }
14        }
15    }

代码@1:根据url获取网络地址:ip:port,例如:192.168.56.1:20880,服务提供者IP与暴露服务端口号。
代码@2:根据key从服务器缓存中获取,如果存在,则执行代码@4,如果不存在,则执行代码@3。
代码@3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为ExchangeServer。
代码@4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。

源码分析DubboProtocol#createServer

 1private ExchangeServer createServer(URL url) {
 2        // send readonly event when server closes, it's enabled by default
 3        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());    // @1
 4        // enable heartbeat by default
 5        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));     // @2
 6        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  // @3
 7
 8        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))    // @4
 9            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10
11        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);       // @5
12        ExchangeServer server;
13        try {
14            server = Exchangers.bind(url, requestHandler);    // @6
15        } catch (RemotingException e) {
16            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17        }
18        str = url.getParameter(Constants.CLIENT_KEY);     //@7
19        if (str != null && str.length() > 0) {
20            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21            if (!supportedTypes.contains(str)) {
22                throw new RpcException("Unsupported client type: " + str);
23            }
24        }
25        return server;
26    }

代码@1:为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。
代码@2:为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。
代码@3:为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
代码@4:根据SPI机制,判断server属性是否支持。
代码@5:为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。
代码@6:根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
代码@7:验证客户端类型是否可用。

源码分析Exchangers.bind

根据URL、ExchangeHandler构建服务器

 1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
 2        if (url == null) {
 3            throw new IllegalArgumentException("url == null");
 4        }
 5        if (handler == null) {
 6            throw new IllegalArgumentException("handler == null");
 7        }
 8        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
 9        return getExchanger(url).bind(url, handler);
10    }

上述代码不难看出,首先根据url获取Exchanger实例,然后调用bind方法构建ExchangeServer,Exchanger接口如下

image1.png

  • ExchangeServer bind(URL url, ExchangeHandler handler) : 服务提供者调用。
  • ExchangeClient connect(URL url, ExchangeHandler handler):服务消费者调用。

dubbo提供的实现类为:HeaderExchanger,其bind方法如下:

HeaderExchanger#bind

1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3}

从这里可以看出,端口的绑定由Transporters的bind方法实现。

源码分析Transporters.bind方法

 1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
 2        if (url == null) {
 3            throw new IllegalArgumentException("url == null");
 4        }
 5        if (handlers == null || handlers.length == 0) {
 6            throw new IllegalArgumentException("handlers == null");
 7        }
 8        ChannelHandler handler;
 9        if (handlers.length == 1) {
10            handler = handlers[0];
11        } else {
12            handler = new ChannelHandlerDispatcher(handlers);
13        }
14        return getTransporter().bind(url, handler);
15    }
16
17public static Transporter getTransporter() {
18        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19}

从这里得知,Dubbo网络传输的接口有Transporter接口实现,其继承类图所示:

image2.png
本文以netty版本来查看一下Transporter实现。

NettyTransporter源码如下:

 1public class NettyTransporter implements Transporter {
 2
 3    public static final String NAME = "netty";
 4
 5    @Override
 6    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
 7        return new NettyServer(url, listener);
 8    }
 9
10    @Override
11    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
12        return new NettyClient(url, listener);
13    }
14}

NettyServer建立网络连接的实现方法为:

 1protected void doOpen() throws Throwable {
 2        NettyHelper.setNettyLoggerFactory();
 3        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 4        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 5        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
 6        bootstrap = new ServerBootstrap(channelFactory);
 7
 8        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);      // @1
 9        channels = nettyHandler.getChannels();
10        // https://issues.jboss.org/browse/NETTY-365
11        // https://issues.jboss.org/browse/NETTY-379
12        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
13        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
14            @Override
15            public ChannelPipeline getPipeline() {
16                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
17                ChannelPipeline pipeline = Channels.pipeline();
18                /*int idleTimeout = getIdleTimeout();
19                if (idleTimeout > 10000) {
20                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
21                }*/
22                pipeline.addLast("decoder", adapter.getDecoder());
23                pipeline.addLast("encoder", adapter.getEncoder());
24                pipeline.addLast("handler", nettyHandler);     // @2
25                return pipeline;
26            }
27        });
28        // bind
29        channel = bootstrap.bind(getBindAddress());
30    }

熟悉本方法需要具备Netty的知识,有关源码:阅读Netty系列文章,这里不对每一行代码进行解读,对于与网络相关的参数,将在后续文章中详细讲解,本方法@1、@2引起了我的注意,首先创建NettyServer必须传入一个服务提供者URL,但从DubboProtocol#createServer中可以看出,Server是基于网络套接字(ip:port)缓存的,一个JVM应用中,必然会存在多个dubbo:server标签,就会有多个URL,这里为什么可以这样做呢?从DubboProtocol#createServer中可以看出,在解析第二个dubbo:service标签时并不会调用createServer,而是会调用Server#reset方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上,那接下来分析NettyServer#reset方法是如何实现的。

源码分析DdubboProtocol#reset

reset方法最终将用Server的reset方法,同样还是以netty版本的NettyServer为例,查看reset方法的实现原理。NettyServer#reset->父类(AbstractServer)

AbstractServer#reset

 1public void reset(URL url) {
 2        if (url == null) {
 3            return;
 4        }
 5        try {                                                                                                       // @1 start
 6            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
 7                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
 8                if (a > 0) {
 9                    this.accepts = a;
10                }
11            }
12        } catch (Throwable t) {
13            logger.error(t.getMessage(), t);
14        }
15        try {
16            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
17                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
18                if (t > 0) {
19                    this.idleTimeout = t;
20                }
21            }
22        } catch (Throwable t) {
23            logger.error(t.getMessage(), t);
24        }
25        try {
26            if (url.hasParameter(Constants.THREADS_KEY)
27                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
28                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
29                int threads = url.getParameter(Constants.THREADS_KEY, 0);
30                int max = threadPoolExecutor.getMaximumPoolSize();
31                int core = threadPoolExecutor.getCorePoolSize();
32                if (threads > 0 && (threads != max || threads != core)) {
33                    if (threads < core) {
34                        threadPoolExecutor.setCorePoolSize(threads);
35                        if (core == max) {
36                            threadPoolExecutor.setMaximumPoolSize(threads);
37                        }
38                    } else {
39                        threadPoolExecutor.setMaximumPoolSize(threads);
40                        if (core == max) {
41                            threadPoolExecutor.setCorePoolSize(threads);
42                        }
43                    }
44                }
45            }
46        } catch (Throwable t) {
47            logger.error(t.getMessage(), t);
48        }              // @1 end
49        super.setUrl(getUrl().addParameters(url.getParameters()));    // @2
50    }

代码@1:首先是调整线程池的相关线程数量,这个好理解。、
代码@2:然后设置调用setUrl覆盖原先NettyServer的private volatile URL url的属性,那为什么不会影响原先注册的dubbo:server呢?

原来NettyHandler上加了注解:@Sharable,由该注解去实现线程安全。

Dubbo服务提供者启动流程将分析到这里了,本文并未对网络细节进行详细分析,旨在梳理出启动流程,有关Dubbo服务网络实现原理将在后续章节中详细分析,敬请期待。

点赞收藏
丁威
请先登录,查看4条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

4
4