目的
当系统处理能力有限时,控制流量,限流还有一个应用目的是控制用户行为,避免垃圾请求。系统要限定用户的某个行为在指定的时间里只能允许发生N次。
解决方案
设定滑动时间窗口
这个限流需求中存在一个滑动时间窗口(其实就是指定时间这个参数,定宽),可以通过zset的score来圈出这个时间窗口,只需要保留这个时间窗口,窗口之外的数据都可以砍掉。只需要保证唯一性即可,用uuid浪费空间,改用毫秒时间戳。
用zset记录用户行为历史,每个行为都会作为zset中的一个key保存下来。同一个用户的同一种行为用一个zset记录。
为了节省内存,只需要保留时间窗口内的行为记录,同时用户记录在滑动时间窗口的行为是空记录,那么这个zset就可以从内存移除。
通过统计滑动窗口内的行为数量与阈值max_count进行比较得出当前行为是否被允许
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class SimpleRateLimiter {
private Jedis jedis;
public SimpleRateLimiter(Jedis jedis) {
this.jedis = jedis;
}
public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
Pipeline pipe = jedis.pipelined();
pipe.multi();
//保存当前访问人数据
pipe.zadd(key, nowTs, "" + nowTs);
//删除从开始到当前时间-指定时间范围
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379);
SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
for (int i = 0; i < 20; i++) {
System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
}
}
}
如果一定时间内的允许的数量比较大,例如限定60s操作不得超过100万次等,因为要记录时间窗口内所有行为记录,会消耗大量的存储空间。
高级限流算法-漏斗限流
- 漏斗容量有限,当堵住漏嘴时,往里灌水,漏斗到满了装不下会漏
- 当漏嘴放开时,如果漏嘴流速>灌水流速 ,那么漏斗永远装不满。如果漏嘴流速<灌水流速,漏斗的水会满,这是需要停止灌水并等待漏斗等待一部分空间。
所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着系统允许该行为的最大频率。
import java.util.HashMap;
import java.util.Map;
/**
* 1. 漏斗容量有限,当堵住漏嘴时,往里灌水,漏斗到满了装不下会漏
* 2. 当漏嘴放开时,如果漏嘴流速>灌水流速 ,那么漏斗永远装不满。如果漏嘴流速<灌水流速,漏斗的水会满,这是需要停止灌水并等待漏斗等待一部分空间。
*
* 所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着系统允许该行为的最大频率。
* 问题:不能保证在分布式并发的情况下是线程安全的
*/
public class FunnelRateLimiter {
static class Funnel {
//漏斗容量
int capacity;
//漏嘴流水速率
float leakingRate;
//漏斗剩余空间
int leftQuota;
//上一次漏水时间
long leakingTs;
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
this.leftQuota = capacity;
this.leakingTs = System.currentTimeMillis();
}
void makeSpace() {
long nowTs = System.currentTimeMillis();
//距离上一次漏水过去了多久
long deltaTs = nowTs - leakingTs;
//可以腾出不少空间
int deltaQuota = (int) (deltaTs * leakingRate) ;
//间隔时间太长,整数数字过大溢出
if (deltaQuota < 0) {
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
//腾出空间太小,等下一次,最小单位是1
if (deltaQuota < 1) {
return;
}
this.leftQuota += deltaQuota;//增加剩余空间
this.leakingTs = nowTs;//记录漏水时间
//剩余空间不得高于容量
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}
boolean watering(int quota) {
makeSpace();
//判断剩余空间是否足够
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
return false;
}
}
//所有漏斗
private Map<String, Funnel> funnels = new HashMap<>();
/**
*
* @param userId
* @param actionKey
* @param capacity 漏斗容量
* @param leakingRate 漏嘴流水速率quota/s
* @return
*/
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1);//需要1个quota
}
public static void main(String[] args) {
FunnelRateLimiter funnelRateLimiter = new FunnelRateLimiter();
for (int i = 0; i < 20; i++) {
System.out.println(funnelRateLimiter.isActionAllowed("laoqian", "reply", 15, 0.5f));
}
}
}
问题:无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到hash结构,这三个过程不是原子性,需要适当加锁控制,但是加锁可能会失败,需要进行适当重试或者放弃。
限流模块-Redis-Cell
Redis4.0提供限流模块Redis-Cell,使用了漏斗算法,并提供原子限流指令。
指令:cl.throttle
启动redis和模块命令
/usr/local/redis-5.0.6/bin/redis-server --loadmodule 路径/redis-cell-v0.2.5-x86_64-apple-darwin/libredis_cell.dylib /usr/local/redis-5.0.6/bin/redis-conf
127.0.0.1:6379> cl.throttle laoqian:reply 15 30 60
1) (integer) 0 #0 表示允许,1表示拒绝
2) (integer) 16 #漏斗容量 capacity
3) (integer) 15 #漏斗剩余空间 left_quota
4) (integer) -1 #如果被拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 #多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或者重试。cl.throrrle指令直接取返回结果数组的第四个值进行sleep或者异步定时任务来重试。