[一个简单的秒杀架构的演变]4. 使用分布式限流

地址

目录

1. 思路介绍

之前说到乐观锁更新操作还是执行了近 100 次 SQL,其实这 100 次里就只有 10 次扣库存成功才是有效请求,其他的都是无效请求,为了遵从最后落地到数据库的请求数要尽量少的原则,这里我们使用限流,把大部分无效请求拦截,尽可能保证最终到达数据库的都是有效请求

这次我们引入限流,这里可以先查看一篇文章: 高并发下的限流分析

看完可以了解几种限流算法(计数器(时间窗口),漏桶,令牌桶)以及区别,对比下来,我们这里使用固定时间窗口最好,这里使用 Redis + Lua 的分布式限流方式

2. 限流实现

先写一个工具类,再写一个注解封装,两种形式都可以使用

2.1. Lua脚本

  • 秒级限流(每秒限制多少请求)
-- 实现原理
-- 每次请求都将当前时间,精确到秒作为 key 放入 Redis 中
-- 超时时间设置为 2s, Redis 将该 key 的值进行自增
-- 当达到阈值时返回错误,表示请求被限流
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 资源唯一标志位
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local currentLimit = tonumber(redis.call('get', key) or "0")

if currentLimit + 1 > limit then
    -- 达到限流大小 返回
    return 0;
else
    -- 没有达到阈值 value + 1
    redis.call("INCRBY", key, 1)
    -- 设置过期时间
    redis.call("EXPIRE", key, 2)
    return currentLimit + 1
end
  • 自定义参数限流(自定义多少时间限制多少请求)
-- 实现原理
-- 每次请求都去 Redis 取到当前限流开始时间和限流累计请求数
-- 判断限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
-- 再判断当前时间窗口请求内是否超过限流最大请求数
-- 当达到阈值时返回错误,表示请求被限流,否则通过
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 一个时间窗口开始时间(限流开始时间)key名称
local timeKey = KEYS[1]
-- 一个时间窗口内请求的数量累计(限流累计请求数)key名称
local requestKey = KEYS[2]
-- 限流大小,限流最大请求数
local maxRequest = tonumber(ARGV[1])
-- 当前请求时间戳
local nowTime = tonumber(ARGV[2])
-- 超时时间戳,一个时间窗口时间(毫秒)(限流时间)
local timeRequest = tonumber(ARGV[3])

-- 获取限流开始时间,不存在为0
local currentTime = tonumber(redis.call('get', timeKey) or "0")
-- 获取限流累计请求数,不存在为0
local currentRequest = tonumber(redis.call('get', requestKey) or "0")

-- 判断当前请求时间戳是不是在当前时间窗口中
-- 限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
if currentTime + timeRequest > nowTime then
    -- 判断当前时间窗口请求内是否超过限流最大请求数
    if currentRequest + 1 > maxRequest then
        -- 在时间窗口内且超过限流最大请求数,返回
        return 0;
    else
        -- 在时间窗口内且请求数没超,请求数加一
        redis.call("INCRBY", requestKey, 1)
        return currentRequest + 1;
    end
else
    -- 超时后重置,开启一个新的时间窗口
    redis.call('set', timeKey, nowTime)
    redis.call('set', requestKey, '0')
    -- 设置过期时间
    redis.call("EXPIRE", timeKey, timeRequest / 1000)
    redis.call("EXPIRE", requestKey, timeRequest / 1000)
    -- 请求数加一
    redis.call("INCRBY", requestKey, 1)
    return 1;
end

2.2. 工具类

  • RedisLimitUtil
package com.example.util;

import ...;

/**
 * RedisLimitUtil
 *
 * @author wliduo[i@dolyw.com]
 * @date 2019/11/14 16:44
 */
@Component
public class RedisLimitUtil {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(RedisLimitUtil.class);

    /**
     * 秒级限流(每秒限制多少请求)字符串脚本
     */
    private static String LIMIT_SECKILL_SCRIPT = null;

    /**
     * 自定义参数限流(自定义多少时间限制多少请求)字符串脚本
     */
    private static String LIMIT_CUSTOM_SCRIPT = null;

