性能文章>从-99打造Sentinel高可用集群限流中间件>

从-99打造Sentinel高可用集群限流中间件原创

2周前
170313

接上篇Sentinel集群限流探索,上次简单提到了集群限流的原理,然后用官方给的 demo 简单修改了一下,可以正常运行生效。

这一次需要更进一步,基于 Sentinel 实现内嵌式集群限流的高可用方案,并且包装成一个中间件 starter 提供给三方使用。

对于高可用,我们主要需要解决两个问题,这无论是使用内嵌或者独立模式都需要解决的问题,相比而言,内嵌式模式更简单一点。

1、集群 server 自动选举
2、自动故障转移
3、Sentinel-Dashboard持久化到Apollo

集群限流

首先,考虑到大部分的服务可能都不需要集群限流这个功能,因此实现一个注解用于手动开启集群限流模式,只有开启注解的情况下,才去实例化集群限流的 Bean 和限流数据。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({EnableClusterImportSelector.class})
@Documented
public @interface SentinelCluster {
}

public class EnableClusterImportSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{ClusterConfiguration.class.getName()};
    }
}

这样写好之后,当扫描到有我们的 SentinelCluster 注解的时候,就会去实例化 ClusterConfiguration。

@Slf4j
public class ClusterConfiguration implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    private Environment environment;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ClusterManager.class);
        beanDefinitionBuilder.addConstructorArgValue(this.environment);
        registry.registerBeanDefinition("clusterManager", beanDefinitionBuilder.getBeanDefinition());
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

在配置中去实例化用于管理集群限流的ClusterManager,这段逻辑和我们之前文章中使用到的一般无二,注册到ApolloDataSource之后自动监听Apollo的变化达到动态生效的效果。

@Slf4j
public class ClusterManager {
    private Environment environment;
    private String namespace;
    private static final String CLUSTER_SERVER_KEY = "sentinel.cluster.server"; //服务集群配置
    private static final String DEFAULT_RULE_VALUE = "[]"; //集群默认规则
    private static final String FLOW_RULE_KEY = "sentinel.flow.rules"; //限流规则
    private static final String DEGRADE_RULE_KEY = "sentinel.degrade.rules"; //降级规则
    private static final String PARAM_FLOW_RULE_KEY = "sentinel.param.rules"; //热点限流规则
    private static final String CLUSTER_CLIENT_CONFIG_KEY = "sentinel.client.config"; //客户端配置

    public ClusterManager(Environment environment) {
        this.environment = environment;
        this.namespace = "YourNamespace";
        init();
    }

    private void init() {
        initClientConfig();
        initClientServerAssign();
        registerRuleSupplier();
        initServerTransportConfig();
        initState();
    }

    private void initClientConfig() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_CLIENT_CONFIG_KEY,
                DEFAULT_SERVER_VALUE,
                source -> JacksonUtil.from(source, ClusterClientConfig.class)
        );
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initClientServerAssign() {
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerAssignConverter(environment)
        );
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void registerRuleSupplier() {
        ClusterFlowRuleManager.setPropertySupplier(ns -> {
            ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(
                    namespace,
                    FLOW_RULE_KEY,
                    DEFAULT_RULE_VALUE,
                    source -> JacksonUtil.fromList(source, FlowRule.class));
            return ds.getProperty();
        });
        ClusterParamFlowRuleManager.setPropertySupplier(ns -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(
                    namespace,
                    PARAM_FLOW_RULE_KEY,
                    DEFAULT_RULE_VALUE,
                    source -> JacksonUtil.fromList(source, ParamFlowRule.class)
            );
            return ds.getProperty();
        });
    }

    private void initServerTransportConfig() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerTransportConverter(environment)
        );

        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }

    private void initState() {
        ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(
                namespace,
                CLUSTER_SERVER_KEY,
                DEFAULT_SERVER_VALUE,
                new ServerStateConverter(environment)
        );

        ClusterStateManager.registerProperty(clusterModeDs.getProperty());
    }
}


这样的话,一个集群限流的基本功能已经差不多是OK了,上述步骤都比较简单,按照官方文档基本都能跑起来,接下来要实现文章开头提及到的核心的几个功能了。

自动选举&故障转移

自动选举怎么实现?简单点,不用考虑那么多,每台机器启动成功之后直接写入到 Apollo 当中,第一个写入成功的就是 Server 节点。

这个过程为了保证并发带来的问题,我们需要加锁确保只有一台机器成功写入自己的本机信息。

由于我使用 Eureka 作为注册中心,Eureka 又有CacheRefreshedEvent本地缓存刷新的事件,基于此每当本地缓存刷新,我们就去检测当前 Server 节点是否存在,然后根据实际情况去实现选举。

E8D319E823D64646B89AB6AAAC61466E.png
首先在 spring.factories 中添加我们的监听器。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.test.config.SentinelEurekaEventListener
监听器只有当开启了集群限流注解SentinelCluster之后才会生效。

