这段时间因为业务需要,去学习使用了RateLimiter类来控制访问,发现十分方便好用,因此打算学习一下是如何实现的。
RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。
重点在于控制的速率。
基础算法
是如何实现这种对速率的控制的呢?这就跟RateLimiter的设计的算法——令牌桶算法 and 漏桶算法
令牌桶算法
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,当一个请求进来的时候,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
1.每秒会有 r 个令牌放入桶中,或者说,每过 1/r 秒桶中增加一个令牌(不是某个时间点一下子给出该秒所有的令牌)
2.桶中最多存放 b 个令牌,如果桶满了,新放入的令牌会被丢弃
3.当一个 n 字节的数据包到达时,消耗 n 个令牌,然后发送该数据包
4.如果桶中可用令牌小于 n,则该数据包将被缓存或丢弃
漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率
区别:
漏桶算法能够强行限制数据的传输速率。
令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。
RateLimiter的使用:
基本使用手册里都有详细描述 ,这里主要说一下之前遇到的问题
使用RateLimiter的时候会在最开始的时候create一个规定的速率。
这就是对速率的控制。
//每秒5个令牌。以每0.2/个令牌的速度放进去
RateLimiter rateLimiter = RateLimiter.create(5.0);
关于acquire 和 tryAcquire
double a = rateLimiter.acquire();
boolean b= rateLimiter.tryAcquire();
最开始,在代码里使用了acquire方法,然后发现有些请求十分缓慢,后来发现是多余的请求会被阻塞,返回被阻塞的时间。但是这样会影响正常的访问。之后改成了tryAcquire,tryAcquire是非阻塞的,如果不能即时获取到令牌,会直接失败,然后直接将返回false的请求给throw 出来。这样去控制一些访问的请求数。
比如 我们按照我们上述规定的5个/s ,一秒内5个请求进来,每隔0.2s就可以允许一个请求,如果超过5个,那么就需要等待,如果用acquire,那么第六个请求需要等1.2秒之后才能获取到令牌,这1.2s里 这个请求是在等待的。但是用tryAcquire的话就会立即返回false;
RateLimiter的实验:
RateLimiter rateLimiter = RateLimiter.create(5);
double a=rateLimiter.acquire(1);
System.out.println("0 = "+a);
double b=rateLimiter.acquire(3);
System.out.println("1 = "+b);
double c=rateLimiter.acquire(1);
System.out.println("3 = "+c);
double d=rateLimiter.acquire(18);
System.out.println("4 = "+d);
double e=rateLimiter.acquire(1);
System.out.println("5 = "+e);
结果:
0 = 0.0 //第一个是预支的
1 = 0.194598 //因为第一个预支了,所以第二次要的等1/5s
3 = 0.596369 //同理 第二次取了3个 就需要给1/5 * 3 s
4 = 0.198327
5 = 3.597525
和Semaphore 的区别
这个是JDK 里 concurrent下的一个类,也是可以指定一个令牌总数。多线程情况下,每个线程要继续执行,需要获取这个令牌。每个线程用完了放回去。主要是控制并发总数。
这里RateLimiter 是控制一个速率
RateLimiter的源码:
SmoothBursty:恒定的速度放入令牌
SmoothWarmingUp:缓慢提升到一定值放入令牌
区别:可以控制速率
RateLimiter 中的时间窗口能且仅能为 1s,可以见下面源码分析。
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
//可以看这里是写死的1s;
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
//这里才是主要设置令牌数的地方
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
//允许保存的最大的permit
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
public double acquire(int permits) {
long microsToWait = reserve(permits); 先计算获取这些请求需要让线程等待多长时间
stopwatch.sleepMicrosUninterruptibly(microsToWait); //让线程阻塞microTowait微秒长的时间
return 1.0 * microsToWait / SECONDS.toMicros(1L);//返回阻塞的时间
}
final long reserve(int permits) {
checkPermits(permits);//主要判断permits为正
synchronized (mutex()) {
//stopwatch 这个是到当前为止 等待睡眠的时间
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
//返回需要等待的时间(正数)
final long reserveAndGetWaitLength(int permits, long nowMicros) {
//获取持续的时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
//返回
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); //补充令牌
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//需要重新生成多少个permit
double freshPermits = requiredPermits - storedPermitsToSpend;
//生成freshPermits个permit需要的时间 这个时间就是下次我们需要减去的时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
//下次获取的时候需要减去的时间=当前下次获取的时候需要减去的时间+这次需要等待的时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend; //减去消耗的令牌
return returnValue;
}
//补充令牌
void resync(long nowMicros) {
//如果需要的时间大于这次获取的时候需要减去的时间
if (nowMicros > nextFreeTicketMicros) {
// nextFreeTicketMicros的意思是:下次获取的时候需要减去的时间
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//storedPermits能保存的permit
//就是过去一段时间的利用不足的storedPermits 最多不超过create的permit
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}