    /**
     * redis-key-前缀-limit-限流
     */
    private static final String LIMIT = "limit:";

    /**
     * redis-key-名称-limit-一个时间窗口内请求的数量累计(限流累计请求数)
     */
    private static final String LIMIT_REQUEST = "limit:request";

    /**
     * redis-key-名称-limit-一个时间窗口开始时间(限流开始时间)
     */
    private static final String LIMIT_TIME = "limit:time";

    /**
     * 构造方法初始化加载Lua脚本
     */
    public RedisLimitUtil() {
        LIMIT_SECKILL_SCRIPT = getScript("redis/limit-seckill.lua");
        LIMIT_CUSTOM_SCRIPT = getScript("redis/limit-custom.lua");
    }

    /**
     * 秒级限流判断(每秒限制多少请求)
     *
     * @param maxRequest 限流最大请求数
     * @return boolean
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/25 17:57
     */
    public Long limit(String maxRequest) {
        // 获取key名,当前时间戳
        String key = LIMIT + String.valueOf(System.currentTimeMillis() / 1000);
        // 传入参数,限流最大请求数
        List<String> args = new ArrayList<>();
        args.add(maxRequest);
        return eval(LIMIT_SECKILL_SCRIPT, Collections.singletonList(key), args);
    }

    /**
     * 自定义参数限流判断(自定义多少时间限制多少请求)
     *
     * @param maxRequest 限流最大请求数
     * @param timeRequest 一个时间窗口(秒)
     * @return boolean
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/25 17:57
     */
    public Long limit(String maxRequest, String timeRequest) {
        // 获取key名,一个时间窗口开始时间(限流开始时间)和一个时间窗口内请求的数量累计(限流累计请求数)
        List<String> keys = new ArrayList<>();
        keys.add(LIMIT_TIME);
        keys.add(LIMIT_REQUEST);
        // 传入参数,限流最大请求数,当前时间戳,一个时间窗口时间(毫秒)(限流时间)
        List<String> args = new ArrayList<>();
        args.add(maxRequest);
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(timeRequest);
        return eval(LIMIT_CUSTOM_SCRIPT, keys, args);
    }

    /**
     * 执行Lua脚本方法
     *
     * @param script
     * @param keys
     * @param args
     * @return java.lang.Object
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 10:50
     */
    private Long eval(String script, List<String> keys, List<String> args) {
        // 执行脚本
        Object result = JedisUtil.eval(script, keys, args);
        // 结果请求数大于0说明不被限流
        return (Long) result;
    }

    /**
     * 获取Lua脚本
     *
     * @param path
     * @return java.lang.String
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/25 17:57
     */
    private static String getScript(String path) {
        StringBuilder stringBuilder = new StringBuilder();
        InputStream inputStream = RedisLimitUtil.class.getClassLoader().getResourceAsStream(path);
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
            String str;
            while ((str = bufferedReader.readLine()) != null) {
                stringBuilder.append(str).append(System.lineSeparator());
            }
        } catch (IOException e) {
            logger.error(Arrays.toString(e.getStackTrace()));
            throw new CustomException("获取Lua限流脚本出现问题: " + Arrays.toString(e.getStackTrace()));
        }
        return stringBuilder.toString();
    }

}

2.3. 注解

  • pom.xml(注解借助AOP实现)
<!-- AOP -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
  • Limit
package com.example.limit;

import java.lang.annotation.*;

/**
 * 限流注解
 *
 * @author wliduo[i@dolyw.com]
 * @date 2019/11/26 9:59
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Limit {

    /**
     * 限流最大请求数
     * @return
     */
    String maxRequest() default "10";

    /**
     * 一个时间窗口(毫秒)
     * @return
     */
    String timeRequest() default "1000";

}
  • LimitAspect
package com.example.limit;

import ...;

/**
 * LimitAspect限流切面
 *
 * @author wliduo[i@dolyw.com]
 * @date 2019/11/26 10:07
 */
@Order(0)
@Aspect
@Component
public class LimitAspect {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(LimitAspect.class);

