延时队列常用实现详解

time.png

队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。

延时队列的使用场景

  1. 订单提交后一定时间内未支付需要自动取消。

  2. 接口调用失败后阶梯式的补偿调用。

  3. 任务超时提醒。

  4. 预定会议提前十五分钟通知与会人员参加会议。

延时队列常用实现方式

java DelayQueue延时队列

DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素,当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。

priorityQueue-japepr.png

示例
队列中的元素必须实现Delayed接口

public class MeetingNotice implements Delayed {

private long noticeTime;

private long meetingId;
  
public MeetingNotice(long meetingId, long noticeTime, TimeUnit unit) {
    this.name = name;
    this.noticeTime = System.currentTimeMillis() + (noticeTime > 0 ? unit.toMillis(noticeTime) : 0);
}

@Override
public long getDelay(TimeUnit unit) {
    return noticeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
    MeetingNotice meetingNotice = (MeetingNotice) o;
    long diff = this.time - meetingNotice.time;
    if (diff <= 0) {
        return -1;
    } else {
        return 1;
    }
}
} 

DelayQueue 仅适用于单机部署的应用,对于分布式场景无法适用,同时也不适用于队列元素量很大的场景,不支持持久化。

Redis key过期回调

redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。

实现步骤

  1. 修改redis.conf文件配置如下参数

    notify-keyspace-events Ex

  2. 客户端订阅

    redis key过期后系统会publish 频道(channel) __keyevent@0__:expired 其中__keyevent 为固定前缀,@0表示db0,订阅是可根据自己的dbindex进行调整,:expired 表示过期事件。客户端可通过SUBSCRIBEPSUBSCRIBE订阅,如SUBSCRIBE __keyevent@0__:expired 监听db0的key过期事件。

示例

     public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {
     
         public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
             super(listenerContainer);
         }
     
         @Override
         public void onMessage(Message message, byte[] pattern) {
             String redisKey = message.toString();
             log.info("监听到key: {}  过期" , redisKey);
         }
     
         @Configuration
         static class RedisKeyExpiredConfig {
     
             /************注册redis监听bean************/
             @Bean
             RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
                 RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
                 listenerContainer.setConnectionFactory(connectionFactory);
                 return listenerContainer;
             }
     
             @Bean
             KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
                 return new RedisKeyExpiredListener(listenerContainer);
             }
         }
     
     }

存在的问题

key的失效通知无法保证时效性。redis过期策略有一下三种:

策略 说明 优点 缺点
定时删除 在设置 Key 过期时间的同时创建定时器,让定时器在 Key 过期时执行删除操作 保证过期数据能被及时删除 耗 CPU,尤其当存在大量非永久 Key 时,对 CPU 影响更严重
惰性删除 Key 过期时不主动删除,获取数据时判断该 Key 是否过期,如果过期直接删除 对 CPU 消耗小 耗内存,如果数据过期但又没有任何操作来获取该数据,哪怕数据已经过期了,但该数据任会一直存在
定期删除 每隔一段时间执行一次删除操作 不如定时删除那么消耗 CPU,也不如惰性删除那么占内存 比定时删除更消耗内存,必惰性删除更消耗 CPU

默认情况下,Redis 使用的是惰性删除 + 定期删除的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。

可通过如下方式提高时效性

  1. 将缓存数据与监听过期key数据分离,例如把缓存数据存在 database0,把监听数据存在 database1;让进行监听的库中 key 尽量少,如果不同业务的监听超时时间差异较大,则考虑将不同业务的超时监听数据存放到不同的数据库;
  2. 调整过期策略为定时删除策略,但这样CPU定时器的开销会增大。

Redis zset

redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。

实现思路

将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。

@Slf4j
@Component
public class TestZsetDelayTask {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String REDIS_DELAY_TASK_KEY="test_delay_task";

    @PostConstruct
    public void consumerRedisDelayTask() throws InterruptedException {
      ZSetOperations<String,String> zSetOperations = redisTemplate.opsForZSet();
      while(true){
          //获取当前时间内的第一个任务
        Long score = System.currentTimeMillis();
        Set<String> tasks =  zSetOperations.rangeByScore(REDIS_DELAY_TASK_KEY,0,score);
        if(CollectionUtils.isEmpty(tasks)){
            Thread.sleep(200);
        }else{
            //移除该任务
            String task = (String) tasks.iterator().next();
           if(zSetOperations.remove(REDIS_DELAY_TASK_KEY,task)>0){
               log.info("任务: {} 准备执行" , task);
           }
        }
      }
    }

}

也可以通过lua脚本将zrangebyscorezrem操作变成原子操作,避免了多线程时同一个me mber多次zrem。

String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +
        "if #resultArray > 0 then\n" +
        "    if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
        "        return resultArray[1]\n" +
        "    else\n" +
        "        return ''\n" +
        "    end\n" +
        "else\n" +
        "    return ''\n" +
        "end";

存在的问题

  1. 存在大量的空轮询不但占用了客户端的 CPU,同时也占用了redis的资源,空轮询的客户端有过多,redis的慢查询可能会显著增多。设置sleep的时间过大也会出现时效性不及时问题。
  2. 没有重试和ack机制,客户端异常时,这条任务可能会丢失。

可以使用Redission的RDelayedQueue数据结构,其api类似于java queue使用简单,可更方便的实现基于redis的延时队列。感兴趣的可自行了解,这里不再展开。

RabbitMQ延时队列

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。

死信队列

绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message) 之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。