@Configuration
@Slf4j
@ConditionalOnBean(annotation = SentinelCluster.class)
public class SentinelEurekaEventListener implements ApplicationListener<CacheRefreshedEvent> {
    @Resource
    private DiscoveryClient discoveryClient;
    @Resource
    private Environment environment;
    @Resource
    private ApolloManager apolloManager;

    @Override
    public void onApplicationEvent(EurekaClientLocalCacheRefreshedEvent event) {
        if (!leaderAlive(loadEureka(), loadApollo())) {
            boolean tryLockResult = redis.lock; //redis或者其他加分布式锁
            if (tryLockResult) {
                try {
                    flush();
                } catch (Exception e) {
                } finally {
                    unlock();
                }
            }
        }
    }
  
    private boolean leaderAlive(List<ClusterGroup> eurekaList, ClusterGroup server) {
        if (Objects.isNull(server)) {
            return false;
        }
        for (ClusterGroup clusterGroup : eurekaList) {
            if (clusterGroup.getMachineId().equals(server.getMachineId())) {
                return true;
            }
        }
        return false;
    }
}

OK,其实看到代码已经知道我们把故障转移的逻辑也实现了,其实道理是一样的。

第一次启动的时候 Apollo 中的 server 信息是空的,所以第一台加锁写入的机器就是 server 节点,后续如果 server 宕机下线,本地注册表缓存刷新,对比 Eureka 的实例信息和 Apollo 中的 server,如果 server 不存在,那么就重新执行选举的逻辑。

0EF0B55018494D7A8DBEDCFD6137FE04.png

需要注意的是,本地缓存刷新的时间极端情况下可能会达到几分钟级别,那么也就是说在服务下线的可能几分钟内没有重新选举出新的 server 节点整个集群限流是不可用的状态,对于业务要求非常严格的情况这个方案就不太适用了。

对于 Eureka 缓存时间同步的问题,可以参考之前的文章Eureka服务下线太慢,电话被告警打爆了。

Dashboard持久化改造

到这儿为止,我们已经把高可用方案实现好了,接下来最后一步,只要通过 Sentinel 自带的控制台能够把配置写入到 Apollo 中,那么应用就自然会监听到配置的变化,达到动态生效的效果。

C8DDF22ED94C49E0AEF8B80E10196DEB.png
根据官方的描述,官方已经实现了FlowControllerV2用于集群限流,同时在测试目录下有简单的案例帮助我们快速实现控制台的持久化的逻辑。

我们只要实现DynamicRuleProvider,同时注入到Controller中使用即可,这里我们实现flowRuleApolloProvider用于提供从Apollo查询数据,flowRuleApolloPublisher用于写入限流配置到Apollo。

@RestController
@RequestMapping(value = "/v2/flow")
public class FlowControllerV2 {
    private final Logger logger = LoggerFactory.getLogger(FlowControllerV2.class);

    @Autowired
    private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;

    @Autowired
    @Qualifier("flowRuleApolloProvider")
    private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
    @Autowired
    @Qualifier("flowRuleApolloPublisher")
    private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;


}

实现方式很简单,provider 通过 Apollo 的 open-api 从 namespace 中读取配置,publisher 则是通过 open-api 写入规则。

@Component("flowRuleApolloProvider")
public class FlowRuleApolloProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    @Autowired
    private ApolloManager apolloManager;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;

    @Override
    public List<FlowRuleEntity> getRules(String appName) {
        String rules = apolloManager.loadNamespaceRuleList(appName, ApolloManager.FLOW_RULES_KEY);

        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }
}

@Component("flowRuleApolloPublisher")
public class FlowRuleApolloPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    @Autowired
    private ApolloManager apolloManager;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules) {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        apolloManager.writeAndPublish(app, ApolloManager.FLOW_RULES_KEY, converter.convert(rules));
    }
}

ApolloManager实现了通过open-api查询和写入配置的能力,使用需要自行配置 Apollo Portal 地址和 token,这里不赘述,可以自行查看 Apollo 的官方文档。

@Component
public class ApolloManager {
    private static final String APOLLO_USERNAME = "apollo";
    public static final String FLOW_RULES_KEY = "sentinel.flow.rules";
    public static final String DEGRADE_RULES_KEY = "sentinel.degrade.rules";
    public static final String PARAM_FLOW_RULES_KEY = "sentinel.param.rules";
    public static final String APP_NAME = "YourAppName";

    @Value("${apollo.portal.url}")
    private String portalUrl;
    @Value("${apollo.portal.token}")
    private String portalToken;
    private String apolloEnv;
    private String apolloCluster = "default";
    private ApolloOpenApiClient client;

