性能文章>源码分析Dubbo服务注册与发现机制RegistryDirectory)>

源码分析Dubbo服务注册与发现机制RegistryDirectory)原创

2年前
318922

RegistryDirectory,基于注册中心的服务发现,本文将重点探讨Dubbo是如何实现服务的自动注册与发现。从上篇文章,得知在消息消费者在创建服务调用器(Invoker)【消费者在初始时】时需要根据不同的协议,例如dubbo、registry(从注册中心获取服务提供者)来构建,其调用的方法为Protocol#refer,基于注册中心发现服务提供者的实现协议为RegistryProtocol。

源码分析RegistryProtocol#refer

RegistryProtocol#refer -> doRefer方法。

RegistryProtocol#doRefer


 1private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {    // @1
 2        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);   // @2
 3        directory.setRegistry(registry);
 4        directory.setProtocol(protocol);   // @3
 5        // all attributes of REFER_KEY
 6        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());   // @4
 7        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);  // @5
 8        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
 9                && url.getParameter(Constants.REGISTER_KEY, true)) {
10            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
11                    Constants.CHECK_KEY, String.valueOf(false)));
12        }   // @6
13        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
14                Constants.PROVIDERS_CATEGORY
15                        + "," + Constants.CONFIGURATORS_CATEGORY
16                        + "," + Constants.ROUTERS_CATEGORY));     // @7
17
18        Invoker invoker = cluster.join(directory);    // @8
19        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);     // @9
20        return invoker;
21    }
  • 代码@1:参数详解

    • Cluster cluster:集群策略。
    • Registry registry:注册中心实现类。
    • Class type:引用服务名,dubbo:reference interface。
    • URL url:注册中心URL。
  • 代码@2:构建RegistryDirectory对象,基于注册中心动态发现服务提供者(服务提供者新增或减少),本节重点会剖析该类的实现细节。

  • 代码@3:为RegistryDirectory设置注册中心、协议。

  • 代码@4:获取服务消费者的配置属性。

  • 代码@5:构建消费者URL,例如:

1consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185

  • 代码@6:向注册中心消息消费者:

1consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185

相比第5步的URL,增加了category=consumers、check=false,其中category表示在注册中心的命名空间,这里代表消费端。该步骤的作用就是向注册中心为服务增加一个消息消费者,其生成的效果如下:【以zookeeper为例】。

image1.png
image2.png

  • 代码@7:为消息消费者添加category=providers,configurators,routers属性后,然后向注册中心订阅该URL,关注该服务下的providers,configurators,routers发生变化时通知RegistryDirectory,以便及时发现服务提供者、配置、路由规则的变化。

1consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185

其订阅关系调用的入口为:RegistryDirectory#subscribe方法,是接下来需要重点分析的重点。

  • 代码@8:根据Directory,利用集群策略返回集群Invoker。
  • 代码@9:缓存服务消费者、服务提供者对应关系。

从这里发现,服务的注册与发现与RegistryDirectory联系非常紧密,接下来让我们来详细分析RegistryDirectory的实现细节。

RegistryDirectory类图

image3.jpeg

  • private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    集群策略,默认为failover。
  • private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader (RouterFactory.class).getAdaptiveExtension()路由工厂,可以通过监控中心或治理中心配置。
  • private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();配置实现工厂类。
  • private final String serviceKey; 服务key,默认为服务接口名。com.alibaba.dubbo.registry.RegistryService,注册中心在Dubbo中也是使用服务暴露。
  • private final Class< T > serviceType;服务提供者接口类,例如interface com.alibaba.dubbo.demo.DemoService
    private final Map< String, String> queryMap:服务消费者URL中的所有属性。
  • private final URL directoryUrl;注册中心URL,只保留消息消费者URL查询属性,也就是queryMap。
  • private final String[] serviceMethods:引用服务提供者方法数组。
  • private final boolean multiGroup:是否引用多个服务组。
  • private Protocol protocol:协议。
  • private Registry registry:注册中心实现者。
  • private volatile List< Configurator> configurators;配置信息。
  • private volatile Map< String, Invoker< T>> urlInvokerMap; 服务URL对应的Invoker(服务提供者调用器)。
  • private volatile Map< String, List< Invoker< T>>> methodInvokerMap; methodName : List< Invoker< T >>,
    dubbo:method 对应的Invoker缓存表。
    private volatile Set< URL > cachedInvokerUrls; 当前缓存的所有URL提供者URL。