    /**
     * 一个时间窗口时间(毫秒)(限流时间)
     */
    private static final String TIME_REQUEST = "1000";

    /**
     * RedisLimitUtil
     */
    @Autowired
    private RedisLimitUtil redisLimitUtil;

    /**
     * 对应注解
     *
     * @param
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 10:11
     */
    @Pointcut("@annotation(com.example.limit.Limit)")
    public void aspect() {}

    /**
     * 切面
     *
     * @param proceedingJoinPoint
     * @return java.lang.Object
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 10:11
     */
    @Around("aspect() && @annotation(limit)")
    public Object Interceptor(ProceedingJoinPoint proceedingJoinPoint, Limit limit) {
        Object result = null;
        Long maxRequest = 0L;
        // 一个时间窗口(毫秒)为1000的话默认调用秒级限流判断(每秒限制多少请求)
        if (TIME_REQUEST.equals(limit.timeRequest())) {
            maxRequest = redisLimitUtil.limit(limit.maxRequest());
        } else {
            maxRequest = redisLimitUtil.limit(limit.maxRequest(), limit.timeRequest());
        }
        // 返回请求数量大于0说明不被限流
        if (maxRequest > 0) {
            // 放行,执行后续方法
            try {
                result = proceedingJoinPoint.proceed();
            } catch (Throwable throwable) {
                throw new CustomException(throwable.getMessage());
            }
        } else {
            // 直接返回响应结果
            throw new CustomException("请求拥挤,请稍候重试");
        }
        return result;
    }

    /**
     * 执行方法前再执行
     *
     * @param limit
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 10:10
     */
    @Before("aspect() && @annotation(limit)")
    public void before(Limit limit) {
        // logger.info("before");
    }

    /**
     * 执行方法后再执行
     *
     * @param limit
     * @return void
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 10:10
     */
    @After("aspect() && @annotation(limit)")
    public void after(Limit limit) {
        // logger.info("after");
    }

}

2.4. 测试入口

写个 LimitController 简单测试下,工具类和注解的使用,可以使用 PostMan 或者 JMeter 测试,都是 Get 请求,也可以直接用浏览器窗口打开请求

package com.example.controller;

import ...;

/**
 *  计数器(固定时间窗口)限流接口测试
 *
 * @author wliduo[i@dolyw.com]
 * @date 2019/11/24 19:27
 */
@RestController
@RequestMapping("/limit")
public class LimitController {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(LimitController.class);

    /**
     * 一个时间窗口内最大请求数(限流最大请求数)
     */
    private static final Long MAX_NUM_REQUEST = 2L;

    /**
     * 一个时间窗口时间(毫秒)(限流时间)
     */
    private static final Long TIME_REQUEST = 5000L;

    /**
     * 一个时间窗口内请求的数量累计(限流请求数累计)
     */
    private AtomicInteger requestNum = new AtomicInteger(0);

    /**
     * 一个时间窗口开始时间(限流开始时间)
     */
    private AtomicLong requestTime = new AtomicLong(System.currentTimeMillis());

    /**
     * RedisLimitUtil
     */
    @Autowired
    private RedisLimitUtil redisLimitUtil;

    /**
     * 计数器(固定时间窗口)请求接口
     *
     * @param
     * @return java.lang.String
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/25 16:19
     */
    @GetMapping
    public String index() {
        long nowTime = System.currentTimeMillis();
        // 判断是在当前时间窗口(限流开始时间)
        if (nowTime < requestTime.longValue() + TIME_REQUEST) {
            // 判断当前时间窗口请求内是否限流最大请求数
            if (requestNum.longValue() < MAX_NUM_REQUEST) {
                // 在时间窗口内且请求数量还没超过最大,请求数加一
                requestNum.incrementAndGet();
                logger.info("请求成功,当前请求是{}次", requestNum.intValue());
                return "请求成功,当前请求是" + requestNum.intValue() + "次";
            }
        } else {
            // 超时后重置(开启一个新的时间窗口)
            requestTime = new AtomicLong(nowTime);
            requestNum = new AtomicInteger(0);
        }
        logger.info("请求失败,被限流");
        return "请求失败,被限流";
    }

