一、概述
消息队列,Message Queue,常用于解决并发系统中的资源一致性问题,提升峰值的处理能力,同时保证消息的顺序性、可恢复性、必送达性,对应用进行解耦,或者实现异步通讯等。市面上的 MQ应用有很多(例如:Kafka,RabbitMQ,Disque),同时也可以基于 Redis 来实现,比较典型的方案有:
- 基于List的 LPUSH+BRPOP 的实现
- PUB/SUB,订阅/发布模式
- 基于Sorted-Set的实现
-
基于Stream类型的实现
在消息队列使用中,有生产者producter和消费者consumer。生产者负责生成消息,消费者负责使用处理消息。生产,指的是将消息放入消息队列。 消费,指的是读取并处理消息。通常一个消息再被消费后,就应该从消息队列中删除。
二、实现
1、基于List的LPUSH+BRPOP的实现
LPUSH,将消息放入消息队列(生产者)
BRPOP,从队列中取出消息,阻塞模式(消费者)
TBase中不支持BRPOP,只支持RPOP,BRPOP是RPOP的阻塞版本
该模式优点:
- 实现简单
- Reids支持持久化消息,意味着消息不会丢失,可以重复查看(注意不是消费,只看不用,LRANGE类的指令)。
- 可以保证顺序,保证使用LPUSH命令,可以保证消息的顺序性
- 使用RPUSH,可以将消息放在队列的开头,达到优先消息的目的,可以实现简易的消息优先队列。
该模式缺点: - 做消费确认ACK比较麻烦,就是不能保证消费者在读取之后,未处理后的宕机问题。导致消息意外丢失。通常需要自己维护一个Pending列表,保证消息的处理确认。
- 不能做广播模式,例如典型的Pub/Discribe模式。
- 不能重复消费,一旦消费就会被删除
- 不支持分组消费,需要自己在业务逻辑层解决
2、PUB/SUB,订阅/发布模式
SUBSCRIBE,用于订阅信道
PUBLISH,向信道发送消息
UNSUBSCRIBE,取消订阅
生产者和消费者通过相同的一个信道(Channel)进行交互。信道其实也就是队列。通常会有多个消费者。多个消费者订阅同一个信道,当生产者向信道发布消息时,该信道会立即将消息逐一发布给每个消费者。可见,该信道对于消费者是发散的信道,每个消费者都可以得到相同的消息。典型的对多的关系。
该模式优点:
- 典型的广播模式,一个消息可以发布到多个消费者
- 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息
- 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息
该模式缺点: - 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回
- 不能保证每个消费者接收的时间是一致的
- 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时
Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
3、基于SortedSet有序集合的实现
ZADD KEY score member,压入集合
ZRANGEBYSCORE,依据score获取成员
有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据Score排序的特征,就可以制作一个有序的消息队列了。
和上面的方案相比,优点就是可以自定义消息ID,在消息ID有意义时,比较重要。缺点也明显,不允许重复消息(以为是集合),同时消息ID确定有错误会导致消息的顺序出错。
4、基于stream实现
TBase还不支持该数据结构
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
- 消息ID的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
追加新消息,XADD,生产消息
XADD,命令用于在某个stream(流数据)中追加消息,演示如下:
127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1553439850328-0"
127.0.0.1:6379> XADD memberMessage * user zhong msg nihao
"1553439858868-0"
语法格式为:
XADD key ID field string [field string ...]
需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。 ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。field string [field string]
就是当前消息内容,由1个或多个key-value构成。
上面的例子中,在memberMemsages这个key中追加了user kang msg Hello
这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此时,消息队列中就有一个消息可用了。
从消息队列中获取消息,XREAD,消费消息
XREAD,从Stream中读取消息,演示如下:
127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
2) 1) 1) "1553439850328-0"
2) 1) "user"
2) "kang"
3) "msg"
4) "Hello"
2) 1) "1553439858868-0"
2) 1) "user"
2) "zhong"
3) "msg"
4) "nihao"
语法格式为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
-
[COUNT count]
用于限定获取的消息数量 -
[BLOCK milliseconds]
用于设置XREAD为阻塞模式,默认为非阻塞模式 -
ID
用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用无意义)。
XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
Pending 等待列表
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM
设计了Pending
列表,用于记录读取但并未处理完毕的消息。命令XPENDIING
用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
2) "3"
2) 1) "consumerB" # 消费者B有1个
2) "1"
3) 1) "consumerC" # 消费者C有1个
2) "1"
127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
2) "consumerA" # 消费者
3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 1654355
4) (integer) 4
# 共5个,余下3个省略 ...
127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
2) "consumerA"
3) (integer) 1641083
4) (integer) 5
# 共3个,余下2个省略 ...
每个Pending的消息有4个属性:
- 消息ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending
列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令XACK
完成告知消息处理完成,演示如下:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1
127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
2) "2"
2) 1) "consumerB"
2) "1"
3) 1) "consumerC"
2) "1"
有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。
消息转移
消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 15907787
4) (integer) 4
# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerB"
3) (integer) 84404 # 注意IDLE,被重置了
4) (integer) 5 # 注意,读取次数也累加了1次
以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。
坏消息问题,Dead Letter,死信问题
正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:
# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
2) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!
信息监控,XINFO
Stream提供了XINFO来实现对服务器信息的监控,可以查询、查看队列信息:
127.0.0.1:6379> Xinfo stream mq
1) "length"
2) (integer) 7
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
2) 1) "msg"
2) "10"
消费组信息:
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1553585533795-4"
消费者组成员信息:
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 18949894
2) 1) "name"
2) "consumerB"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 3092719
3) 1) "name"
2) "consumerC"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 23683256
命令一览
命令 | 说明 |
---|---|
XACK | 结束Pending |
XADD | 生成消息 |
XCLAIM | 消息转移 |
XDEL | 删除消息 |
XGROUP | 消费组管理 |
XINFO | 得到消费组信息 |
XLEN | 消息队列长度 |
Pending列表 | Pending列表 |
XRANGE | 获取消息队列中消息 |
XREAD | 消费消息 |
XREADGROUP | 分组消费消息 |
XREVRANGE | 逆序获取消息队列中消息 |
XTRIM | 消息队列容量 |
Reference
[1] 基于Redis实现消息队列典型方案
[2] Stream 类型