RegistryDirectory 构造方法

 1public RegistryDirectory(Class<T> serviceType, URL url) {    // @1
 2        super(url);
 3        if (serviceType == null)
 4            throw new IllegalArgumentException("service type is null.");
 5        if (url.getServiceKey() == null || url.getServiceKey().length() == 0)
 6            throw new IllegalArgumentException("registry serviceKey is null.");
 7        this.serviceType = serviceType;  
 8        this.serviceKey = url.getServiceKey();     // @2
 9        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));  // @3
10        this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); //@4
11        String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
12        this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
13        String methods = queryMap.get(Constants.METHODS_KEY);
14        this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);   // @5
15    }
  • 代码@1:参数描述,serviceType:消费者引用的服务< dubbo:reference interface="" …/>;URL url:注册中心的URL,例如:
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=5552&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5552%26qos.port%3D33333%26register.ip%3D192.168.56.1%26side%3Dconsumer%26timestamp%3D1528379076123&timestamp=1528379076179
  • 代码@2:获取注册中心URL的serviceKey:com.alibaba.dubbo.registry.RegistryService。

  • 代码@3:获取注册中心URL消费提供者的所有配置参数:从url属性的refer。

  • 代码@4:初始化haulovverrideDirecotryUrl、directoryUrl:注册中心的URL,移除监控中心以及其他属性值,只保留消息消费者的配置属性。

  • 代码@5:获取服务消费者单独配置的方法名dubbo:method。

RegistryDirectory#subscribe

1public void subscribe(URL url) {
2     setConsumerUrl(url);   // @1
3     registry.subscribe(url, this); // @2
4}
  • 代码@1:设置RegistryDirectory的consumerUrl为消费者URL。
  • 代码@2:调用注册中心订阅消息消息消费者URL,首先看一下接口Registry#subscribe的接口声明:
    RegistryService:void subscribe(URL url, NotifyListener listener); 这里传入的NotifyListener为RegistryDirectory,其注册中心的subscribe方法暂时不深入去跟踪,不过根据上面URL上面的特点,应该能猜出如下实现关键点:
consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185
  • 根据消息消费者URL,获取服务名。
  • 根据category=providers、configurators、routers,分别在该服务名下的providers目录、configurators目录、routers目录建立事件监听,监听该目录下节点的创建、更新、删除事件,然后一旦事件触发,将回调RegistryDirectory#void notify(List< URL> urls)。

RegistryDirectory#notify

首先该方法是在注册中心providers、configurators、routers目录下的节点发生变化后,通知RegistryDirectory,已便更新最新信息,实现”动态“发现机制。

RegistryDirectory#notify


 1List<URL> invokerUrls = new ArrayList<URL>();
 2List<URL> routerUrls = new ArrayList<URL>();
 3List<URL> configuratorUrls = new ArrayList<URL>();
 4for (URL url : urls) {
 5     String protocol = url.getProtocol();    // @1 
 6     String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);   // @2
 7     if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {   // @3
 8           routerUrls.add(url);
 9      } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {   // @4
10           configuratorUrls.add(url);
11     } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {    // @5
12           invokerUrls.add(url);
13     } else {
14          logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + 
15                NetUtils.getLocalHost());
16     }
17}

Step1:根据通知的URL的前缀,分别添加到:invokerUrls(提供者url)、routerUrls(路由信息)、configuratorUrls (配置url)。

  • 代码@1:从url中获取协议字段,例如condition://、route://、script://、override://等。
  • 代码@2:获取url的category,在注册中心的命令空间,例如:providers、configurators、routers。
  • 代码@3:如果category等于routers或协议等于route,则添加到routerUrls中。
  • 代码@4:如果category等于configurators或协议等于override,则添加到configuratorUrls中。
  • 代码@5:如果category等于providers,则表示服务提供者url,加入到invokerUrls中。

RegistryDirectory#notify

1// configurators
2if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
3    this.configurators = toConfigurators(configuratorUrls);
4}

Step2:将configuratorUrls转换为配置对象List< Configurator> configurators,该方法将在《源码分析Dubbo配置规则实现细节》一文中详细讲解。

RegistryDirectory#notify

1// routers
2if (routerUrls != null && !routerUrls.isEmpty()) {
3      List<Router> routers = toRouters(routerUrls);
4      if (routers != null) { // null - do nothing
5            setRouters(routers);
6      }
7}

Step3:将routerUrls路由URL转换为Router对象,该部分内容将在《源码分析Dubbo路由机制实现细节》一文中详细分析。

RegistryDirectory#notify

1// providers
2refreshInvoker(invokerUrls);

Step4:根据回调通知刷新服务提供者集合。

RegistryDirectory#refreshInvoker

RegistryDirectory#refreshInvoker