    /**
     * 计数器(固定时间窗口)请求接口(限流工具类实现)
     *
     * @param
     * @return java.lang.String
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/25 18:02
     */
    @GetMapping("/redis")
    public String redis() {
        Long maxRequest = redisLimitUtil.limit(MAX_NUM_REQUEST.toString());
        // 结果请求数大于0说明不被限流
        if (maxRequest > 0) {
            logger.info("请求成功,当前请求是{}次", maxRequest);
            return "请求成功,当前请求是" + maxRequest + "次";
        }
        logger.info("请求失败,被限流");
        return "请求拥挤,请稍候重试";
    }

    /**
     * 计数器(固定时间窗口)请求接口(限流注解实现)
     *
     * @param
     * @return java.lang.String
     * @throws
     * @author wliduo[i@dolyw.com]
     * @date 2019/11/26 9:46
     */
    @Limit(maxRequest = "2", timeRequest = "3000")
    @GetMapping("/annotation")
    public String annotation() {
        logger.info("请求成功");
        return "请求成功";
    }

}

3. 代码实现

有了上面的注解,我们只需要 Controller 加个方法就行,在 SeckillEvolutionController 添加乐观锁加缓存再加限流下单的入口方法

  • SeckillEvolutionController
/**
 * 使用乐观锁下订单,并且添加读缓存,再添加限流
 *
 * @param id 商品ID
 * @return com.example.common.ResponseBean
 * @throws Exception
 * @author wliduo[i@dolyw.com]
 * @date 2019/11/22 14:24
 */
@Limit
@PostMapping("/createOptimisticLockOrderWithRedisLimit/{id}")
public ResponseBean createOptimisticLockOrderWithRedisLimit(@PathVariable("id") Integer id) throws Exception {
    // 错误的,线程不安全
    // Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisWrong(id);
    // 正确的,线程安全
    Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisSafe(id);
    return new ResponseBean(HttpStatus.OK.value(), "购买成功", null);
}

添加注解 @Limit 即可,默认限流为每秒最多请求10次

4. 开始测试

使用 JMeter 测试上面的代码,JMeter 的使用可以查看: JMeter的安装使用

我们调用一下商品库存初始化的方法,我使用的是 PostMan,初始化库存表商品 10 个库存,而且清空订单表

图片

接着使用 PostMan 调用缓存预热方法,提前加载好缓存

图片

这时候可以看到我们的数据,库存为 10,卖出为 0 ,订单表为空

图片

缓存数据也是这样

图片

打开 JMeter,添加测试计划(测试计划文件在项目的src\main\resources\jmx下),模拟 500 个并发线程测试秒杀 10 个库存的商品

图片

PS: 这次我们填写 Ramp-Up 时间为 5 秒,意思为执行 5 秒,每秒执行 100 个并发,因为如果都在 1S 内执行完,会被限流,然后填写请求地址,点击启动图标开始

图片

可以看到 500 个并发线程执行完,数据是正确的

图片

我们可以看下 Druid 的监控,地址: http://localhost:8080/druid/sql.html

图片

使用了限流,可以看到乐观锁更新不像之前那样执行 157 次了,只执行了 36 次,很多请求直接被限流了,我们看下后台日志,可以看到很多请求直接被限流限制了,这样就达到了我们的目的

图片

5. 最后总结

那我们还可以怎么优化提高吞吐量以及性能呢,我们上文所有例子其实都是同步请求,完全可以利用同步转异步来提高性能,这里我们将下订单的操作进行异步化,利用消息队列来进行解耦,这样可以然 DB 异步执行下单

每当一个请求通过了限流和库存校验之后就将订单信息发给消息队列,这样一个请求就可以直接返回了,消费程序做下订单的操作,对数据进行入库落地,因为异步了,所以最终需要采取回调或者是其他提醒的方式提醒用户购买完成

参考

  1. 感谢hllcve_的Spring Boot自定义注解: https://www.jianshu.com/p/e04eeae86cf9
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342