性能文章>Google Guava之RateLimiter核心源码解读(中)>

Google Guava之RateLimiter核心源码解读(中)原创

4年前
9506110

RateLimiter是Google Guava框架的一个限速器,通常用于控制对某个资源的访问速率。

限速常见的有两种实现方式,一种是令牌桶,另一种是漏桶。

RateLimiter选择了令牌桶作为其底层实现,按照固定速率投放令牌,同时支持突发流量。

在上一篇中,我们先简单解读了Ticker和Stopwatch,它们是RateLimiter底层时间计算的基础。

传送门:《Google Guava之RateLimiter核心源码解读(上)

这一篇,我们将要解读的是创建RateLimiter实例和获取许可的核心代码,涉及到RateLimiter、SmoothRateLimiter和SmoothBursty。

代码仍然是基于Guava 23.0版本。

在进行源码分析之前,先来看一个简单的例子,找点感觉。

示例

@Test
public void testFirst() {
	RateLimiter rateLimiter = RateLimiter.create(1);
	while (true) {
		System.out.println(rateLimiter.acquire(1));
	}
}
/*
执行结果:
0.0
0.993779
0.976652
0.967317
0.9815
0.999405
0.997789
...
*/

在这段代码中,首先创建了一个RateLimiter实例,然后通过while循环不断从限速器中获取许可。

其中,RateLimiter.create(1)表示创建一个RateLimiter实例,每秒的许可数为1。也就是说,如果想从限速器中获得许可,那么每秒最多只能获取1个。

rateLimiter.acquire(1)表示从限速器中获取1个许可,该方法会阻塞直到获得了指定的许可数。

执行程序输出的结果,表示每次获得许可所等待的时间,单位为秒。显然,每获得1个许可需要花费约1秒的时间。这个1秒,就是在创建RateLimiter时,create方法传入的1,即限速器每秒可发放的许可数。

RateLimiter的核心设计思想

RateLimiter的一个主要特性,是能够提供稳定(或固定)的速率。对于这一点,可以通过在初始化时,传入一个期望的速率值。然后根据这个值,计算出一个具体的限制时间,使得线程在获取许可时进行相应的等待。

那么,具体要如何来实现对速率的控制呢?

最简单的方法,就是保留最后一次发放许可给线程请求的时间戳,并且确保在1/QPS秒之后才允许下一次请求许可通过。

其中,QPS指每秒的最大许可数,即每秒最多允许发放多少个许可。而1/QPS,表示每发放1个许可的间隔时间。

例如,对于QPS为5的速率(即每秒发送5个许可),我们只要确保距最后一次准许请求通过的时间差达到200ms,才发放下一个许可,即可满足预期的速率要求。

如果一个新的请求到来了,但距最后一个请求被批准的时间仅过了100ms,那么本次请求需要再等待100ms。

因此,以这样的速率,发送15个许可,总共需要3秒(15除以5)。

如果一个请求速率较高的任务到达一个比较空闲的RateLimiter,那么该任务将立即被授予指定的许可。然而,下一个请求将会受到额外的限制,因为前一个请求预取了一部分许可,需要由本次请求来承担额外的等待时间。

这其实就是RateLimiter支持突发访问流量的体现,但这同时也说明了RateLimiter并不提供多个请求间的公平保证。(想想先来的请求可能不需要等待,而下一个却要为前一个请求进行额外的等待)

基于以上的设计,如果RateLimiter长时间未使用,或者比较空闲的时候,那么空闲期间的许可我们是没有使用到的,这样将会被造成浪费,可能导到资源利用率低等问题。

为了充分利用资源,将空闲期间的许可给利用起来,需要有一个地方来记录空闲期间的许可数。在RateLimiter中,使用storedPermits来存储未被使用的许可数。

默认的RateLimiter配置可以保存最多一秒钟的未使用许可。

RateLimiter类图

RateLimiter的类关系图如下:

RateLimiterClassDiagram.png

RateLimiter和SmoothRateLimiter都是抽象类,SmoothRateLimiter继承了RateLimiter。

SmoothBursty和SmoothWarmingUp都继承SmoothRateLimiter,是RateLimiter的最终实现类。

