【Redis】 限流算法的应用

都江堰

一、问题场景

在日常开发中,针对于有大量请求流量的接口,我们通常会做一系列处理,来减轻数据库的压力。大多数情况会在数据库外面加一层缓存。虽然缓存层能帮忙分摊压力,但也存在缓存不能使用的情况:
1、热key大量失效 ~ 所有流量直接达到数据库上
2、缓存穿透 ~ 调用方查询不存在数据库中的数据,缓存也比不可能存在。
3、缓存主从不同步 ~ 为了保证缓存的高可用,通常会使用读写分离。在有大流量的时候,可以能会存在网络延迟,导致主从不同步,缓存层不生效。
4、······

二、问题分析

解决问题之前需要对问题进行分析。 上面的各种场景有相应的处理方案,本文主要针对其共同问题分析(缓存失效,大流量直接打到数据库)。

通常我们加缓存的处理流程:

请求处理流程

有的流程会在应用服务器中在加本地缓存,提供接口的整体响应时间。

缓存失效会导致流量直接打到数据库上,如下图:

穿透缓存情况

三、解决方案

如何解决缓存失效,所有流量直接打到数据库?

通常我们解决数据库压力的方式有 缓存、限流、降级等方法。

现在缓存存在失效的情况,可以采用限流方案,控制一段时间内访问数据库的请求量。

那么何为限流呢

顾名思义,限流就是限制流量,就像你宽带包了1个G的流量,用完了就没了。通过限流,我们可以很好地控制系统的qps,从而达到保护系统的目的。接下来将会介绍一下常用的限流算法以及他们各自的特点。

1、计算器限流

计数器算法是限流算法里最简单也是最容易实现的一种算法。比如我们规定,对于A接口来说,我们1分钟的访问次数不能超过100个。那么我们可以这么做:在一开 始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个 请求的间隔时间还在1分钟之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter,具体算法的示意图如下:

计算器限流算法示意图

Redis Lua实现:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CommonLimiter {
    /**
     * 计数容量
     *
     * @return 漏斗容量
     */
    double capacity();

    /**
     * 计数时间段
     *
     * @return 计数时间段
     */
    long countTime();

    /**
     * 时间单位
     *
     * @return 时间单位
     */
    TimeUnit countTimeUnit() default TimeUnit.SECONDS;

    /**
     * 每次请求所需加的数
     *
     * @return 每次请求所需加的数
     */
    double requestNeed() default 1;
}

@Slf4j
public class RedisCommonRateLimiter extends AbstractRedisRateLimiter {
    /**
     * 等待时间
     */
    private final Map<String, Long> waitMill = new ConcurrentHashMap<>();

    /**
     * 脚本
     */
    private static final String SCRIPT  = "local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'count', 'lastTs')\n" +
            "local capacity = limitInfo[1]\n" +
            "local count = limitInfo[2]\n" +
            "local lastTs = limitInfo[3]\n" +
            "if capacity == false then\n" +
            "    capacity = tonumber(ARGV[1])\n" +
            "    count = tonumber(ARGV[2])\n" +
            "    lastTs = tonumber(ARGV[4])\n" +
            "    redis.call('hmset', KEYS[1], 'capacity', capacity, 'count', count, 'lastTs', lastTs)\n" +
            "    return -1\n" +
            "else\n" +
            "    local nowTs = tonumber(ARGV[4])\n" +
            "    local period = tonumber(ARGV[3])\n" +
            "    local requestNeed = tonumber(ARGV[2])\n" +
            "    if nowTs - period >= requestNeed then\n" +
            "        if capacity - count >= requestNeed then\n" +
            "            count = count + requestNeed\n" +
            "            redis.call('hmset', KEYS[1], 'count', count)\n" +
            "            return -1\n" +
            "        else\n" +
            "           return lastTs\n" +
            "        end\n" +
            "    else\n" +
            "       count = requestNeed\n" +
            "       lastTs = nowTs\n" +
            "       redis.call('hmset', KEYS[1], 'count', count, 'lastTs', lastTs)\n" +
            "       return -1\n" +
            "    end\n" +
            "end";

    /**
     * redis脚本对象
     */
    private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT;

    static {
        DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
        DEFAULT_REDIS_SCRIPT.setScriptText(SCRIPT);
    }

