本节将详细分析Dubbo服务提供者的启动流程,请带着如下几个疑问进行本节的阅读,因为这几个问题将是接下来几篇文章分析的重点内容。
从上文中我们得知,服务提供者启动的核心入口为ServiceBean,本节将从源码级别详细剖析ServcieBean的实现原理,即Dubbo服务提供者的启动流程,ServiceBean的继承层次如图所示,dubbo:service标签的所有属性都被封装在此类图结构中。
ServiceBean#afterPropertiesSet
1if (getProvider() == null) { // @1
2 Map<String, ProviderConfig> provide
3 ConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false); // @2
4 // ...... 具体解析代码省略。
5 }
6}
Step1:如果provider为空,说明dubbo:service标签未设置provider属性,如果一个dubbo:provider标签,则取该实例,如果存在多个dubbo:provider配置则provider属性不能为空,否则抛出异常:“Duplicate provider configs”。
ServiceBean#afterPropertiesSet
1 if (getApplication() == null
2 && (getProvider() == null || getProvider().getApplication() == null)) {
3 Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
4 ApplicationConfig.class, false, false);
5 // ...省略
6 }
Step2:如果application为空,则尝试从BeanFactory中查询dubbo:application实例,如果存在多个dubbo:application配置,则抛出异常:“Duplicate application configs”。
Step3:如果ServiceBean的module为空,则尝试从BeanFactory中查询dubbo:module实例,如果存在多个dubbo:module,则抛出异常:"Duplicate module configs: "。
Step4:尝试从BeanFactory中加载所有的注册中心,注意ServiceBean的List< RegistryConfig> registries属性,为注册中心集合。
Step5:尝试从BeanFacotry中加载一个监控中心,填充ServiceBean的MonitorConfig monitor属性,如果存在多个dubbo:monitor配置,则抛出"Duplicate monitor configs: "。
Step6:尝试从BeanFactory中加载所有的协议,注意:ServiceBean的List< ProtocolConfig> protocols是一个集合,也即一个服务可以通过多种协议暴露给消费者。
ServiceBean#afterPropertiesSet
1if (getPath() == null || getPath().length() == 0) {
2 if (beanName != null && beanName.length() > 0 && getInterface() != null && getInterface().length() > 0 && beanName.startsWith(getInterface())) {
3 setPath(beanName);
4 }
5 }
Step7:设置ServiceBean的path属性,path属性存放的是dubbo:service的beanName(dubbo:service id)。
ServiceBean#afterPropertiesSet
1if (!isDelay()) {
2 export();
3}
Step8:如果为启用延迟暴露机制,则调用export暴露服务。首先看一下isDelay的实现,然后重点分析export的实现原理(服务暴露的整个实现原理)。
ServiceBean#isDelay
1private boolean isDelay() {
2 Integer delay = getDelay();
3 ProviderConfig provider = getProvider();
4 if (delay == null && provider != null) {
5 delay = provider.getDelay();
6 }
7 return supportedApplicationListener && (delay == null || delay == -1);
8 }
如果有设置dubbo:service或dubbo:provider的属性delay,或配置delay为-1,都表示启用延迟机制,单位为毫秒,设置为-1,表示等到Spring容器初始化后再暴露服务。从这里也可以看出,Dubbo暴露服务的处理入口为ServiceBean#export->ServiceConfig#export。
调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export
1public synchronized void export() {
2 if (provider != null) {
3 if (export == null) {
4 export = provider.getExport();
5 }
6 if (delay == null) {
7 delay = provider.getDelay();
8 }
9 }
10 if (export != null && !export) { // @1
11 return;
12 }
13
14 if (delay != null && delay > 0) { // @2
15 delayExportExecutor.schedule(new Runnable() {
16 @Override
17 public void run() {
18 doExport();
19 }
20 }, delay, TimeUnit.MILLISECONDS);
21 } else {
22 doExport(); //@3
23 }
24 }
代码@1:判断是否暴露服务,由dubbo:service export="true|false"来指定。
代码@2:如果启用了delay机制,如果delay大于0,表示延迟多少毫秒后暴露服务,使用ScheduledExecutorService延迟调度,最终调用doExport方法。
代码@3:执行具体的暴露逻辑doExport,需要大家留意:delay=-1的处理逻辑(基于Spring事件机制触发)。
调用链:ServiceBean#afterPropertiesSet调用ServiceConfig#export->ServiceConfig#doExport
ServiceConfig#checkDefault
1private void checkDefault() {
2 if (provider == null) {
3 provider = new ProviderConfig();
4 }
5 appendProperties(provider);
6 }
Step1:如果dubbo:servce标签也就是ServiceBean的provider属性为空,调用appendProperties方法,填充默认属性,其具体加载顺序:
从系统属性加载对应参数值,参数键:dubbo.provider.属性名,可通过System.getProperty获取。
加载属性配置文件的值。属性配置文件,可通过系统属性:dubbo.properties.file,如果该值未配置,则默认取dubbo.properties属性配置文件。
ServiceConfig#doExport
1if (ref instanceof GenericService) {
2 interfaceClass = GenericService.class;
3 if (StringUtils.isEmpty(generic)) {
4 generic = Boolean.TRUE.toString();
5 }
6 } else {
7 try {
8 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
9 .getContextClassLoader());
10 } catch (ClassNotFoundException e) {
11 throw new IllegalStateException(e.getMessage(), e);
12 }
13 checkInterfaceAndMethods(interfaceClass, methods);
14 checkRef();
15 generic = Boolean.FALSE.toString();
16 }
Step2:校验ref与interface属性。如果ref是GenericService,则为dubbo的泛化实现,然后验证interface接口与ref引用的类型是否一致。
ServiceConfig#doExport
1if (local != null) {
2 if ("true".equals(local)) {
3 local = interfaceName + "Local";
4 }
5 Class<?> localClass;
6 try {
7 localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
8 } catch (ClassNotFoundException e) {
9 throw new IllegalStateException(e.getMessage(), e);
10 }
11 if (!interfaceClass.isAssignableFrom(localClass)) {
12 throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
13 }
14 }
Step3:dubbo:service local机制,已经废弃,被stub属性所替换。
Step4:处理本地存根Stub(图片来源于Dubbo官方文档)。
ServiceConfig#doExport
1checkApplication();
2checkRegistry();
3checkProtocol();
4appendProperties(this);
Step5:校验ServiceBean的application、registry、protocol是否为空,并从系统属性(优先)、资源文件中填充其属性。
系统属性、资源文件属性的配置如下:
ServiceConfig#doExport
1checkStubAndMock(interfaceClass);
Step6:校验stub、mock类的合理性,是否是interface的实现类。
ServiceConfig#doExport
1doExportUrls();
Step7:执行doExportUrls()方法暴露服务,接下来会重点分析该方法。
ServiceConfig#doExport
1ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
2ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
Step8:将服务提供者信息注册到ApplicationModel实例中。
源码分析ServiceConfig#doExportUrls暴露服务具体实现逻辑
调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export>ServiceConfig#doExport
1private void doExportUrls() {
2 List<URL> registryURLs = loadRegistries(true); // @1
3 for (ProtocolConfig protocolConfig : protocols) {
4 doExportUrlsFor1Protocol(protocolConfig, registryURLs); // @2
5 }
6 }
代码@1:首先遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),然后将地址封装成URL对象,关于注册中心的所有配置属性,最终转换成url的属性(?属性名=属性值),loadRegistries(true),参数的意思:true,代表服务提供者,false:代表服务消费者,如果是服务提供者,则检测注册中心的配置,如果配置了register=“false”,则忽略该地址,如果是服务消费者,并配置了subscribe="false"则表示不从该注册中心订阅服务,故也不返回,一个注册中心URL示例:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=7072&qos.port=22222®istry=zookeeper×tamp=1527308268041
代码@2:然后遍历配置的所有协议,根据每个协议,向注册中心暴露服务,接下来重点分析doExportUrlsFor1Protocol方法的实现细节。
源码分析doExportUrlsFor1Protocol
调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export>ServiceConfig#doExport->ServiceConfig#doExportUrlsFor1Protocol
ServiceConfig#doExportUrlsFor1Protocol
1String name = protocolConfig.getName();
2if (name == null || name.length() == 0) {
3 name = "dubbo";
4}
5Map<String, String> map = new HashMap<String, String>();
6map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
7map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
8map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
9if (ConfigUtils.getPid() > 0) {
10 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
11}
12appendParameters(map, application);
13appendParameters(map, module);
14appendParameters(map, provider, Constants.DEFAULT_KEY);
15appendParameters(map, protocolConfig);
16appendParameters(map, this);
Step1:用Map存储该协议的所有配置参数,包括协议名称、dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供Dubbo:service的属性。
ServiceConfig#doExportUrlsFor1Protocol
1if (methods != null && !methods.isEmpty()) {
2 for (MethodConfig method : methods) {
3 appendParameters(map, method, method.getName());
4 String retryKey = method.getName() + ".retry";
5 if (map.containsKey(retryKey)) {
6 String retryValue = map.remove(retryKey);
7 if ("false".equals(retryValue)) {
8 map.put(method.getName() + ".retries", "0");
9 }
10 }
11 List<ArgumentConfig> arguments = method.getArguments();
12 if (arguments != null && !arguments.isEmpty()) {
13 for (ArgumentConfig argument : arguments) {
14 // convert argument type
15 if (argument.getType() != null && argument.getType().length() > 0) {
16 Method[] methods = interfaceClass.getMethods();
17 // visit all methods
18 if (methods != null && methods.length > 0) {
19 for (int i = 0; i < methods.length; i++) {
20 String methodName = methods[i].getName();
21 // target the method, and get its signature
22 if (methodName.equals(method.getName())) {
23 Class<?>[] argtypes = methods[i].getParameterTypes();
24 // one callback in the method
25 if (argument.getIndex() != -1) {
26 if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
27 appendParameters(map, argument, method.getName() + "." + argument.getIndex());
28 } else {
29 throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" +
30 argument.getType());
31 }
32 } else {
33 // multiple callbacks in the method
34 for (int j = 0; j < argtypes.length; j++) {
35 Class<?> argclazz = argtypes[j];
36 if (argclazz.getName().equals(argument.getType())) {
37 appendParameters(map, argument, method.getName() + "." + j);
38 if (argument.getIndex() != -1 && argument.getIndex() != j) {
39 throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ",
40 type:" + argument.getType());
41 }
42 }
43 }
44 }
45 }
46 }
47 }
48 } else if (argument.getIndex() != -1) {
49 appendParameters(map, argument, method.getName() + "." + argument.getIndex());
50 } else {
51 throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
52 }
53
54 }
55 }
56 } // end of methods for
57 }
Step2:如果dubbo:service有dubbo:method子标签,则dubbo:method以及其子标签的配置属性,都存入到Map中,属性名称加上对应的方法名作为前缀。dubbo:method的子标签dubbo:argument,其键为方法名.参数序号。
ServiceConfig#doExportUrlsFor1Protocol
1if (ProtocolUtils.isGeneric(generic)) {
2 map.put(Constants.GENERIC_KEY, generic);
3 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
4 } else {
5 String revision = Version.getVersion(interfaceClass, version);
6 if (revision != null && revision.length() > 0) {
7 map.put("revision", revision);
8 }
9 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
10 if (methods.length == 0) {
11 logger.warn("NO method found in service interface " + interfaceClass.getName());
12 map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
13 } else {
14 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
15 }
16}
Step3:添加methods键值对,存放dubbo:service的所有方法名,多个方法名用,隔开,如果是泛化实现,填充genric=true,methods为"*";
ServiceConfig#doExportUrlsFor1Protocol
1if (!ConfigUtils.isEmpty(token)) {
2 if (ConfigUtils.isDefault(token)) {
3 map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
4 } else {
5 map.put(Constants.TOKEN_KEY, token);
6 }
7}
Step4:根据是否开启令牌机制,如果开启,设置token键,值为静态值或uuid。
ServiceConfig#doExportUrlsFor1Protocol
1if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
2 protocolConfig.setRegister(false);
3 map.put("notify", "false");
4}
Step5:如果协议为本地协议(injvm),则设置protocolConfig#register属性为false,表示不向注册中心注册服务,在map中存储键为notify,值为false,表示当注册中心监听到服务提供者发送变化(服务提供者增加、服务提供者减少等事件时不通知。
ServiceConfig#doExportUrlsFor1Protocol
1// export service
2String contextPath = protocolConfig.getContextpath();
3if ((contextPath == null || contextPath.length() == 0) && provider != null) {
4 contextPath = provider.getContextpath();
5}
Step6:设置协议的contextPath,如果未配置,默认为/interfacename
ServiceConfig#doExportUrlsFor1Protocol
1String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
2Integer port = this.findConfigedPorts(protocolConfig, name, map);
Step7:解析服务提供者的IP地址与端口。
服务IP地址解析顺序:(序号越小越优先)
1 public static boolean isInvalidLocalHost(String host) {
2 return host == null
3 || host.length() == 0
4 || host.equalsIgnoreCase("localhost")
5 || host.equals("0.0.0.0")
6 || (LOCAL_IP_PATTERN.matcher(host).matches());
7 }
选择第一个可用网卡,其实现方式是建立socket,连接注册中心,获取socket的IP地址。其代码:
1Socket socket = new Socket();
2 try {
3 SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
4 socket.connect(addr, 1000);
5 hostToBind = socket.getLocalAddress().getHostAddress();
6 break;
7 } finally {
8 try {
9 socket.close();
10 } catch (Throwable e) {
11 }
12 }
服务提供者端口解析顺序:(序号越小越优先)
1URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
Step8:根据协议名称、协议host、协议端口、contextPath、相关配置属性(application、module、provider、protocolConfig、service及其子标签)构建服务提供者URI。
URL运行效果图:
以dubbo协议为例,展示最终服务提供者的URL信息如下:
dubbo://192.168.56.1:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.56.1&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5916&qos.port=22222&side=provider×tamp=1527168070857
ServiceConfig#doExportUrlsFor1Protocol
1String scope = url.getParameter(Constants.SCOPE_KEY);
2// don't export when none is configured
3if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
4 // 接口暴露实现逻辑
5}
Step9:获取dubbo:service标签的scope属性,其可选值为none(不暴露)、local(本地)、remote(远程),如果配置为none,则不暴露。默认为local。
ServiceConfig#doExportUrlsFor1Protocol
1// export to local if the config is not remote (export to remote only when config is remote)
2if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { // @1
3 exportLocal(url);
4}
5// export to remote if the config is not local (export to local only when config is local)
6if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // @2
7 if (logger.isInfoEnabled()) {
8 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
9 }
10 if (registryURLs != null && !registryURLs.isEmpty()) { // @3
11 for (URL registryURL : registryURLs) {
12 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // @4
13 URL monitorUrl = loadMonitor(registryURL); // @5
14 if (monitorUrl != null) {
15 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
16 }
17 if (logger.isInfoEnabled()) {
18 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
19 }
20 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @6
21 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
22 Exporter<?> exporter = protocol.export(wrapperInvoker); // @7
23 exporters.add(exporter);
24 }
25 } else {
26 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
27 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
28 Exporter<?> exporter = protocol.export(wrapperInvoker);
29 exporters.add(exporter);
30 }
31}
**Step10:**根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一。
代码@1:如果scope不为remote,则先在本地暴露(injvm):,具体暴露服务的具体实现,将在remote 模式中详细分析。
代码@2:如果scope不为local,则将服务暴露在远程。
代码@3:remote方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。
代码@4:如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
代码@5:根据注册中心url(注册中心url),构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加monitor,其值为监控中心url(已编码)。
Dubbo远程调用器如何构建,这里不详细深入,重点关注WrapperInvoker的url为:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215&pid=6328&qos.port=22222®istry=zookeeper×tamp=1527255510202,
这里有两个重点值得关注:
其映射关系(列出与服务启动相关协议实现类):
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol //文件位于dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
2registry=com.alibaba.dubbo.registry.integration.RegistryProtocol //文件位于dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
代码@7:根据代码@6的分析,将调用RegistryProtocol#export方法。
调用链:ServiceBean#afterPropertiesSet->ServiceConfig#export->ServiceConfig#doExport->ServiceConfig#doExportUrlsFor1Protocol->RegistryProtocol#export
RegistryProtocol#export
1@Override
2 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
3 //export invoker
4 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // @1
5
6 URL registryUrl = getRegistryUrl(originInvoker); // @2
7
8 //registry provider
9 final Registry registry = getRegistry(originInvoker); // @3
10 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // @4start
11
12 //to judge to delay publish whether or not
13 boolean register = registedProviderUrl.getParameter("register", true);
14
15 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
16
17 if (register) {
18 register(registryUrl, registedProviderUrl); // @4 end
19 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
20 }
21
22 // Subscribe the override data
23 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
24 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); // @5 start
25 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
26 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
27 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @5 end
28 //Ensure that a new exporter instance is returned every time export
29 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
30 }
代码@1:启动服务提供者服务,监听指定端口,准备服务消费者的请求,这里其实就是从WrapperInvoker中的url(注册中心url)中提取export属性,描述服务提供者的url,然后启动服务提供者。
从上图中,可以看出,将调用DubboProtocol#export完成dubbo服务的启动,利用netty构建一个微型服务端,监听端口,准备接受服务消费者的网络请求,本节旨在梳理其启动流程,具体实现细节,将在后续章节中详解,这里我们只要知道,< dubbo:protocol name=“dubbo” port=“20880” />,会再此次监听该端口,然后将dubbo:service的服务handler加入到命令处理器中,当有消息消费者连接该端口时,通过网络解包,将需要调用的服务和参数等信息解析处理后,转交给对应的服务实现类处理即可。
代码@2:获取真实注册中心的URL,例如zookeeper注册中心的
URL:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882&pid=10252&qos.port=22222×tamp=1527263060867
代码@3:根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:ZookeeperRegistry
代码@4:获取服务提供者URL中的register属性,如果为true,则调用注册中心的ZookeeperRegistry#register方法向注册中心注册服务(实际由其父类FailbackRegistry实现)。
代码@5:服务提供者向注册中心订阅自己,主要是为了服务提供者URL发送变化后重新暴露服务,当然,会将dubbo:reference的check属性设置为false。
到这里就对文章开头提到的问题1,问题2做了一个解答,其与注册中心的心跳机制等将在后续章节中详细分析。
文字看起来可能不是很直观,现整理一下Dubbo服务提供者启动流程图如下:
RegistryProtocol#export中调用doLocalExport方法,根据服务暴露协议建立网络通讯服务器,在特定端口建立监听,监听来自消息消费端服务的请求。
RegistryProtocol#doLocalExport:
1private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
2 String key = getCacheKey(originInvoker);
3 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
4 if (exporter == null) {
5 synchronized (bounds) {
6 exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
7 if (exporter == null) {
8 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @1
9 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @2
10 bounds.put(key, exporter);
11 }
12 }
13 }
14 return exporter;
15 }
代码@1:如果服务提供者以dubbo协议暴露服务,getProviderUrl(originInvoker)返回的URL将以dubbo://开头。
代码@2:根据Dubbo内置的SPI机制,将调用DubboProtocol#export方法。
1public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2 URL url = invoker.getUrl(); // @1
3 // export service.
4 String key = serviceKey(url); // @2
5 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
6 exporterMap.put(key, exporter);
7
8 //export an stub service for dispatching event
9 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); //@3 start
10 Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
11 if (isStubSupportEvent && !isCallbackservice) {
12 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
13 if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
14 if (logger.isWarnEnabled()) {
15 logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
16 "], has set stubproxy support event ,but no stub methods founded."));
17 }
18 } else {
19 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
20 }
21 } // @3 end
22
23 openServer(url); // @4
24 optimizeSerialization(url); // @5
25 return exporter;
26 }
代码@1:获取服务提供者URL,以协议名称,这里是dubbo://开头。
代码@2:从服务提供者URL中获取服务名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。
代码@3:是否将转发事件导出成stub。
代码@4:根据url打开服务,下面将详细分析其实现。
代码@5:根据url优化器序列化方式。
1private void openServer(URL url) {
2 // find server.
3 String key = url.getAddress(); // @1
4 //client can export a service which's only for server to invoke
5 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
6 if (isServer) {
7 ExchangeServer server = serverMap.get(key); // @2
8 if (server == null) {
9 serverMap.put(key, createServer(url)); //@3
10 } else {
11 // server supports reset, use together with override
12 server.reset(url); //@4
13 }
14 }
15 }
代码@1:根据url获取网络地址:ip:port,例如:192.168.56.1:20880,服务提供者IP与暴露服务端口号。
代码@2:根据key从服务器缓存中获取,如果存在,则执行代码@4,如果不存在,则执行代码@3。
代码@3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为ExchangeServer。
代码@4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。
1private ExchangeServer createServer(URL url) {
2 // send readonly event when server closes, it's enabled by default
3 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @1
4 // enable heartbeat by default
5 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @2
6 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @3
7
8 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @4
9 throw new RpcException("Unsupported server type: " + str + ", url: " + url);
10
11 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @5
12 ExchangeServer server;
13 try {
14 server = Exchangers.bind(url, requestHandler); // @6
15 } catch (RemotingException e) {
16 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
17 }
18 str = url.getParameter(Constants.CLIENT_KEY); //@7
19 if (str != null && str.length() > 0) {
20 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
21 if (!supportedTypes.contains(str)) {
22 throw new RpcException("Unsupported client type: " + str);
23 }
24 }
25 return server;
26 }
代码@1:为服务提供者url增加channel.readonly.sent属性,默认为true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true。
代码@2:为服务提供者url增加heartbeat属性,表示心跳间隔时间,默认为60*1000,表示60s。
代码@3:为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
代码@4:根据SPI机制,判断server属性是否支持。
代码@5:为服务提供者url增加codec属性,默认值为dubbo,协议编码方式。
代码@6:根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例。requestHandler的实现具体在以后详细分析Dubbo服务调用时再详细分析。
代码@7:验证客户端类型是否可用。
根据URL、ExchangeHandler构建服务器
1public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handler == null) {
6 throw new IllegalArgumentException("handler == null");
7 }
8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
9 return getExchanger(url).bind(url, handler);
10 }
上述代码不难看出,首先根据url获取Exchanger实例,然后调用bind方法构建ExchangeServer,Exchanger接口如下
dubbo提供的实现类为:HeaderExchanger,其bind方法如下:
HeaderExchanger#bind
1public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
2 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3}
从这里可以看出,端口的绑定由Transporters的bind方法实现。
1public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
2 if (url == null) {
3 throw new IllegalArgumentException("url == null");
4 }
5 if (handlers == null || handlers.length == 0) {
6 throw new IllegalArgumentException("handlers == null");
7 }
8 ChannelHandler handler;
9 if (handlers.length == 1) {
10 handler = handlers[0];
11 } else {
12 handler = new ChannelHandlerDispatcher(handlers);
13 }
14 return getTransporter().bind(url, handler);
15 }
16
17public static Transporter getTransporter() {
18 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19}
从这里得知,Dubbo网络传输的接口有Transporter接口实现,其继承类图所示:
本文以netty版本来查看一下Transporter实现。
NettyTransporter源码如下:
1public class NettyTransporter implements Transporter {
2
3 public static final String NAME = "netty";
4
5 @Override
6 public Server bind(URL url, ChannelHandler listener) throws RemotingException {
7 return new NettyServer(url, listener);
8 }
9
10 @Override
11 public Client connect(URL url, ChannelHandler listener) throws RemotingException {
12 return new NettyClient(url, listener);
13 }
14}
NettyServer建立网络连接的实现方法为:
1protected void doOpen() throws Throwable {
2 NettyHelper.setNettyLoggerFactory();
3 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
4 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
5 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
6 bootstrap = new ServerBootstrap(channelFactory);
7
8 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @1
9 channels = nettyHandler.getChannels();
10 // https://issues.jboss.org/browse/NETTY-365
11 // https://issues.jboss.org/browse/NETTY-379
12 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
13 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
14 @Override
15 public ChannelPipeline getPipeline() {
16 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
17 ChannelPipeline pipeline = Channels.pipeline();
18 /*int idleTimeout = getIdleTimeout();
19 if (idleTimeout > 10000) {
20 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
21 }*/
22 pipeline.addLast("decoder", adapter.getDecoder());
23 pipeline.addLast("encoder", adapter.getEncoder());
24 pipeline.addLast("handler", nettyHandler); // @2
25 return pipeline;
26 }
27 });
28 // bind
29 channel = bootstrap.bind(getBindAddress());
30 }
熟悉本方法需要具备Netty的知识,有关源码:阅读Netty系列文章,这里不对每一行代码进行解读,对于与网络相关的参数,将在后续文章中详细讲解,本方法@1、@2引起了我的注意,首先创建NettyServer必须传入一个服务提供者URL,但从DubboProtocol#createServer中可以看出,Server是基于网络套接字(ip:port)缓存的,一个JVM应用中,必然会存在多个dubbo:server标签,就会有多个URL,这里为什么可以这样做呢?从DubboProtocol#createServer中可以看出,在解析第二个dubbo:service标签时并不会调用createServer,而是会调用Server#reset方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上,那接下来分析NettyServer#reset方法是如何实现的。
reset方法最终将用Server的reset方法,同样还是以netty版本的NettyServer为例,查看reset方法的实现原理。NettyServer#reset->父类(AbstractServer)
AbstractServer#reset
1public void reset(URL url) {
2 if (url == null) {
3 return;
4 }
5 try { // @1 start
6 if (url.hasParameter(Constants.ACCEPTS_KEY)) {
7 int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
8 if (a > 0) {
9 this.accepts = a;
10 }
11 }
12 } catch (Throwable t) {
13 logger.error(t.getMessage(), t);
14 }
15 try {
16 if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
17 int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
18 if (t > 0) {
19 this.idleTimeout = t;
20 }
21 }
22 } catch (Throwable t) {
23 logger.error(t.getMessage(), t);
24 }
25 try {
26 if (url.hasParameter(Constants.THREADS_KEY)
27 && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
28 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
29 int threads = url.getParameter(Constants.THREADS_KEY, 0);
30 int max = threadPoolExecutor.getMaximumPoolSize();
31 int core = threadPoolExecutor.getCorePoolSize();
32 if (threads > 0 && (threads != max || threads != core)) {
33 if (threads < core) {
34 threadPoolExecutor.setCorePoolSize(threads);
35 if (core == max) {
36 threadPoolExecutor.setMaximumPoolSize(threads);
37 }
38 } else {
39 threadPoolExecutor.setMaximumPoolSize(threads);
40 if (core == max) {
41 threadPoolExecutor.setCorePoolSize(threads);
42 }
43 }
44 }
45 }
46 } catch (Throwable t) {
47 logger.error(t.getMessage(), t);
48 } // @1 end
49 super.setUrl(getUrl().addParameters(url.getParameters())); // @2
50 }
代码@1:首先是调整线程池的相关线程数量,这个好理解。、
代码@2:然后设置调用setUrl覆盖原先NettyServer的private volatile URL url的属性,那为什么不会影响原先注册的dubbo:server呢?
原来NettyHandler上加了注解:@Sharable,由该注解去实现线程安全。
Dubbo服务提供者启动流程将分析到这里了,本文并未对网络细节进行详细分析,旨在梳理出启动流程,有关Dubbo服务网络实现原理将在后续章节中详细分析,敬请期待。