同时,SmoothBursty和SmoothWarmingUp直接在SmoothRateLimiter内部定义,都是静态的内部类。

因此,在创建RateLimiter的具体实例时,实际上是在内部创建SmoothBursty或SmoothWarmingUp实例。

核心源码解读

创建RateLimiter实例


public abstract class RateLimiter {
  /**
   * 创建一个RateLimiter实例,并设置每秒的许可数.
   */
  public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }
  
  /**
   * 创建一个默认实现为SmoothBursty的RateLimiter实例.
   * 
   * @param permitsPerSecond 每秒许可数
   * @param stopwatch 支持sleep的秒表
   * @return
   */
  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
	// 创建一个RateLimiter实例,默认实现为SmoothBursty. 突发时间为1s
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond); // 设置速率,即每秒的许可数
    return rateLimiter;
  }
}

其中,SleepingStopwatch.createFromSystemTimer()表示创建一个支持sleep方法的秒表。SleepingStopwatch有两个方法,一个是获取从启动限速器到当前的时间,另一个是执行不可中断的睡眠。

public static final SleepingStopwatch createFromSystemTimer() {
  return new SleepingStopwatch() {
	// 初始化秒表
    final Stopwatch stopwatch = Stopwatch.createStarted();

    /**
     * 从启动到现在经过的微秒数。相当于ticker.read() - startTick,即"当前纳秒 - 启动时的纳秒",然后转换为微秒。
     */
    @Override
    protected long readMicros() {
      return stopwatch.elapsed(MICROSECONDS);
    }

    /**
     * 不可中断的进行sleep,时间为微秒。<br>
     * 实际上,如果遇到中断,会先忽略中断异常,继续sleep,直到等待了指定时长后才结束。
     */
    @Override
    protected void sleepMicrosUninterruptibly(long micros) {
      if (micros > 0) {
        Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
      }
    }
  };
}
  /**
   * 不可中断的执行Thread.sleep()。<br>
   * 不可中断是指,当线程发生中断时,sleep不会被中断,直到等待时间到了才会返回。<br>
   * 如果sleep期间被中断过,在返回前会恢复线程的中断状态,即执行Thread.currentThread().interrupt()。
   */
  public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(sleepFor); // 转化为纳秒
      long end = System.nanoTime() + remainingNanos; // 等待的结束时间
      while (true) {
        try {
          // TimeUnit.sleep() treats negative timeouts just like zero.
          NANOSECONDS.sleep(remainingNanos); // 等待remainingNanos纳秒
          return;
        } catch (InterruptedException e) { // 被中断了
          interrupted = true; // 记录中断状态
          remainingNanos = end - System.nanoTime(); // 还要等待多久
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // 如果等待期间发生了中断,则恢复线程的中断状态
      }
    }
  }

SleepingStopwatch不是我们本讲的重点,代码也比较简单,大家自行看一下注解基本就明白了。

让我们回到create方法,首先是创建SmoothBursty实例,下面看看它的源代码。

创建SmoothBursty实例

  /**
   * 支持突发流量的RateLimiter
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    final double maxBurstSeconds; // 最大的突发秒数,默认为1s. 即突发许可数为:maxBurstSeconds * permitsPerSecond

    /**
     * 构造函数
     * 
     * @param stopwatch 秒表
     * @param maxBurstSeconds 最大突发秒数
     */
    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }
  }

调用SmoothBursty的构造函数,首先是将stopwatch往父类传,然后将突发秒数maxBurstSeconds保存下来。

super(stopwatch)将会调用SmoothRateLimiter的构造函数:

  private SmoothRateLimiter(SleepingStopwatch stopwatch) {
    super(stopwatch);
  }

最后再调用RateLimiter的构造函数:

  RateLimiter(SleepingStopwatch stopwatch) {
    this.stopwatch = checkNotNull(stopwatch);
  }

可以看到,stopwatch对象从RateLimiter的create方法传进来,千辛万苦又回到了RateLimiter的构造函数,最终保存到RateLimiter的变量stopwatch。