    @Override
    public Boolean canExecute(String keys, Annotation annotation) {
        CommonLimiter commonLimiter = (CommonLimiter) annotation;
        // 计数量
        double capacity = commonLimiter.capacity();
        // 时间段
        double period = commonLimiter.countTimeUnit().toMillis(commonLimiter.countTime());
        // 当前操作时间
        long currentTime = System.currentTimeMillis();
        // 操作数
        double requestNeed = commonLimiter.requestNeed();

        if (waitMill.get(keys) != null && currentTime - period <= waitMill.get(keys)) {
            // 当前时间段访问量已经满了 直接返回
            return false;
        }
        // 要么key为空, 要么已经到底下一个时间段
        waitMill.remove(keys);

        Object[] args = new Double[] {capacity, requestNeed, period, (double) currentTime};
        Long result = RedisCacheUtils.executeScript(DEFAULT_REDIS_SCRIPT, Lists.newArrayList(keys), args);
        // 返回错误判断
        if (null == result) {
            log.error("【commonLimit】 error result!");
            return false;
        }
        // 是否获取令牌成功
        if (result < 0) {
            return true;
        } else {
            // 未获取成功, 记录下该时间段的开始时间
            waitMill.put(keys, result);
            return false;
        }
    }

这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:

计算器限流算法问题

从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。如果用户利用窗口时间重置的时间点,会导致系统瞬间接受到两倍的限制流量。
解决方法:
1、设置较小的流量限制数,这样即使是两倍,也在系统可接受范围内。
2、改进计算器算法,提高算法的统计精度。使用滑动窗口算法。

时间窗口算法

滑动窗口算法是在计数器算法上进行改进的,提高统计的精度。实现原理图:

时间窗口算法原理图

一个时间窗口就是一分钟。红色的虚线框则为一分钟的流量窗口。每次计数前,先取当前时间以及当前时间-一分钟的时间窗口,获取该窗口的请求数量,若超过限制数量,则拒绝。未超过数量则加入窗口中。

Redis Lua实现:


@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface WindowLimiter {
    /**
     * 持续时间,窗口间隔
     *
     * @return 窗口间隔
     */
    int during() default 1;

    /**
     * 单位时间
     *
     * @return 单位时间
     */
    TimeUnit duringUnit() default TimeUnit.SECONDS;

    /**
     * 通过的请求数
     *
     * @return 请求数
     */
    long passCount();

    /**
     * redis限流 模式
     */
    RedisRateLimitModelEnum clusterLimitModel() default RedisRateLimitModelEnum.WINDOW;
}

public class RedisWindowRateLimiter extends AbstractRedisRateLimiter {

    /**
     * 脚本
     */
    private static final String SCRIPT = "redis.call('zadd',KEYS[1],ARGV[1],ARGV[1]) redis.call('zremrangebyscore',KEYS[1],0,ARGV[2]) return redis.call('zcard',KEYS[1]) <= tonumber(ARGV[3])";

    /**
     * redis脚本对象
     */
    private static final DefaultRedisScript<Boolean> DEFAULT_REDIS_SCRIPT;

    static {
        DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
        DEFAULT_REDIS_SCRIPT.setScriptText(SCRIPT);
    }

    @Override
    public Boolean canExecute(String keys, Annotation annotation) {
        WindowLimiter tokenLimiter = (WindowLimiter) annotation;
        // 窗口时长
        long rateUnitMis = tokenLimiter.duringUnit().toMillis(tokenLimiter.during());
        // 当前流量处理时间 (窗口结束时间)
        long currentTime = System.currentTimeMillis();
        // (窗口开始时间)
        long lastTime = currentTime - rateUnitMis;
        Object[] args = new Long[]{currentTime, lastTime, tokenLimiter.passCount()};
        return RedisCacheUtils.executeScript(DEFAULT_REDIS_SCRIPT, Lists.newArrayList(keys), args);
    }
}

回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

2、令牌桶算法

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

1)、所有的请求在处理之前都需要拿到一个可用的令牌才会被处理;
2)、根据限流大小,设置按照一定的速率往桶里添加令牌;
3)、桶设置最大的放置令牌限制,当桶满时、新添加的令牌就被丢弃或者拒绝;
4)、请求达到后首先要获取令牌桶中的令牌,拿着令牌才可以进行其他的业务逻辑,处理完业务逻辑之后,将令牌直接删除;
5)、令牌桶有最低限额,当桶中的令牌达到最低限额的时候,请求处理完之后将不会删除令牌,以此保证足够的限流;

令牌桶算法原理图

**Java代码实现: **可以看下Google的Guava中 RateLimiter类,使用的就是令牌桶算法。

