SpringBoot应用篇基于Redis实现延时队列

延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢

举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单

本文将介绍一种非常非常简单的实现方式

I. 方案设计

要实现15分钟后自动取消订单,这个也太简单了,来给出一段神级代码

new Thread(() -> {
  // 休眠十五分钟,执行取消订单
  Thread.sleep(15 * 60 * 1000);
  cancelOrder(); 
}).start();

好的,本文就此结束(开玩笑....)

忽略上面的段子,接下来想一想,如果让我们来实现一个延时队列,可以怎么整?

  • 单机:

  • DelayQueue

  • 定时任务

  • 分布式:

  • Quartz定时任务

  • rabbitmq延时队列

  • redis zset

  • redis 过期回调

  • 时间轮

首先我们这里排除掉单机版,至于原因,现在单体单实例应用实在不多见了,直接来看多实例的情况吧

在上面的几种方案中,重心放在redis上,两种case,下面分别介绍一下

1. redis过期时间

我们知道,在使用redis做缓存时,可以设置失效时间,借助redis的失效事件,我们可以来实现延时队列的场景

比如,现在一个订单,我们在redis中新加入一个订单id,失效时间设置为15分钟;当支付成功之后,主动删除这个缓存;若一直没有付钱,则15分钟后,触发一个过期事件,然后订阅这个事件,来执行取消订单

上面这种实现,有两个问题

  • key失效监听,可能存在大量的无效信息
  • 广播方式消费事件,多实例接收到这个事件,怎么防并发?或者没有一个实例接收到这个事件,那么这个取消订单就会漏掉

显然上面的第二点,漏消息是不能接受的

2. redis zset

zset属于redis提供的几个基本数据结构中的一种,它的特点是有 value + score

如果我们想使用zset拉实现演示队列,那么一个可选的方案就是将score设置为触发的时间戳,value为业务值

然后写一个定时任务,不断的从zset中,取出score小于当前时间戳的数据,任务它们都是已经到期可以执行的

借助这个方案,可以相对简单的实现一个演示队列了

II. redis演示队列实现

1. 环境配置

接下来我们将以redis的zset来实现延时队列,本文借助SpringBoot来搭建一个演示工程,使用的基本配置如下

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA进行开发

核心依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- 下面这里两个非必须,主要是后面的实现演示使用到了 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>

redis使用默认的配置,本机 localhost + 6379

2. 核心实现

借助redis zset来实现延时队列,具体的实现代码就很简单了,无非是从zset中取出score小于当前时间戳的数据

private static final Long DELETE_SUCCESS = 1L;
@Autowired
private StringRedisTemplate redisTemplate;

public String fetchOne(String key) {
    Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
    if (CollectionUtils.isEmpty(sets)) {
        return null;
    }

    for (String val : sets) {
        if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) {
            // 删除成功,表示抢占到
            return val;
        }
    }
    return null;
}

注意上面的实现,有一个点需要说一下

zset:每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

为什么这样设计?

这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有我一个人霸占了她呢(忽然进入言情的世界)

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登

接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了

3. 写入队列

上面是从队列中拿数据,有拿当然得有写,所以我们简单的封装一下写入队列的case

@Component
public class RedisDelayListWrapper implements ApplicationContextAware {
    private static final Long DELETE_SUCCESS = 1L;

    private Set<String> topic = new CopyOnWriteArraySet<>();

    public void publish(String key, Object val, long delayTime) {
        topic.add(key);
        String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val);

        redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);
    }
}

4. 定时取演示队列消息

接下来就是一个定时任务,不断的调用上面的实现,从zset中获取到期的数据

@Scheduled(fixedRate = 10_000)
public void schedule() {
    for (String specialTopic : topic) {
        String cell = fetchOne(specialTopic);
        if (cell != null) {
            applicationContext.publishEvent(new DelayMsg(this, cell, specialTopic));
        }
    }
}

@ToString
public static class DelayMsg extends ApplicationEvent {
    @Getter
    private String msg;
    @Getter
    private String topic;

    public DelayMsg(Object source, String msg, String topic) {
        super(source);
        this.msg = msg;
        this.topic = topic;
    }
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

5. 消息消费

最终就是我们的消息消费逻辑了,主要就是消费前面抛出的DelayMsg,我们这里借助AOP来实现消息过滤

定义一个注解Consumer,用来指定消费哪个topic

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
    String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

aop拦截逻辑,根据topic进行过滤

@Aspect
@Component
public class ConsumerAspect {

    @Around("@annotation(consumer)")
    public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
        Object[] args = joinPoint.getArgs();
        boolean check = false;
        for (Object obj : args) {
            if (obj instanceof RedisDelayListWrapper.DelayMsg) {
                check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) obj).getTopic());
            }
        }

        if (!check) {
            // 不满足条件,直接忽略
            return null;
        }

        // topic匹配成功,执行
        return joinPoint.proceed();
    }
}

5. 测试demo

最后写一个测试demo,验证下上面的实现

@EnableScheduling
@RestController
@SpringBootApplication
public class Application {
    private static final String TEST_DELAY_QUEUE = "test";
    private static final String DEMO_DELAY_QUEUE = "demo";
    @Autowired
    private RedisDelayListWrapper redisDelayListWrapper;

    private Random random = new Random();

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

    @GetMapping(path = "publish")
    public String publish(String msg, Long delayTime) {
        if (delayTime == null) {
            delayTime = 10_000L;
        }

        String queue = random.nextBoolean() ? TEST_DELAY_QUEUE : DEMO_DELAY_QUEUE;
        msg = queue + "#" + msg + "#" + (System.currentTimeMillis() + delayTime);
        redisDelayListWrapper.publish(queue, msg, delayTime);
        System.out.println("延时: " + delayTime + "ms后消费: " + msg + " now:" + LocalDateTime.now());
        return "success!";
    }

    @Consumer(topic = TEST_DELAY_QUEUE)
    public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
        System.out.println("TEST消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
    }

    @Consumer(topic = DEMO_DELAY_QUEUE)
    public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
        System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
    }
}

SpringBoot应用篇基于Redis实现延时队列

6. 小结

本文属于一个实战小技巧,借助redis的zset来灵活的实现一个简单的延时队列,实现倒是没有太大的难度,其中的一些小细节还是挺有意思的,好的,今天分享到此over!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容