应用场景
- 延迟发送短信
- 用户下单,30分钟超时未支付,取消订单
- 预约工作会议,20分钟后自动通知所有参会人员
解决方案
- 方案一:通过
死信队列
和设置TTL
超时时间,故意让消息超时未消费(相当于延时再处理),让消息投递到死信队列,然后处理死信消息时,进行延时后的处理 - 方案二:使用RabbitMQ官方提供的
DelayExchange
插件
安装RabbitMQ
- 我使用的RabbitMQ,是使用Docker进行安装的,如果你也想使用Docker安装,可以参考一下
- Centos7 + Docker
下载RabbitMQ镜像
docker pull rabbitmq:3.8-management
安装RabbitMQ
- 这里的命令,映射了RabbitMQ的plugin插件目录为
mq-plugins
,待会要使用这个名称去查询真实目录地址
docker run \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
下载插件
- 插件社区地址:https://www.rabbitmq.com/community-plugins.html
- 使用方式:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq/
注意:插件的版本要和安装的RabbitMQ的版本相搭配,否则可能会有意外的问题产生!
- 例如我使用的RabbitMQ版本是
3.8.5
,而DelayExchange
插件则需要使用3.8.9
,这个版本的插件适用于RabbitMQ3.8.5
及其以上版本。其他版本可去Github
中下载 - 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
安装插件
上传插件
- 查看刚才映射的
mq-plugins
目录的真实目录路径
docker volume inspect mq-plugins
查看输出的信息中的
Mountpoint
字段,我的是/var/lib/docker/volumes/mq-plugins/_data
使用
FinalShell
或其他shell工具,将下载好的DelayExchange
插件压缩包,拖拽上传到/var/lib/docker/volumes/mq-plugins/_data
目录去(注:不要解压)
安装插件
- 进入RabbitMQ容器内部,
-it
后面的mq
为刚才使用Docker安装RabbitMQ时,起的名称,请改成你定义的名称
docker exec -it mq bash
- 进入容器内部后,使用命令安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 等待命令执行,输出
The following plugins have been enabled: rabbitmq_delayed_message_exchange
,即为成功安装插件
DelayExchange插件的原理
DelayExchange
需要将一个交换机声明为delayed
类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备
x-delay
属性 - 如果有
x-delay
属性,说明是延迟消息
,持久化到硬盘,读取x-delay
值,作为延迟时间 - 返回
routing not found
结果给消息发送者 -
x-delay
时间到期后,重新投递消息到指定队列
使用插件
声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed
属性为true
,然后声明队列与其绑定即可
声明DelayExchange交换机
- 方式一:基于注解方式(推荐)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delayed.queue", durable = "true"),
exchange = @Exchange(name = "delayed.direct",delayed = "true"),
key = "delayed"
))
public void listenDelayedQueue(String msg){
log.info("接收到 delayed.queue的延迟消息:{}", msg);
}
- 方式二:基于@Bean的方式
// 交换机和队列的配置
@Configuration
public class DelayedConfig {
// 创建延迟交换机
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder.directExchange("delayed.direct")
// 声明延迟属性
.delayed()
.build();
}
// 创建延迟队列
@Bean
public Queue delayedQueue() {
return new Queue("delayed.queue");
}
// 绑定交换机和队列
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed");
}
}
// 监听器
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "delayed.queue")
public void listenSimpleQueue(Message msg) throws Exception {
log.info("接收到 delayed.queue的延迟消息:{}", msg);
}
}
发送延迟消息
- 发送消息时,一定要携带
x-delay
属性,指定延迟的时间 - 下面使用一个Controller的接口来测试,传入秒值,发送一个延迟消息到交换机中
- 例如:
http://localhost:8001/msg/sendDelayed/3
,消息将会延迟3秒后发送给消费者的监听器
@GetMapping("/sendDelayed/{time}")
public ResponseEntity sendDelayed(@PathVariable("time") Integer time) {
String exchange = "delayed.direct";
Message message = MessageBuilder.withBody("delayed message".getBytes())
// 设置延时时间,时间单位为毫秒值
.setHeader("x-delay", time * 1000)
.build();
rabbitTemplate.send(exchange, "delayed", message);
return ResponseEntity.ok("success" + new Date());
}
总结
- 要使用
DelayExchange
插件的步骤- 声明一个交换机,添加
delayed
属性为true
,表明是一个延迟消息交换机 - 发送消息时,添加
x-delay
头,值为超时时间,单位为毫秒
- 声明一个交换机,添加