Redis Lua实现:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TokenLimiter {
   /**
     * 令牌桶容量
     *
     * @return 令牌桶容量
     */
    double capacity();

    /**
     * 令牌生成时间
     *
     * @return 令牌生成速率
     */
    double tokenGenTime();

    /**
     * 生成时间单位,默认秒
     *
     * @return 生成时间单位
     */
    TimeUnit tokenGenTimeUnit() default TimeUnit.SECONDS;

    /**
     * 每次请求所需要的令牌数
     *
     * @return 每次请求所需要的令牌数
     */
    double requestNeed() default 1;

    /**
     * redis限流 模式
     */
    RedisRateLimitModelEnum redisRateLimitModelEnum() default RedisRateLimitModelEnum.TOKEN;
}

@Slf4j
public class RedisTokenRateLimiter extends AbstractRedisRateLimiter {

    /**
     * 等待时间
     */
    private final Map<String, Long> waitMill = new ConcurrentHashMap<>();

    /**
     * 脚本
     */
    private static final String SCRIPT  = "local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'funnelRate', 'leftToken', 'lastTs')\n" +
            "local capacity = limitInfo[1]\n" +
            "local tokenRate = limitInfo[2]\n" +
            "local leftToken = limitInfo[3]\n" +
            "local lastTs = limitInfo[4]\n" +
            "if capacity == false then\n" +
            "    capacity = tonumber(ARGV[1])\n" +
            "    tokenRate = tonumber(ARGV[2])\n" +
            "    local requestNeed = tonumber(ARGV[3])\n" +
            "    leftToken = capacity -  requestNeed\n" +
            "    lastTs = tonumber(ARGV[4])\n" +
            "    redis.call('hmset', KEYS[1], 'capacity', capacity, 'funnelRate', tokenRate, 'leftToken', leftToken, 'lastTs', lastTs)\n" +
            "    return -1\n" +
            "else\n" +
            "    local nowTs = tonumber(ARGV[4])\n" +
            "    local genTokenNum = tonumber((nowTs - lastTs) * tokenRate)\n" +
            "    leftToken = genTokenNum + leftToken\n" +
            "    leftToken = math.min(capacity, leftToken)\n" +
            "    lastTs = nowTs\n" +
            "    local requestNeed = tonumber(ARGV[3])\n" +
            "    if leftToken >= requestNeed then\n" +
            "        leftToken = leftToken - requestNeed\n" +
            "        redis.call('hmset', KEYS[1], 'leftToken', leftToken, 'lastTs', lastTs)\n" +
            "        return -1\n" +
            "    end\n" +
            "    return (requestNeed - leftToken) / tokenRate\n" +
            "end";

    /**
     * redis脚本对象
     */
    private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT;

    static {
        DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
        DEFAULT_REDIS_SCRIPT.setScriptText(SCRIPT);
    }

    @Override
    public Boolean canExecute(String keys, Annotation annotation) {
        TokenLimiter tokenLimiter = (TokenLimiter) annotation;
        // 令牌数量
        double capacity = tokenLimiter.capacity();
        // 令牌生成时间
        long tokenGenTime = tokenLimiter.tokenGenTimeUnit().toMillis(tokenLimiter.tokenGenTime());
        // 令牌生成速率
        double tokenGenRate = capacity / tokenGenTime;
        // 获取令牌数量
        double requestNeedToken = tokenLimiter.requestNeed();
        // 当前处理时间
        long currentTimeMillis = System.currentTimeMillis();

        // 令牌获取是否需要等待
        if (waitMill.get(keys) != null && currentTimeMillis < waitMill.get(keys)) {
            // 暂时没有令牌
            return false;
        }

        Object[] args = new Double[] {capacity, tokenGenRate, requestNeedToken, (double) currentTimeMillis};
        Long result = RedisCacheUtils.executeScript(DEFAULT_REDIS_SCRIPT, Lists.newArrayList(keys), args);
        // 返回错误判断
        if (null == result) {
            log.error("【rateTokenLimit】 error result!");
            return false;
        }
        // 是否获取令牌成功
        if (result < 0) {
            waitMill.put(keys, currentTimeMillis);
            return true;
        } else {
            // 未获取成功, 返回的是下一个令牌的等待时间
            waitMill.put(keys, currentTimeMillis + result);
            return false;
        }
    }
}

总结:恒定速率流入,可以支持突发流量。通常突发流量最大值对于我们自己维护的服务是清晰可控的,为保证系统的最大可用性(尽可能处理更多的请求),同时防止自己的服务被打垮,优先使用令牌桶算法。

