通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生命周期接口:InitializingBean,接下来应该看一下其afterPropertiesSet方法的实现。
ReferenceBean#afterPropertiesSet
1if (getConsumer() == null) {
2 Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
3 ConsumerConfig.class, false, false);
4 if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
5 ConsumerConfig consumerConfig = null;
6 for (ConsumerConfig config : consumerConfigMap.values()) {
7 if (config.isDefault() == null || config.isDefault().booleanValue()) {
8 if (consumerConfig != null) {
9 throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
10 }
11 consumerConfig = config;
12 }
13 }
14 if (consumerConfig != null) {
15 setConsumer(consumerConfig);
16 }
17 }
18 }
Step1:如果consumer为空,说明dubbo:reference标签未设置consumer属性,如果一个dubbo:consumer标签,则取该实例,如果存在多个dubbo:consumer 配置,则consumer必须设置,否则会抛出异常:“Duplicate consumer configs”。
Step2:如果application为空,则尝试从BeanFactory中查询dubbo:application实例,如果存在多个dubbo:application配置,则抛出异常:“Duplicate application configs”。
Step3:如果ServiceBean的module为空,则尝试从BeanFactory中查询dubbo:module实例,如果存在多个dubbo:module,则抛出异常:"Duplicate module configs: "。
Step4:尝试从BeanFactory中加载所有的注册中心,注意ServiceBean的List< RegistryConfig> registries属性,为注册中心集合。
Step5:尝试从BeanFacotry中加载一个监控中心,填充ServiceBean的MonitorConfig monitor属性,如果存在多个dubbo:monitor配置,则抛出"Duplicate monitor configs: "。
ReferenceBean#afterPropertiesSet
1Boolean b = isInit();
2if (b == null && getConsumer() != null) {
3 b = getConsumer().isInit();
4}
5if (b != null && b.booleanValue()) {
6 getObject();
7}
Step6:判断是否初始化,如果为初始化,则调用getObject()方法,该方法也是FactoryBean定义的方法,ReferenceBean是dubbo:reference所真实引用的类(interface)的实例工程,getObject发返回的是interface的实例,而不是ReferenceBean实例。
1public Object getObject() throws Exception {
2 return get();
3}
ReferenceBean#getObject()方法直接调用其父类的get方法,get方法内部调用init()方法进行初始化
ReferenceConfig#init
1if (initialized) {
2 return;
3 }
4initialized = true;
5if (interfaceName == null || interfaceName.length() == 0) {
6 throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
7}
Step1:如果已经初始化,直接返回,如果interfaceName为空,则抛出异常。
ReferenceConfig#init调用ReferenceConfig#checkDefault
1private void checkDefault() {
2 if (consumer == null) {
3 consumer = new ConsumerConfig();
4 }
5 appendProperties(consumer);
6 }
Step2:如果dubbo:reference标签也就是ReferenceBean的consumer属性为空,调用appendProperties方法,填充默认属性,其具体加载顺序:
从系统属性加载对应参数值,参数键:dubbo.consumer.属性名,从系统属性中获取属性值的方法为:System.getProperty(key)。
加载属性配置文件的值。属性配置文件,可通过系统属性:dubbo.properties.file,如果该值未配置,则默认取dubbo.properties属性配置文件。
ReferenceConfig#init
1 appendProperties(this);
Step3:调用appendProperties方法,填充ReferenceBean的属性,属性值来源与step2一样,当然只填充ReferenceBean中属性为空的属性。
ReferenceConfig#init
1if (getGeneric() == null && getConsumer() != null) {
2 setGeneric(getConsumer().getGeneric());
3}
4if (ProtocolUtils.isGeneric(getGeneric())) {
5 interfaceClass = GenericService.class;
6} else {
7 try {
8 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
9 } catch (ClassNotFoundException e) {
10 throw new IllegalStateException(e.getMessage(), e);
11 }
12 checkInterfaceAndMethods(interfaceClass, methods);
13}
Step4:如果使用返回引用,将interface值替换为GenericService全路径名,如果不是,则加载interfacename,并检验dubbo:reference子标签dubbo:method引用的方法是否在interface指定的接口中存在。
ReferenceConfig#init
1String resolve = System.getProperty(interfaceName); // @1
2String resolveFile = null;
3if (resolve == null || resolve.length() == 0) { // @2
4 resolveFile = System.getProperty("dubbo.resolve.file"); // @3 start
5 if (resolveFile == null || resolveFile.length() == 0) {
6 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
7 if (userResolveFile.exists()) {
8 resolveFile = userResolveFile.getAbsolutePath();
9 }
10 } // @3 end
11 if (resolveFile != null && resolveFile.length() > 0) { // @4
12 Properties properties = new Properties();
13 FileInputStream fis = null;
14 try {
15 fis = new FileInputStream(new File(resolveFile));
16 properties.load(fis);
17 } catch (IOException e) {
18 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
19 } finally {
20 try {
21 if (null != fis) fis.close();
22 } catch (IOException e) {
23 logger.warn(e.getMessage(), e);
24 }
25 }
26 resolve = properties.getProperty(interfaceName);
27 }
28 }
29 if (resolve != null && resolve.length() > 0) { // @5
30 url = resolve;
31 if (logger.isWarnEnabled()) {
32 if (resolveFile != null && resolveFile.length() > 0) {
33 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
34 } else {
35 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
36 }
37 }
38}
Step5:处理dubbo服务消费端resolve机制,也就是说消息消费者只连服务提供者,绕过注册中心。
ReferenceConfig#init
1checkApplication();
2checkStubAndMock(interfaceClass);
Step6:校验ReferenceBean的application是否为空,如果为空,new 一个application,并尝试从系统属性(优先)、资源文件中填充其属性;同时校验stub、mock实现类与interface的兼容性。系统属性、资源文件属性的配置如下:
application dubbo.application.属性名,例如 dubbo.application.name
ReferenceConfig#init
1Map<String, String> map = new HashMap<String, String>();
2Map<Object, Object> attributes = new HashMap<Object, Object>();
3map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
4map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
5map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
6if (ConfigUtils.getPid() > 0) {
7 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
8}
Step7:构建Map,封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。
ReferenceConfig#init
1if (!isGeneric()) {
2 String revision = Version.getVersion(interfaceClass, version);
3 if (revision != null && revision.length() > 0) {
4 map.put("revision", revision);
5 }
6 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
7 if (methods.length == 0) {
8 logger.warn("NO method found in service interface " + interfaceClass.getName());
9 map.put("methods", Constants.ANY_VALUE);
10 } else {
11 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
12 }
13
14}
Step8:如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开。
ReferenceConfig#init
1map.put(Constants.INTERFACE_KEY, interfaceName);
2appendParameters(map, application);
3appendParameters(map, module);
4appendParameters(map, consumer, Constants.DEFAULT_KEY);
5appendParameters(map, this);
Step9:用Map存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性。
ReferenceConfig#init
1String prefix = StringUtils.getServiceKey(map);
2if (methods != null && !methods.isEmpty()) {
3 for (MethodConfig method : methods) {
4 appendParameters(map, method, method.getName());
5 String retryKey = method.getName() + ".retry";
6 if (map.containsKey(retryKey)) {
7 String retryValue = map.remove(retryKey);
8 if ("false".equals(retryValue)) {
9 map.put(method.getName() + ".retries", "0");
10 }
11 }
12 appendAttributes(attributes, method, prefix + "." + method.getName());
13 checkAndConvertImplicitConfig(method, map, attributes);
14 }
15}
Step10:获取服务键值 /{group}/interface:版本,如果group为空,则为interface:版本,其值存为prifex,然后将dubbo:method的属性名称也填入map中,键前缀为dubbo.method.methodname.属性名。dubbo:method的子标签dubbo:argument标签的属性也追加到attributes map中,键为 prifex + methodname.属性名。
ReferenceConfig#init
1String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
2if (hostToRegistry == null || hostToRegistry.length() == 0) {
3 hostToRegistry = NetUtils.getLocalHost();
4} else if (isInvalidLocalHost(hostToRegistry)) {
5 throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" +
6 hostToRegistry);
7}
8map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
Step11:填充register.ip属性,该属性是消息消费者连接注册中心的IP,并不是注册中心自身的IP。
ReferenceConfig#init
1ref = createProxy(map);
Step12:调用createProxy方法创建消息消费者代理,下面详细分析其实现细节。
ReferenceConfig#init
1ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
2ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
Step13:将消息消费者缓存在ApplicationModel中。
ReferenceConfig#createProxy
1URL tmpUrl = new URL("temp", "localhost", 0, map);
2final boolean isJvmRefer;
3if (isInjvm() == null) {
4 if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
5 isJvmRefer = false;
6 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
7 // by default, reference local service if there is
8 isJvmRefer = true;
9 } else {
10 isJvmRefer = false;
11 }
12} else {
13 isJvmRefer = isInjvm().booleanValue();
14}
Step1:判断该消费者是否是引用本(JVM)内提供的服务。
如果dubbo:reference标签的injvm(已过期,被local属性替换)如果不为空,则直接取该值,如果该值未配置,则判断ReferenceConfig的url属性是否为空,如果不为空,则isJvmRefer =false,表明该服务消费者将直连该URL的服务提供者;如果url属性为空,则判断该协议是否是isInjvm,其实现逻辑:获取dubbo:reference的scop属性,根据其值判断:
ReferenceConfig#createProxy
1if (isJvmRefer) {
2 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
3 invoker = refprotocol.refer(interfaceClass, url);
4 if (logger.isInfoEnabled()) {
5 logger.info("Using injvm service " + interfaceClass.getName());
6 }
7}
Step2:如果消费者引用本地JVM中的服务,则利用InjvmProtocol创建Invoker,dubbo中的invoker主要负责服务调用的功能,是其核心实现,后续会在专门的章节中详细分析,在这里我们需要知道,会创建于协议相关的Invoker即可。
ReferenceConfig#createProxy
1if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
2 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); // @1
3 if (us != null && us.length > 0) {
4 for (String u : us) {
5 URL url = URL.valueOf(u);
6 if (url.getPath() == null || url.getPath().length() == 0) {
7 url = url.setPath(interfaceName);
8 }
9 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // @2
10 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
11 } else {
12 urls.add(ClusterUtils.mergeUrl(url, map)); // @3
13 }
14 }
15 }
16}
Step3:处理直连情况,与step2互斥。
ReferenceConfig#createProxy
1List<URL> us = loadRegistries(false); // @1
2if (us != null && !us.isEmpty()) {
3 for (URL u : us) {
4 URL monitorUrl = loadMonitor(u); // @2
5 if (monitorUrl != null) {
6 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); // @3
7 }
8 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // @4
9 }
10}
11if (urls == null || urls.isEmpty()) {
12 throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry
13 address=\"...\" /> to your spring config.");
14}
Step4:普通消息消费者,从注册中心订阅服务。
ReferenceConfig#createProxy
1if (urls.size() == 1) {
2 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // @1
3} else {
4 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); // @2,多个服务提供者URL,集群模式
5 URL registryURL = null;
6 for (URL url : urls) {
7 invokers.add(refprotocol.refer(interfaceClass, url)); // @2
8 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
9 registryURL = url; // use last registry url
10 }
11 }
12 if (registryURL != null) { // registry url is available
13 // use AvailableCluster only when register's cluster is available
14 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
15 invoker = cluster.join(new StaticDirectory(u, invokers)); // @3
16 } else { // not a registry url
17 invoker = cluster.join(new StaticDirectory(invokers));
18 }
19 }
Step5:根据URL获取对应协议的Invoker。
集群模式的Invoker和单个协议Invoker一样实现Invoker接口,然后在集群Invoker中利用Directory保证一个一个协议的调用器,十分的巧妙,在后续章节中将重点分析Dubbo Invoker实现原理,包含集群实现机制。
ReferenceConfig#createProxy
1Boolean c = check;
2if (c == null && consumer != null) {
3 c = consumer.isCheck();
4}
5if (c == null) {
6 c = true; // default true
7}
8if (c && !invoker.isAvailable()) {
9 throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group
10 == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " +
11 NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
12}
ReferenceConfig#createProxy
1return (T) proxyFactory.getProxy(invoker);
2AbstractProxyFactory#getProxy
3public <T> T getProxy(Invoker<T> invoker) throws RpcException {
4 Class<?>[] interfaces = null;
5 String config = invoker.getUrl().getParameter("interfaces"); // @1
6 if (config != null && config.length() > 0) {
7 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
8 if (types != null && types.length > 0) {
9 interfaces = new Class<?>[types.length + 2];
10 interfaces[0] = invoker.getInterface();
11 interfaces[1] = EchoService.class; // @2
12 for (int i = 0; i < types.length; i++) {
13 interfaces[i + 1] = ReflectUtils.forName(types[i]);
14 }
15 }
16 }
17 if (interfaces == null) {
18 interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
19 }
20 return getProxy(invoker, interfaces); // @3
21 }
根据invoker获取代理类,其实现逻辑如下:
本节关于Dubbo服务消费者(服务调用者)的启动流程就梳理到这里,下一篇将重点关注Invoker(服务调用相关的实现细节)。