绕了一大圈,看着好像有点傻,那是因为RateLimiter本身是抽象类,创建的实例实际上是其实现类SmoothBursty,成员变量stopwatch实际是SmoothBursty对象的。

接下来是调用rateLimiter.setRate(double permitsPerSecond)方法。该方法用于更新速率值,即更新每秒的最大许可数。

  /**
   * 更新速率值,即更新每秒的最大许可数. <br>
   * 当前正受速率限制的请求,不会受本次更新影响。下一次请求才会受影响,使用新的速率值
   */
  public final void setRate(double permitsPerSecond) {
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

首先是调用mutex()获取锁对象,在首次调用时创建锁对象,后续调用直接返回。

然后通过synchronized关键字,尝试获取锁。

获得锁之后,执行doSetRate()方法,传入速率值,以及当前时间(其实就是从启动到现在经过的微秒数,也可理解为当前时间)。

doSetRate()方法在SmoothRateLimiter类中实现:

  /**
   * 设置速率
   */
  @Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
	// 1.更新令牌桶中的令牌数,以及下一次请求可以获取令牌的起始时间.
    resync(nowMicros);
    // 2.计算并保存每1个许可请求的时间间隔,即添加令牌时间间隔. 公式:1000ms/每秒许可数
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    // 3.更新当前存储令牌数,和牌桶最大存储的令牌数量
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

总体三分为3步,下面进行分析。

1.更新令牌桶及下一次请求可以获取令牌的起始时间

更新令牌桶中的令牌数,以及下一次请求可以获取令牌的起始时间.

  /**
   * 根据当前时间,更新令牌桶中的令牌数storedPermits,以及下一次请求可以获取令牌的起始时间nextFreeTicketMicros。<br>
   * 前者可以理解为根据规则往令牌桶中添加令牌,以便有请求时可以从中获取令牌进行发放。
   */
  void resync(long nowMicros) {
	// 更新令牌桶中的令牌数,以及下一次请求可以获取令牌的起始时间.
	// 仅当当前时间比下一次可获取令牌的时间晚时才需要更新,如果当前时间更加说明发送新令牌的时间还没到.
    if (nowMicros > nextFreeTicketMicros) {
      // 根据已经过去的时间,计算出可以再新增多少个令牌存放到令牌桶中。初始化的时候,调用该方法,结果为Infinity
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      // 更新令牌桶内剩余的可发放的令牌数。maxPermits是上限,这里其实是确保令牌桶不超限:storedPermits <= maxPermits
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

要看懂这段代码,得先了解nextFreeTicketMicros的含义。

在开篇处,我们讲到要做限速,要记录最后一次发放许可让请求通过的时间。如果是这样的话,那么每次请求获取许可时,都需要计算一下本次与上一次发放许可的时间差。

在RateLimiter中,做了一点优化,每次获取许可用,记录的是下一次(而不是本次)可以获取许可的时间,这个时间就是上面的nextFreeTicketMicros。

nextFreeTicketMicros的具体含义是,下一次请求可以获取令牌的起始时间。该时间可能比当前请求的时间早,也可能是之后的某个时间。

如果nextFreeTicketMicros比当前时间还晚,则说明本次获取许可的时间还未到,后面需要等待。

而如果nextFreeTicketMicros比当前时间早,那么就进入上面的if代码块。目的有两个,一个是更新下一次请求可以获取令牌的起始时间,另一个是根据时间差计算令牌桶中未被使用可供后续发放的令牌数storedPermits。

这两个变量,都在SmoothRateLimiter中定义。

2.计算并保存每1个许可请求的时间间隔

计算并保存每1个许可请求的时间间隔,即添加1个令牌时间间隔. 公式:1000ms/每秒许可数。

最后保存到SmoothRateLimiter的stableIntervalMicros变量中。

double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;

stableIntervalMicros声明如下:

  /**
   * 发放每1个许可的时间间隔,也是每添加1令牌到令牌桶的时间间隔. 单位为微秒<br>
   * 比如每秒5个许可,则1000ms/5=200ms,即每200ms可发放1个许可。
   */
  double stableIntervalMicros;

3.更新当前存储令牌数,和牌桶最大存储的令牌数量

更新当前存储令牌数,和牌桶最大存储的令牌数量。

// 3.更新当前存储令牌数,和牌桶最大存储的令牌数量
doSetRate(permitsPerSecond, stableIntervalMicros);

doSetRate取决于具体的实现,对应的实现在SmoothBursty中。

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits; // 初始化调用时为0
  maxPermits = maxBurstSeconds * permitsPerSecond; // 令牌桶最大存储的令牌数量=最大突发秒数*每秒许可数
  if (oldMaxPermits == Double.POSITIVE_INFINITY) { // 什么场景下会这么大?普通的应用遇不到
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else { // 如果是初始化,oldMaxPermits为0,storedPermits设置为0,刚开始令牌桶是空的。
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits; // 如果更新已有限速器的速率,则重新计算当前存储令牌数.
  }
}

在这段代码中,有一个新的变量maxPermits,表示令牌桶最大存储的令牌数量。

maxPermits的计算公式为:允许突发秒数 * 每秒许可数。也就是这段代码:maxPermits = maxBurstSeconds * permitsPerSecond;

maxPermits与storedPermits之间的关系为:storedPermits <= maxPermits。也就是说,令牌桶中的令牌数,不能超过最大存储令牌数maxPermits。

if (oldMaxPermits == Double.POSITIVE_INFINITY)

if代码段目前没想到应用场景,一般应用不会使用到。

对于else代码段,有两种场景会用到。第一个场景是创建RateLimiter的时候进行初始化,第二个是对已有RateLimiter实例进行速率更新时使用到。

如果是创建新的RateLimiter实例,那么刚开始令牌桶是空的,初始化为0。

如果是更新已有实例的速率,则根据新的最大速率,按比例缩放storedPermits,这个比例就是新旧的两个最大令牌数的比值:maxPermits / oldMaxPermits,再用这个比例乘以旧的令牌桶中的令牌数storedPermits,最终得到令牌桶中新的存储令牌数。

以上,就是创建RateLimiter实例的过程。

获取许可

在完成RateLimiter实例的创建之后,接下来就是实际应用中最核心的方法acquire(),用于从限速器中获取指定数量的许可。

public abstract class RateLimiter {
  /**
   * 获取指定数量的许可数,阻塞直到可以获取时才返回.<br>
   * 同时,返回获取该许可数阻塞等待的时间. 返回0表示本次获取立即返回,不需要等待.
   */
  public double acquire(int permits) {
	// 保留指定数量的许可数,以便将来/后续使用,并返回获取指定许可数仍需等待的时间
    long microsToWait = reserve(permits);
    // 不可中断的等待指定的微秒数
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    // 等待时间转化为秒数
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }
  
  /**
   * 保留指定数量的许可数,以便将来/后续使用,并返回获取指定许可数仍需等待的时间.<br>
   * 保留,可以理解为预先申请额度,但这个额度需要时间到了才可以使用。具体是否需要等待,取决于返回的时间值.<br>
   */
  final long reserve(int permits) {
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }
  
  /**
   * 保留/消费掉令牌,并返回本次获取令牌仍需等待的时间。<br>
   * 只要时间点到了,就可以获取不超过maxPermits的令牌数。至于原本不能满足的/突发的令牌数,让下一个请求去等待吧。

   * @param permits 请求许可数/令牌数
   * @param nowMicros 从启动到现在经过的微秒数,用来表示当前时间
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
	// 能够获取到令牌的时间点
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    // 如果momentAvailable > nowMicros,则需要等待,计算出等待时间。0表示不需要等待
    return max(momentAvailable - nowMicros, 0);
  }
}

acquire()的主要逻辑,是调用reserve()方法,预留指定数量的许可,计算出下一次可获取许可的时间,并返回本次获取许可仍需要等待的时间,然后调用sleepMicrosUninterruptibly()方法进行不可中断的等待,最后返回本次获取许可所等待的时间。

reserve()方法,同样需要先获得锁,然后再调用reserveAndGetWaitLength()方法。

reserveAndGetWaitLength()传入了两个参数,一个是本次要获取的许可数,另一个是当前时间(其实就是从启动到现在经过的微秒数,也可理解为当前时间)。

reserveAndGetWaitLength()先调用reserveEarliestAvailable()方法,将参数往下传。然后再计算出本次需要等待的时间,并返回该值。

abstract class SmoothRateLimiter extends RateLimiter {
  /**
   * 保留本次请求的许可数,更新下一次请求可以获取令牌的起始时间,并返回本次请求许可可以使用的时间.
   *
   * @param permits 请求许可数/令牌数
   * @param nowMicros 从启动到现在经过的微秒数. 用来代表当前时间
   * @return the time that the permits may be used, or, if the permits may be used immediately, an
   *     arbitrary past or present time
   */
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
	// 根据当前时间,更新令牌桶中的令牌数storedPermits,以及下一次请求可以获取令牌的起始时间nextFreeTicketMicros。
    resync(nowMicros);
    
    // 本次请求可以获取令牌的时间点。如果nowMicros >= nextFreeTicketMicros,则不需要等待。
    long returnValue = nextFreeTicketMicros;
    // 本次能满足的令牌数量,即本次请求应该发放多少个令牌。确保不超出已存储的令牌数,确保storedPermitsToSpend <= storedPermits
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可能为0
    // 本次未被满足的令牌数量,即requiredPermits中有多少个满足不了. 如果能满足,那么计算结果为0.
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 如果requiredPermits未被全部满足,还缺多少个令牌,乘以每个令牌发放的时间间隔(freshPermits * stableIntervalMicros),得出要等多久才能拿到令牌
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // SmoothBursty默认返回0
            + (long) (freshPermits * stableIntervalMicros);
    
    // 加上waitMicros,更新下一次请求可以获取令牌的起始时间
    // 这里就有预留的概念,将本次未满足的令牌数需要等待的时间,加到nextFreeTicketMicros上面,下一次获取的线程需要等到该时间点之后才可获取。
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 【核心】是更新这个值
    // 令牌桶内剩余的可发放的令牌数
    this.storedPermits -= storedPermitsToSpend; // 这里只是减去当前能满足的数量,至于突发的那部分,等到nextFreeTicketMicros到了自然就能补回来了
    
    // 返回在执行resync之后,下一次请求可获取令牌的起始时间nextFreeTicketMicros.
    // 请注意,这里返回的nextFreeTicketMicros,是没有加上waitMicros的. 加上之后的值,是用于下一次请求使用的.
    return returnValue;
  }
  
    /**
   * 根据当前时间,更新令牌桶中的令牌数storedPermits,以及下一次请求可以获取令牌的起始时间nextFreeTicketMicros。<br>
   * 前者可以理解为根据规则往令牌桶中添加令牌,以便有请求时可以从中获取令牌进行发放。
   */
  void resync(long nowMicros) {
	// 更新令牌桶中的令牌数,以及下一次请求可以获取令牌的起始时间.
	// 仅当当前时间比下一次可获取令牌的时间晚时才需要更新,如果当前时间更加说明发送新令牌的时间还没到.
    if (nowMicros > nextFreeTicketMicros) {
      // 根据已经过去的时间,计算出可以再新增多少个令牌存放到令牌桶中。初始化的时候,调用该方法,结果为Infinity
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      // 更新令牌桶内剩余的可发放的令牌数。maxPermits是上限,这里其实是确保令牌桶不超限:storedPermits <= maxPermits
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }
}

reserveEarliestAvailable()方法,是实现acquire()方法的核心逻辑。

reserveEarliestAvailable()可以分为4个子流程,分别如下。

1.更新令牌桶及下一次请求可以获取令牌的起始时间

首先调用resync()方法,主要用于判断是否需要更新令牌桶及下一次请求可以获取令牌的起始时间,如果需要更计算出需要增加的令牌数,存储到storedPermits中,并设置nextFreeTicketMicros为传入的时间。

resync()方法在前面讲解doSetRate()时已经分析过,这里就不再重复分析。

2.暂存本次可获取许可的时间点用于最后返回

实际,第2步只有一句代码。

long returnValue = nextFreeTicketMicros;

在reserveEarliestAvailable()方法的最后返回,用于计算在acquire()方法返回前需要等待的时间,而该等待时间也会返回给调用方。

3.计算本次能满足和未被满足的许可数,以及预取许可需要额外等待的时间

1)当前可用的令牌数,就是令牌桶storedPermits的数量,因此能满足的许可数不能超过storedPermits的值。

// 本次能满足的令牌数量,即本次请求应该发放多少个令牌。确保不超出已存储的令牌数,确保storedPermitsToSpend <= storedPermits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可能为0

2)而未被满足的许可数,自然就是请求获取的许可数requiredPermits,减去能满足的许可数storedPermitsToSpend。

// 本次未被满足的令牌数量,即requiredPermits中有多少个满足不了. 如果能满足,那么计算结果为0.
double freshPermits = requiredPermits - storedPermitsToSpend;

3)计算出获取"未被满足的许可数"需要等待的时间

