Redisson限流器RRateLimiter使用及源码分析原创
1 项目介绍
-
本篇教程,我们分析一下Redisson的限流器RRateLimiter的原理和源码。
-
然后利用Redisson提供的限流器RRateLimiter自定义一个注解,在项目中简化限流器的使用。限流器有误差,但误差不会超过限流次数的一倍。就这么说吧,大多数公司肯定是可以接受这个误差的。
2 Redisson限流器RRateLimiter使用
少废话,先把代码跑起来
- pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.study.demo</groupId>
<artifactId>redisson-rate-limit</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>redisson-rate-limit</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.name>RateLimit</project.name>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.11.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.10.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
- application.yml配置文件
server:
port: 8080
spring:
redis:
host: 192.168.212.142
port: 6379
password: 123321
application:
name: RedissonRateLimit
- 我们自定义的限流器注解,一个注解搞定限流
package com.study.demo.annotation;
import java.lang.annotation.*;
/**
* 我们自定义一个注解,简化限流器的使用。全公司都可以使用这个注解来做限流。
* 注意我们这个限流器不是很精确,但误差不会太大
*/
@Target({ElementType.METHOD}) // 此注解只能用在方法上
@Retention(RetentionPolicy.RUNTIME) // 注解的作用域为JVM运行时
@Documented // 生成javadoc时包含该注解
@Inherited // 此注解允许被集成
public @interface RedissonRateLimit {
/**
* 限流标识key,每个http接口都应该有一个唯一的key。
*
* @return
*/
String key();
/**
* 限流的时间(单位为:秒),比如1分钟内最多1000个请求。注意我们这个限流器不是很精确,但误差不会太大
* @return
*/
long timeOut();
/**
* 限流的次数,比如1分钟内最多1000个请求。注意count的值不能小于1,必须大于等于1
* @return
*/
long count();
}
- 配置Redisson
package com.study.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Value("${spring.application.name}")
private String serverName;
@Bean
public RedissonClient redisson(RedisProperties redisProperties) {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
singleServerConfig.setPassword(redisProperties.getPassword());
singleServerConfig.setKeepAlive(true);
singleServerConfig.setDatabase(redisProperties.getDatabase());
singleServerConfig.setClientName(serverName);
return Redisson.create(config);
}
}
- Aspect定义一个切面,拦截使用限流器注解的方法
package com.study.demo.aspect;
import com.study.demo.annotation.RedissonRateLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Component
// @AutoConfigureAfter 提示应在其他指定的自动配置类之后应用{@link EnableAutoConfiguration自动配置}。
// 也就是说如果容器中有了RedissonClient之后,此类RedissonRateLimitAspect才能被启用
@AutoConfigureAfter(RedissonClient.class)
public class RedissonRateLimitAspect {
@Autowired
private RedissonClient redissonClient;
@PostConstruct
public void init() {
log.info("Redisson的限流器被加载.........");
if (redissonClient == null) {
log.warn("Spring容器中没有RedissonClient,Redisson限流器将无法使用............");
}
}
/**
* 在所有方法上带有@RedissonRateLimit注解的方法运行执行之前,先执行我们这个方法进行校验。
* 校验不通过,说明限流了,抛出异常。
* @param redissonRateLimit
*/
@Before("@annotation(redissonRateLimit)")
public void redissonRateLimitCheck(RedissonRateLimit redissonRateLimit){
if (redissonRateLimit == null) {
// 方法上没有该注解,直接放行
return;
}
// Spring容器中没有RedissonClient,直接放行
if (redissonClient == null) {
log.warn("Spring容器中没有RedissonClient,Redisson限流器将无法使用............");
return;
}
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
// 不是http请求,不管
if (servletRequestAttributes == null) {
log.warn("请求不是http,Redisson限流器将无法使用............");
return;
}
log.warn("RedissonRateLimit开始工作");
HttpServletRequest httpServletRequest = servletRequestAttributes.getRequest();
log.info("RedissonRateLimit key:{}, timeOut:{}, count:{}, url:{}", redissonRateLimit.key(), redissonRateLimit.timeOut(), redissonRateLimit.count(), httpServletRequest.getRequestURI());
RRateLimiter rateLimiter = getRedissonRateLimiter(redissonRateLimit);
// 是否触发限流
boolean result = rateLimiter.tryAcquire();
if (!result) {
throw new RuntimeException("当前访问人数过多,请稍后再试");
}
}
/**
* 获取Redisson的RRateLimiter
*
* @param redissonRateLimit
* @return
*/
private RRateLimiter getRedissonRateLimiter(RedissonRateLimit redissonRateLimit){
// 限流次数,注意count的值不能小于1,必须大于等于1
long count = redissonRateLimit.count();
// 限流时间
long timeOut = redissonRateLimit.timeOut();
RRateLimiter rateLimiter = redissonClient.getRateLimiter(redissonRateLimit.key());
// 如果限流器不存在,就创建一个RRateLimiter限流器
if (!rateLimiter.isExists()) {
rateLimiter.trySetRate(RateType.OVERALL, count, timeOut, RateIntervalUnit.SECONDS);
return rateLimiter;
}
// 获取限流的配置信息
RateLimiterConfig rateLimiterConfig = rateLimiter.getConfig();
// 上次配置的限流时间毫秒值
Long rateInterval = rateLimiterConfig.getRateInterval();
// 上次配置的限流次数
Long rate = rateLimiterConfig.getRate();
// 将timeOut转换成毫秒之后再跟rateInterval进行比较
if (TimeUnit.MILLISECONDS.convert(timeOut, TimeUnit.SECONDS) != rateInterval || count != rate) {
// 如果rateLimiterConfig的配置跟我们注解上面的值不一致,说明服务器重启过,程序员又修改了限流的配置
// 删除原有配置
rateLimiter.delete();
// 以程序员重启后的限流配置为准,重新设置
rateLimiter.trySetRate(RateType.OVERALL, count, timeOut, RateIntervalUnit.SECONDS);
}
return rateLimiter;
}
}
- Controller提供一个http接口用于测试
package com.study.demo.controller;
import com.study.demo.annotation.RedissonRateLimit;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.atomic.AtomicLong;
@RestController
public class TestController {
private AtomicLong atomicLong = new AtomicLong(0);
// timeOut = 1, count = 2的意思是1秒钟内,该接口最多可以访问2-4次。注意我们这个限流器不是很精确,但误差不会太大
// 注意count的值不能小于1,必须大于等于1
@RedissonRateLimit(key="testRedissonRateLimiter02", timeOut = 1, count = 2)
@GetMapping("testRedissonRateLimiter")
public void testRedissonRateLimiter() {
System.out.println("测试限流器是否起作用,访问次数:" + atomicLong.incrementAndGet());
}
// 随便哪个接口都可以使用这个限流器注解@RedissonRateLimit,这个接口的限流为:timeOut = 1, count = 200,1秒钟最多访问200-400次。注意我们这个限流器不是很精确,但误差不会太大
// 注意count的值不能小于1,必须大于等于1
@RedissonRateLimit(key="testRedissonRateLimiterEvery", timeOut = 1, count = 200)
@GetMapping("testRedissonRateLimiterEvery")
public void testRedissonRateLimiterEvery() {
System.out.println("测试限流器是否起作用,访问次数:");
}
}
- SpringBoot的启动类
package com.study.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@SpringBootApplication
//@EnableAspectJAutoProxy
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
3 把这个SpringBoot项目跑起来
- 在浏览器上面访问接口地址:http://localhost:8080/testRedissonRateLimiter?sfsd=09,然后频繁刷新,再去看控制台,会看到这样一个错误:当前访问人数过多,请稍后再试,说明我们的限流器生效了。
使用JMeter压测一下
4 讲解一下项目
-
首先我们自定义了一个限流注解:RedissonRateLimit,注意这个注解是我们自己敲代码敲出来的。不是Redisson提供的注解。
-
我们使用@Aspect定义了一个切面,然后使用@Before("@annotation(redissonRateLimit)")拦截所有使用了我们自定义注解RedissonRateLimit的方法,在目标方法执行之前判断该接口的请求次数在指定时间内是否超过限制次数,超过限制次数抛出异常结束请求流程。
-
我们在Controller类的方法上面使用我们自定义的注解:RedissonRateLimit。
5 Redisson限流器RRateLimiter的原理以及源码分析
将限流的配置信息保存在Redis中
- 先看这俩行代码
RRateLimiter rateLimiter = redissonClient.getRateLimiter(redissonRateLimit.key());
// 如果限流器不存在,就创建一个RRateLimiter限流器
if (!rateLimiter.isExists()) {
rateLimiter.trySetRate(RateType.OVERALL, count, timeOut, RateIntervalUnit.SECONDS);
return rateLimiter;
}
-
RRateLimiter rateLimiter = redissonClient.getRateLimiter(redissonRateLimit.key());根据限流的key从Redisson中获取一个限流器RRateLimiter。
-
rateLimiter.isExists(),判断这个限流key在Redis中是否存在。这个方法会向Redis发出一条命令:exists testRedissonRateLimiter02判断指定的key是否在Redis中存在。
-
如果不存在调用rateLimiter.trySetRate(RateType.OVERALL, count, timeOut, RateIntervalUnit.SECONDS);将限流的配置信息保存在Redis中。trySetRate这个方法会向Redis中发出如下几条命令:
hsetnx testRedissonRateLimiter02 rate 100,这条命令设置限流的次数,testRedissonRateLimiter02这个就是我们在自定义注解那里指定的key值。
hsetnx testRedissonRateLimiter02 interval 1000,这条命令设置限流的时间,单位是毫秒。
hsetnx testRedissonRateLimiter02 type 0,这条命令设置限流的类型,就是RateType.OVERALL的枚举下标值。
将限流的配置信息保存到Redis的HashMap数据结构中,hsetnx 如果不存在才能设置成功。
判断是否超过限流的次数
- 再看这俩行代码
// 是否触发限流
boolean result = rateLimiter.tryAcquire();
if (!result) {
throw new RuntimeException("当前访问人数过多,请稍后再试");
}
- 看rateLimiter.tryAcquire()方法的源码
我们来看上面的Redis命令:
-
hget testRedissonRateLimiter02 rate, hget testRedissonRateLimiter02 interval,hget testRedissonRateLimiter02 type,这三条hget命令就是把限流的配置信息取出来。如果取不出来,会报错:RateLimiter is not initialized
-
local valueName = KEYS[2]; KEYS[2]代表的就是getValueName()这个方法,这个方法的会根据我们的限流key重新生成一个key:{testRedissonRateLimiter02}:value。getValueName()方法的源码如下:
所以getValueName()的返回值为:{testRedissonRateLimiter02}:value。 -
if type == ‘1’ then,我们设置进去的type值为0,这里跳过。
-
local currentValue = redis.call(‘get’, valueName); 这行代码执行的Redis命令: et {testRedissonRateLimiter02}:value,尝试获取限流的剩余次数。
-
判断Redis命令:get {testRedissonRateLimiter02}:value的返回值是否为空,第一次肯定为空,所以会走else分支
-
assert(tonumber(rate) >= tonumber(ARGV[1]), ‘Requested permits amount could not exceed defined rate’); 这行代码的意思是,你设置的限流次数必须大于等于1,否则报错:Requested permits amount could not exceed defined rate。
-
Redis命令:set {testRedissonRateLimiter02}:value 100 px 1000。 设置{testRedissonRateLimiter02}:value这个key,值为:限流次数,并且设置key的过期时间为我们注解上面指定的时间,单位是毫秒。
-
decrby valueName ARGV[1]翻译成Redis命令就是:decrby {testRedissonRateLimiter02}:value 1 对{testRedissonRateLimiter02}:value的值减1,也就是对限流的总次数减1。
-
假如local currentValue = redis.call(‘get’, valueName); 的返回值不为空,说明俩个事情:1.{testRedissonRateLimiter02}:value 这个key在Redis中没有过期。2. 请求不是第一次访问,此时Redis命令:get {testRedissonRateLimiter02}:value的返回值就是剩余的限流次数。
-
tonumber(currentValue) < tonumber(ARGV[1]) then return redis.call(‘pttl’, valueName); 这行代码的意思是:如果currentValue(get命令获取到的限流的剩余次数)小于1,则执行pttl {testRedissonRateLimiter02}:value,将{testRedissonRateLimiter02}:value 这个key的剩余过期时间返回出去,注意这里很关键,因为返回值是一个数字。
-
else redis.call(‘decrby’, valueName, ARGV[1]); 这行代码的意思是:执行decrby {testRedissonRateLimiter02}:value 1,对{testRedissonRateLimiter02}:value的值减1,也就是将限流的剩余次数减1。return nil返回值为nil。
-
代码到这里就结束了,Redisson的限流原理就是这样的。
如果rateLimiter.tryAcquire()方法里面的lua脚本返回nil(nil)代表的就是没有超过限流次数,如果返回的有值说明超过限流了。因为redis命令:pttl永远都有返回值,你pttl 后面跟一个不存在的key,pttl命令也会返回一个数字。
rateLimiter.getConfig()这个方法会执行HGETALL testRedissonRateLimiter02 命令获取限流的所有配置信息。
// 获取限流的配置信息,执行Redis命令: HGETALL testRedissonRateLimiter02 命令获取限流的所有配置信息
RateLimiterConfig rateLimiterConfig = rateLimiter.getConfig();
rateLimiter.delete();这个方法会执行del testRedissonRateLimiter02 删除限流的所有配置。
其实这里Redisson有个BUG,因为他只删除了限流的key:testRedissonRateLimiter02,没有删除{testRedissonRateLimiter02}:value这个key,这个{testRedissonRateLimiter02}:value才是判断请求有没有超过限流次数的。这个BUG是我于2023年2月24日01:35:10发现的,Redisson版本为:3.10.7,不知道Redisson后续的版本有没有修复。
// 将timeOut转换成毫秒之后再跟rateInterval进行比较
if (TimeUnit.MILLISECONDS.convert(timeOut, TimeUnit.SECONDS) != rateInterval || count != rate) {
// 如果rateLimiterConfig的配置跟我们注解上面的值不一致,说明服务器重启过,程序员又修改了限流的配置
// 删除原有配置,执行Redis命令: del testRedissonRateLimiter02 删除限流配置
rateLimiter.delete();
// 以程序员重启后的限流配置为准,重新设置
rateLimiter.trySetRate(RateType.OVERALL, count, timeOut, RateIntervalUnit.SECONDS);
}
- 给大家画一个流程图吧
6 Redis验证
- 连接Redis服务器,执行monitor监控命令。生产环境上面不要执行monitor监控命令,非常耗性能。
- 修改注解上面的超时配置,时间调大一点
- 以debug模式启动项目
调试的时候,走一步就停,然后去Redis服务器上面看监控到的命令就行了。