    @PostConstruct
    public void init() {
        this.client = ApolloOpenApiClient.newBuilder()
                .withPortalUrl(portalUrl)
                .withToken(portalToken)
                .build();
        this.apolloEnv = "default";
    }

    public String loadNamespaceRuleList(String appName, String ruleKey) {
        OpenNamespaceDTO openNamespaceDTO = client.getNamespace(APP_NAME, apolloEnv, apolloCluster, "default");
        return openNamespaceDTO
                .getItems()
                .stream()
                .filter(p -> p.getKey().equals(ruleKey))
                .map(OpenItemDTO::getValue)
                .findFirst()
                .orElse("");
    }

    public void writeAndPublish(String appName, String ruleKey, String value) {
        OpenItemDTO openItemDTO = new OpenItemDTO();
        openItemDTO.setKey(ruleKey);
        openItemDTO.setValue(value);
        openItemDTO.setComment("Add Sentinel Config");
        openItemDTO.setDataChangeCreatedBy(APOLLO_USERNAME);
        openItemDTO.setDataChangeLastModifiedBy(APOLLO_USERNAME);
        client.createOrUpdateItem(APP_NAME, apolloEnv, apolloCluster, "default", openItemDTO);

        NamespaceReleaseDTO namespaceReleaseDTO = new NamespaceReleaseDTO();
        namespaceReleaseDTO.setEmergencyPublish(true);
        namespaceReleaseDTO.setReleasedBy(APOLLO_USERNAME);
        namespaceReleaseDTO.setReleaseTitle("Add Sentinel Config Release");
        client.publishNamespace(APP_NAME, apolloEnv, apolloCluster, "default", namespaceReleaseDTO);
    }

}

对于其他规则,比如降级、热点限流都可以参考此方式去修改,当然控制台要做的修改肯定不是这一点点,比如集群的flowId默认使用的单机自增,这个肯定需要修改,还有页面的传参、查询路由的修改等等,比较繁琐,就不在此赘述了,总归也就是工作量的问题。

好了,本期内容就这些,我是艾小仙,我们下期见。

请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

从一起GC血案谈到反射原理
前言 首先回答一下提问者的问题。这主要是由于存在大量反射而产生的临时类加载器和 ASM 临时生成的类,这些类会被保留在 Metaspace,一旦 Metaspace 即将满的时候,就会触发 Fu
类初始化导致死锁
一张图简单描述死锁 如上图,Thread1 拿到了 object1,Thread2 拿到了 object2,但是现在 Thread1 需要拿到 object2 的锁才能继续往下,Thread2 又要拿到 object1 才能继续往下
在调试器里看LINUX内核态栈溢出
图灵最先发明了栈,但没有给它取名字。德国人鲍尔也“发明”了栈,取名叫酒窖。澳大利亚人汉布林也“发明”了栈,取名叫弹夹。1959年,戴克斯特拉在度假时想到了Stack这个名字,后来被广泛使用。
不起眼,但是足以让你收获的JVM内存案例
今天的这个案例我觉得应该会让你涨姿势吧,不管你对JVM有多熟悉,看到这篇文章,应该还是会有点小惊讶的,不过我觉得这个案例我分享出来,是想表达不管多么奇怪的现象请一定要追究下去,会让你慢慢变得强大起来,
性能优化之提升QPS:发现一个Java开源项目优化点,点进来就是你的了
hello,大家好呀,我是小楼。最近无聊(摸)闲逛(鱼)github时,发现了一个阿里开源项目可以贡献代码的地方。不是写单测、改代码格式那种,而是比较有挑战的性能优化,最关键的是还不难,仔细看完本文后,有点基础就能写出来的那种,话不多说,发车!相信大家在日常写代码获取时间戳时,会写出如下
对不起,我错了,这题不简单!
hello,大家好呀,我是小楼。前几天不是写了这篇文章《发现一个开源项目优化点,点进来就是你的了》嘛。文章介绍了Sentinl的自适应缓存时间戳算法,从原理到实现都手把手解读了,而且还发现Sentinel-Go还未实现这个自适应算法,于是我就觉得,这简单啊,把Java代码翻译成Go不就可以混
【全网首发】Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列1:深刻理解高性能Redis的本质Redis系列2:数据持久化提高可用性Redis系列3:高可用之主从架构1 背景从第三篇 Redis系列3:高可用之主从架构 ,我们知道,为Redis配置主从模式,可以大幅度的提高Redis服务的可用性,减少甚至避免Red
从-99打造Sentinel高可用集群限流中间件
接上篇Sentinel集群限流探索,上次简单提到了集群限流的原理,然后用官方给的 demo 简单修改了一下,可以正常运行生效。这一次需要更进一步,基于 Sentinel 实现内嵌式集群限流的高可用方案,并且包装成一个中间件 starter 提供给三方使用。对于高可用,我们主要需要解决两个问题,这
3
1