1if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
2         && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
3     this.forbidden = true; // Forbid to access
4     this.methodInvokerMap = null; // Set the method invoker map to null
5     destroyAllInvokers(); // Close all invokers
6} 

Step1:如果invokerUrls不为空并且长度为1,并且协议为empty,表示该服务的所有服务提供者都下线了。需要销毁当前所有的服务提供者Invoker。

RegistryDirectory#refreshInvoker

 1this.forbidden = false; // Allow to access
 2Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
 3if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
 4        invokerUrls.addAll(this.cachedInvokerUrls);
 5} else {
 6      this.cachedInvokerUrls = new HashSet<URL>();
 7      this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
 8}
 9if (invokerUrls.isEmpty()) {
10      return;
11}

Step2: 如果invokerUrls为空,并且已缓存的invokerUrls不为空,将缓存中的invoker url复制到invokerUrls中,这里可以说明如果providers目录未发送变化,invokerUrls则为空,表示使用上次缓存的服务提供者URL对应的invoker;如果invokerUrls不为空,则用iinvokerUrls中的值替换原缓存的invokerUrls,这里说明,如果providers发生变化,invokerUrls中会包含此时注册中心所有的服务提供者。如果invokerUrls为空,则无需处理,结束本次更新服务提供者Invoker操作。

RegistryDirectory#refreshInvoker

1Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
2Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

Step3:将invokerUrls转换为对应的Invoke,然后根据服务级的url:invoker映射关系创建method:List< Invoker>映射关系,将在下文相信分析。

RegistryDirectory#refreshInvoker

1this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
2this.urlInvokerMap = newUrlInvokerMap;
3try {
4 destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
5} catch (Exception e) {
6 logger.warn("destroyUnusedInvokers error. ", e);
7}

  • Step4:如果支持multiGroup机制,则合并methodInvoker,将在下文分析,然后根据toInvokers、toMethodInvokers刷新当前最新的服务提供者信息。

RegistryDirectory#toInvokers

RegistryDirectory#toInvokers

1String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
2for (URL providerUrl : urls) {
3    // ...
4}
  • Step1:获取消息消费者URL中的协议类型,< dubbo:reference protocol="" …/>属性值,然后遍历所有的Invoker Url(服务提供者URL)。

RegistryDirectory#toInvokers

 1if (queryProtocols != null && queryProtocols.length() > 0) {
 2       boolean accept = false;
 3       String[] acceptProtocols = queryProtocols.split(",");
 4       for (String acceptProtocol : acceptProtocols) {
 5            if (providerUrl.getProtocol().equals(acceptProtocol)) {
 6                  accept = true;
 7                  break;
 8            }
 9       }
10      if (!accept) {
11            continue;
12      }
13}
  • Step2: 从这一步开始,代码都包裹在for(URL providerUrl : urls)中,一个一个处理提供者URL。如果dubbo:referecnce标签的protocol不为空,则需要对服务提供者URL进行过滤,匹配其协议与protocol属性相同的服务,如果不匹配,则跳过后续处理逻辑,接着处理下一个服务提供者URL。

RegistryDirectory#toInvokers

1if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
2      continue;
3}
  • Step3:如果协议为empty,跳过,处理下一个服务提供者URL。

RegistryDirectory#toInvokers

1if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
2 logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to
3 consumer " + NetUtils.getLocalHost()
4 + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
5 continue;
6}
Step4:验证服务提供者协议,如果不支持,则跳过。

RegistryDirectory#toInvokers

1URL url = mergeUrl(providerUrl);
  • Step5:合并URL中的属性,其具体实现细节如下:

消费端属性覆盖生产者端属性(配置属性消费者端优先生产者端属性),其具体实现方法:ClusterUtils.mergeUrl(providerUrl, queryMap),其中queryMap为消费端属性。

  • a、首先移除只在服务提供者端生效的属性(线程池相关):threadname、default.threadname、threadpool、default.threadpool、corethreads、default.corethreads、threads、default.threads、queues、default.queues、alive、default.alive、transporter、default.transporter,服务提供者URL中的这些属性来源于dubbo:protocol、dubbo:provider。
  • b、用消费端配置属性覆盖服务端属性。
  • c、如下属性以服务端优先:dubbo(dubbo信息)、version(版本)、group(服务组)、methods(服务方法)、timestamp(时间戳)。
  • d、合并服务端,消费端Filter,其配置属性(reference.filter),返回结果为:
provider#reference.filter,
consumer#reference.filter。
  • e、合并服务端,消费端Listener,其配置属性(invoker.listener),返回结果为:
provider#invoker.listener,consumer#invoker.listener。