队列出现Dead Letter的情况有:

  • 消息或者队列的TTL过期

  • 队列达到最大长度

  • 消息被消费端拒绝(basic.reject / basic.nack),并且requeue = false

RabbitMQ ttl

RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。

  • x-message-ttl 为队列设置过期时间。

  • expiration 为消息设置过期时间。

RabbitMQ延时队列示例

@Configuration
@Slf4j
public class RabbitMqDelayQueue {
    /**
     * 普通交换机
     */
    private static final String NORMAL_EXECHANGE = "test-exchange";
    /**
     * 死信交换机名称
     */
    private static final String DLX_EXCHANGE ="test-dlx-exchange";
    /**
     * 普通队列名称
     */
    private static final String NORMAL_QUEUE_NAME = "test-queue";
    /**
     * 死信队列名称
     */
    private static final String DLX_QUEUE_NAME ="test-dlx-queue";

    private static final String ROUTING_KEY="test";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*************普通交换机队列的声明及绑定*************/
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXECHANGE, true,false);
    }

    @Bean
    public Queue normalQueue(){
        //设置队列过期时间
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl",30000);
        //设置死信后重新路由的交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE);
        args.put("x-dead-letter-routing-key",ROUTING_KEY);
        return new Queue(NORMAL_QUEUE_NAME,true,false,false,args);
    }

    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(ROUTING_KEY);
    }

    /*************死信交换机队列的声明及绑定*************/
    @Bean
    public DirectExchange dlxExchange(){
        return new DirectExchange(DLX_EXCHANGE, true,false);
    }

    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE_NAME,true);
    }

    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(ROUTING_KEY);
    }

    @RabbitListener(queues = DLX_QUEUE_NAME)
    public void consumerDlxQueue(@Payload String message) {
        log.info("消费到死信消息:{}",message);
    }


    @PostConstruct
    public void sendMessage(){
        Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                //设置消息的过期时间(毫秒)
                .setExpiration("20000")
                .build();
        rabbitTemplate.send(NORMAL_EXECHANGE,ROUTING_KEY,message);
        log.info("发送mq消息成功");
    }
}

存在的问题

ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。

rabbitmq_delayed_message_exchange插件

针对上述的问题,可以使用rabbitmq_delayed_message_exchang插件来解决。

安装该插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

插件的安装

  1. 进入https://www.rabbitmq.com/community-plugins.html 页面找到rabbitmq_delayed_message_exchang并下载。
  2. 将下载的插件复制到rabbitmq的plugins目录
  3. 执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用该插件。

使用示例

/**
     * 延迟消息交换机
     */
    public final static String DELAY_EXCHANGE = "test-delay-exchange";
    /**
     * 队列
     */
    public final static String DELAY_QUEUE = "test-delay-queue";
    /**
     * 路由Key
     */
    public final static String DELAY_ROUTING_KEY = "test-delay-routingKey";

    @Bean
    public CustomExchange delayMessageExchange() {
        //自定义交换机,type必须为x-delayed-message,添加参数x-delayed-type=direct
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
    }

    @Bean
    public Queue delayMessageQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(DELAY_ROUTING_KEY).noargs();
    }

    @PostConstruct
    public void sendDelayMessages(){
        Message message1 = MessageBuilder.withBody("delay message1".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        //设置消息过期时间
        message1.getMessageProperties().setDelay(20000);
        rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message1);
        Message message2 = MessageBuilder.withBody("delay message2".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        //设置消息过期时间
        message2.getMessageProperties().setDelay(15000);
        rabbitTemplate.send(DELAY_EXCHANGE,DELAY_ROUTING_KEY,message2);
        log.info("发送mq delay 消息成功");
    }

    @RabbitListener(queues = DELAY_QUEUE)
    public void consumerDelayQueue(@Payload String message) {
        log.info("消费到延时消息:{}",message);
    }

插件的局限

  1. 插件极限时间是 8byte 长度 ms,大概 49天,如果你的延时时间很长,超过49天那么该消息将会立刻被投递到队列中,不会延时。
  2. 该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景。有关详情,请参见#72
  3. 如果该插件被禁用那么插件上的延时消息将丢失(还未投递到目标队列的)。

时间轮

时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。

原理

timewheel.png

图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。

下面介绍下kafka的多层时间轮,层数越高时间跨度越大。

timewheel2-1.png

每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

举个例子

假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。

与kafka时间轮相比,netty采用的是轮次来解决超过时间轮所能表示的范围,通过固定的时间间隔tickDuration扫描,时候未到就等待来进行时间轮的推动,会有空推进的情况,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操作。

Netty时间轮使用示例

public class TimeWheelDealyQueue {
    public static void main(String[] args) {
        /**
         * 参数依次为
         * 1.ThreadFactory 自定义线程工厂,用于创建线程执行TimerTask
         * 2.tickDuration  间隔多久走到下一槽(相当于时钟走一格),值越小,时间轮精度越高
         * 3.unit 定义tickDuration的时间单位
         * 4.ticksPerWheel 一圈有多个槽
         * 5.leakDetection 是否开启内存泄漏检测。
         * 6. maxPendingTimeouts 最多待执行的任务个数。0或负数表示无限制。
         */
        Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 10, TimeUnit.MILLISECONDS, 10);
        System.out.println("开始添加任务:" + System.currentTimeMillis());
        //延迟任务,5秒后执行
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("任务开始执行:"+System.currentTimeMillis());
            }
        }, 5, TimeUnit.SECONDS);
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容