3、漏桶算法

漏桶算法思路很简单。以固定速率从桶中流出水滴,以任意速率往桶中放入水滴,桶容量大小是不会发生改变的。

流入:以任意速率往桶中放入水滴。
流出:以固定速率从桶中流出水滴。
水滴:是唯一不重复的标识。
在前提是同一时刻,因为桶中的容量是固定的,如果流入水滴的速率>流出的水滴速率,桶中的水滴可能会溢出。那么溢出的水滴请求都是拒绝访问的,或者直接调用服务降级方法。其原理示意图如下:

漏桶算法

Redis Lua实现:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface FunnelLimiter {
    /**
     * 漏斗容量
     *
     * @return 漏斗容量
     */
    double capacity();

    /**
     * 每秒漏出的速率
     *
     * @return 每秒漏出的速率
     */
    double funnelRate() ;
    
    /**
     * 时间
     *
     * @return 时间
     */
    long funnelTime() default 1;

    /**
     * 时间单位
     *
     * @return 时间单位
     */
    TimeUnit funnelTimeUnit() default TimeUnit.SECONDS;

    /**
     * 每次请求所需加的水量
     *
     * @return 每次请求所需加的水量
     */
    double requestNeed() default 1;

    /**
     * redis限流 模式
     */
    RedisRateLimitModelEnum clusterLimitModel() default RedisRateLimitModelEnum.FUNNEL;


@Slf4j
public class RedisFunnelRateLimiter extends AbstractRedisRateLimiter {

    /**
     * 脚本
     */
    private static final String SCRIPT  = "local limitInfo = redis.call('hmget', KEYS[1], 'capacity', 'funnelRate', 'requestNeed', 'water', 'lastTs')\n" +
            "local capacity = limitInfo[1]\n"  +
            "local funnelRate = limitInfo[2]\n" +
            "local water = limitInfo[3]\n" +
            "local lastTs = limitInfo[4]\n" +
            "if capacity == false then\n" +
            "    capacity = tonumber(ARGV[1])\n" +
            "    funnelRate = tonumber(ARGV[2])\n" +
            "    water = tonumber(ARGV[3])\n" +
            "    lastTs = tonumber(ARGV[4])\n" +
            "    redis.call('hmset', KEYS[1], 'capacity', capacity, 'funnelRate', funnelRate, 'water', water, 'lastTs', lastTs)\n" +
            "    return true\n" +
            "else\n" +
            "    local nowTs = tonumber(ARGV[4])\n" +
            "    local waterPass = tonumber((nowTs - lastTs) * funnelRate)\n" +
            "    water = math.max(0, water - waterPass)\n" +
            "    requestNeed = tonumber(requestNeed)\n" +
            "    if capacity - water >= requestNeed then\n" +
            "        lastTs = nowTs\n" +
            "        water = water + requestNeed\n" +
            "        redis.call('hmset', KEYS[1], 'water', water, 'lastTs', lastTs)\n" +
            "        return true\n" +
            "    end\n" +
            "    return false\n" +
            "end";

    /**
     * redis脚本对象
     */
    private static final DefaultRedisScript<Boolean> DEFAULT_REDIS_SCRIPT;

    static {
        DEFAULT_REDIS_SCRIPT = new DefaultRedisScript<>();
        DEFAULT_REDIS_SCRIPT.setScriptText(SCRIPT);
    }

    @Override
    public Boolean canExecute(String keys, Annotation annotation) {
        FunnelLimiter funnelLimiter = (FunnelLimiter) annotation;
        // 漏斗容量
        double capacity = funnelLimiter.capacity();
        // 漏斗流速
        double funnelRate = funnelLimiter.funnelRate() / funnelLimiter.funnelTimeUnit().toMillis(funnelLimiter.funnelTime());
        // 进入漏斗的水量
        double requestNeed = funnelLimiter.requestNeed();
        // 当前处理时间
        long currentTimeMillis = System.currentTimeMillis();
        Object[] args = new Double[] {capacity, funnelRate, requestNeed, (double) currentTimeMillis};
        return RedisCacheUtils.executeScript(DEFAULT_REDIS_SCRIPT, Lists.newArrayList(keys), args);
    }

恒定速率流出,不支持突发流量。在依赖服务没有做限流的场景下,可以用于防止打垮我们依赖服务,因为第三方服务的最大水位及其在最大水位可持续服务多长时间,对上层服务是未知的。

总结

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

学习文章

三种常见的限流算法
redis应用-限流

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容