// 如果requiredPermits未被全部满足,还缺多少个令牌,乘以每个令牌发放的时间间隔(freshPermits * stableIntervalMicros),得出要等多久才能拿到令牌
long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // SmoothBursty默认返回0
            + (long) (freshPermits * stableIntervalMicros);

对于SmoothBursty,storedPermitsToWaitTime()方法默认是返回0,因此可以忽略。

实际上就是用未被满足的许可数freshPermits,乘以每个令牌发放的时间间隔stableIntervalMicros,得到满足所有许可需要额外等待的时间。

4.更新下一次请求可以获取令牌的起始时间和令牌桶中的令牌数

// 加上waitMicros,更新下一次请求可以获取令牌的起始时间
// 这里就有预留的概念,将本次未满足的令牌数需要等待的时间,加到nextFreeTicketMicros上面,下一次获取的线程需要等到该时间点之后才可获取。
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 【核心】是更新这个值
// 令牌桶内剩余的可发放的令牌数
this.storedPermits -= storedPermitsToSpend; // 这里只是减去当前能满足的数量,至于突发的那部分,等到nextFreeTicketMicros到了自然就能补回来了

LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros)的作用是将 nextFreeTicketMicros和waitMicros相加,计算出在本次获取许可之后,下一次请求可以获取令牌的起始时间。