合并configuratorUrls 中的属性,我们现在应该知道,dubbo可以在监控中心或管理端(dubbo-admin)覆盖覆盖服务提供者的属性,其使用协议为override。

为服务提供者URL增加check=false,默认只有在服务调用时才检查服务提供者是否可用。

重新复制overrideDirectoryUrl,providerUrl在进过第一步参数合并后(包含override协议覆盖后的属性)赋值给overrideDirectoryUrl。

1String key = url.toFullString(); // The parameter urls are sorted
2if (keys.contains(key)) { // Repeated url
3      continue;
4}
5keys.add(key);
  • Step6:获取url所有属性构成的key,该key也是RegistryDirectory中Map> urlInvokerMap;中的key。

RegistryDirectory#toInvokers

 1Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
 2Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
 3if (invoker == null) { // Not in the cache, refer again
 4   try {
 5         boolean enabled = true;
 6         if (url.hasParameter(Constants.DISABLED_KEY)) {
 7               enabled = !url.getParameter(Constants.DISABLED_KEY, false);
 8          } else {
 9               enabled = url.getParameter(Constants.ENABLED_KEY, true);
10           }
11           if (enabled) {
12                invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
13           }
14     } catch (Throwable t) {
15               logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
16      }
17      if (invoker != null) { // Put new invoker in cache
18            newUrlInvokerMap.put(key, invoker);
19      }
20} else {
21       newUrlInvokerMap.put(key, invoker);
22}
  • Step7:如果localUrlInvokerMap中未包含invoker并且该provider状态为启用,则创建该URL对应的Invoker,并添加到newUrlInvokerMap中。toInvokers运行结束后,回到refreshInvoker方法中继续往下执行,根据 最新的服务提供者映射关系Map< String,Invoker>,构建Map< String,List< Invoker>>,其中键为methodName。然后更新RegistryDirectory的urlInvokerMap、methodInvokerMap属性,并销毁老的Invoker对象,完成一次路由发现过程。

上面整个过程完成了一次动态服务提供者发现流程,下面再分析一下RegistryDirectory的另外一个重要方法,doList,再重复一遍RegistryDirectory的作用,服务提供者目录服务,在集群Invoker的实现中,内部持有一个Direcotry对象,在进行服务调用之前,首先先从众多的Invoker中选择一个来执行,那众多的Invoker从哪来呢?其来源于集群Invoker中会调用Direcotry的public List< Invoker< T>> list(Invocation invocation),首先将调用AbstractDirectory#list方法,然后再内部调用doList方法,doList方法有其子类实现。

RegistryDirectory#doList(Invocation invocation) 方法详解

RegistryDirectory#doList

1if (forbidden) {
2      // 1. No service provider 2. Service providers are disabled
3      throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
4            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
5                  + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
6}
  • Step1:如果禁止访问(如果没有服务提供者,或服务提供者被禁用),则抛出没有提供者异常。

RegistryDirectory#doList

 1Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
 2if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
 3    String methodName = RpcUtils.getMethodName(invocation);
 4    Object[] args = RpcUtils.getArguments(invocation);
 5    if (args != null && args.length > 0 && args[0] != null
 6             && (args[0] instanceof String || args[0].getClass().isEnum())) {
 7          invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
 8     }
 9    if (invokers == null) {
10          invokers = localMethodInvokerMap.get(methodName);
11    }
12    if (invokers == null) {
13          invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
14     }
15     if (invokers == null) {
16         Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
17         if (iterator.hasNext()) {
18              invokers = iterator.next();
19          }
20     }
21}
22return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
  • Step2:根据方法名称,从Map< String,List< Invoker>>这个集合中找到合适的List< Invoker>,如果方法名未命中,则返回所有的Invoker,localMethodInvokerMap中方法名,主要是dubbo:service的子标签dubbo:method,最终返回invokers。

本文详细介绍了服务消费者基于注册中心的服务发现机制,其中对routers(路由)与configurators(override协议)并未详细展开,下节先重点分析configurators与routers(路由)实现细节。

总结一下服务注册与发现机制
基于注册中心的事件通知(订阅与发布),一切支持事件订阅与发布的框架都可以作为Dubbo注册中心的选型。

服务提供者在暴露服务时,会向注册中心注册自己,具体就是在图片{interface interface}/consumers目录下创建一个节点。

消费者订阅${service interface}/[providers、configurators、routers]三个目录,这些目录下的节点删除、新增事件都胡通知消费者,根据通知,重构服务调用器(Invoker)。

以上就是Dubbo服务注册与动态发现机制的原理与实现细节。

点赞收藏
丁威
请先登录,查看2条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

2
2