对于waitMicros,可以理解为预取许可需要等待的时间。思想是预先获取本次满足不了的额外那部分许可,然后让下一个请求去等待这部分时间。

而对于令牌桶中的令牌数,只需要减去本次能满足的数量storedPermitsToSpend。至于额外预取的数量,只需要通过等待相应的时间来抵消即可。因为在等待期间,storedPermits是不会增加的。这是由于等待时间已经加到nextFreeTicketMicros,而storedPermits的计算依赖于nextFreeTicketMicros。

小结

在文章开头,我们先简单了解了RateLimiter的基本设计思想。然后分别分析了创建RateLimiter实例,以及获取许可的源代码。

RateLimiter主要包含2大流程:

1.创建RateLimiter实例

1)创建SmoothBursty实例,初始化秒表和突发秒数;

2)初始化下一次请求可以获取令牌的起始时间nextFreeTicketMicros;

3)通过速率值计算出每产生一个许可的时间间隔stableIntervalMicros;

4)根据最大突发秒数和每秒许可数,初始化最大速率值maxPermits;

5)初始化令牌桶中的令牌数量storedPermits。

2.获取许可

1)根据当前时间,更新令牌桶中的令牌数storedPermits,以及下一次请求可以获取令牌的起始时间nextFreeTicketMicros;

2)计算本次能满足和未被满足的许可数,以及预取许可需要额外等待的时间;

3)更新下一次请求可以获取令牌的起始时间和令牌桶中的令牌数;

4)计算出本次获取许可需要等待的时间,如果大于0则进行不可中断等待后再返回。

最后说明一下,本文中许可和令牌是同一个意思。


公众号:二进制之路

博客:binarylife.icu

点赞收藏
betterlife
请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

随机一门技术分享之Netty

随机一门技术分享之Netty

MappedByteBuffer VS FileChannel:从内核层面对比两者的性能差异

MappedByteBuffer VS FileChannel:从内核层面对比两者的性能差异

10
1