RabbitMq有三种模式:
fanout 模式(广播模式):
个人理解就跟大广播一样的,只要连接到这种交换器,所以列队拿到的消息都是一模一样的,常用于那种需要通知很多服务或者其他多个系统的消息类型。
direct 模式通过 RoutingKey 将消息发送给指定的队列,个人觉得用于那种只有一两个地方需要接受这样消息的地方,使用消息地方不是特别多的场景。
topic 模式,跟 direct 差不多,但更加灵活,支持模式匹配,通配符等 .* 单个字符匹配 .# 0个或者多个字符匹配;
交换器里的消息按照Routing key分门别类,一个Routing key通常对应一类的消息,一般要接受多个key消息的话,可以配置成类似数组的形式,有时候也需要接受只要key中包含特定字符或者以特定字符开始的消息,那么topic模式是你最佳选择!
今天主要说说topic模式,由于平时用的较多,而且主题模式也更加灵活, 先粘贴个例子;
常量类
/**
* @Description 地图mq常量
* @Author FL
* @Date 16:18 2021/10/25
**/
public interface MapMQConstant {
//==============================区域======================================//
/**
* 区域消息mq交换机
*/
String VSE_AREA_MANAGE_EXCHANGE = "vse.area.manage.exchange";
/**
* 区域消息key
*/
String VSE_AREA_MANAGE_KEY = "vse.area.manage";
/**
* 区域绑定消息key
*/
String VSE_AREA_MANAGE_BINDING_KEY = "vse.area.manage.#";
/**
* 区域消息queue
*/
String VSE_AREA_MANAGE_QUEUE = "vse.area.manage.basic.queue";
//==============================设备======================================//
/**
* 设备消息mq交换机
*/
String VSE_DEVICE_MANAGE_EXCHANGE = "vse.device.manage.exchange";
/**
* 设备消息key
*/
String VSE_DEVICE_MANAGE_KEY = "vse.device.manage";
/**
* 设备绑定消息key
*/
String VSE_DEVICE_MANAGE_BINDING_KEY = "vse.device.manage.#";
/**
* 设备消息queue
*/
String VSE_DEVICE_MANAGE_QUEUE = "vse.device.manage.basic.queue";
/**
* 环境采集绑定设备queue
*/
String VSE_DEVICE_MANAGE_ENV_QUEUE = "vse.device.manage.env.queue";
/**
* 设备人脸消息queue
*/
String VSE_FACE_DEVICE_MANAGE_QUEUE = "vse.device.manage.face.queue";
//==============================租户======================================//
/**
* 租户消息mq交换机
*/
String VSE_TENANT_MANAGE_EXCHANGE = "vse.tenant.manage.exchange";
/**
* 租户消息key
*/
String VSE_TENANT_MANAGE_KEY = "vse.tenant.manage";
/**
* 租户绑定消息key
*/
String VSE_TENANT_MANAGE_BINDING_KEY = "vse.tenant.manage.#";
/**
* 租户消息queue
*/
String VSE_TENANT_MANAGE_MAP_QUEUE = "vse.tenant.manage.map.dismiss.queue";
String VSE_TENANT_MANAGE_UPMS_QUEUE = "vse.tenant.manage.upms.dismiss.queue";
String VSE_TENANT_MANAGE_MESSAGE_QUEUE = "vse.tenant.manage.message.dismiss.queue";
String VSE_TENANT_MANAGE_EVENT_QUEUE = "vse.tenant.manage.event.dismiss.queue";
String VSE_TENANT_MANAGE_BASIC_QUEUE = "vse.tenant.manage.basic.dismiss.queue";
String VSE_TENANT_MANAGE_DECODER_QUEUE = "vse.tenant.manage.decoder.dismiss.queue";
String VSE_TENANT_MANAGE_FACE_QUEUE = "vse.tenant.manage.face.dismiss.queue";
String VSE_TENANT_MANAGE_CASCADE_QUEUE = "vse.tenant.manage.cascade.dismiss.queue";
String VSE_TENANT_MANAGE_QUEUE = "vse.tenant.manage.basic.queue";
//==============================人脸布控======================================//
/**
* 人脸布控消息mq交换机
*/
String VSE_FACE_IMAGE_EXCHANGE = "vse.face.image.exchange";
/**
* 人脸布控消息key
*/
String VSE_FACE_IMAGE_KEY = "vse.face.image";
/**
* 人脸布控绑定消息key
*/
String VSE_FACE_IMAGE_BINDING_KEY = "vse.face.image.#";
/**
* 人脸布控消息queue
*/
String VSE_FACE_IMAGE_QUEUE = "vse.face.image.face.queue";
}
1 发送mq
log.info("通道关闭发送mq:{} ", JSONObject.toJSONString(messageBean));
rabbitTemplate.convertAndSend(MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, MapMQConstant.VSE_DEVICE_MANAGE_KEY, messageBean,
new CorrelationData(UUID.randomUUID().toString()));
2 接收mq消息
/**
* @Description 设备删除, 设备上线
* @Author FL
* @Date 10:26 2021/12/25
* @Param [message]
**/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MapMQConstant.VSE_FACE_DEVICE_MANAGE_QUEUE),
exchange = @Exchange(value = MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, type = ExchangeTypes.TOPIC),
key = MapMQConstant.VSE_DEVICE_MANAGE_BINDING_KEY))
@RabbitHandler
public void deviceConsumer(Message message) {
List<MessageMqBean> messageMqBeans = getMessageMqBeans(message);
if (CollectionUtil.isNotEmpty(messageMqBeans)) {
messageMqBeans.forEach(messageMqBean -> {
String tenantId = messageMqBean.getTenantId();
String type = messageMqBean.getType();
JSONArray jsonArray = JSONUtil.parseArray(messageMqBean.getInfo());
List<DeleteDeviceMq> deviceMqList = jsonArray.toList(DeleteDeviceMq.class);
// 删除所有该设备下的子任务
if (MapMessageTypeEnum.DELETE_DEVICE.name().equals(type)) {
for (DeleteDeviceMq deleteDeviceMq : deviceMqList) {
faceControlService.deviceDeleteControl(deleteDeviceMq.getSn(), tenantId);
}
}
// 是否有该设备的 离线失败 子任务
// 子任务放入mq(重新执行)
if (MapMessageTypeEnum.CHANNEL_ONLINE.name().equals(type)) {
for (DeleteDeviceMq deleteDeviceMq : deviceMqList) {
faceControlService.offlineFailedTask(deleteDeviceMq.getSn(), tenantId);
}
}
});
}
}
这里面接收mq ,其实可以有很多,为啥呢? 因为,咱们写入mq的时候的路由key 虽然是固定的,但是接收绑定的key可以是匹配模糊接收,什么意思呢?
比如如上, 发送mq的时候,路由key为 vse.device.manage
接收mq的时候 我们的 绑定key 为 vse.device.manage.#
. # 为模糊匹配规则, 匹配0 个或者多个字符串 这里就是0 个字符串,接收到了消息
然后其实还有另一个就是.* 单个字符串匹配,例如
接收mq的时候 我们的 绑定key 改为 vse.device.* 也是可以接受到消息的
如果一个消息,多个地方同时接收,唯一不同的,就是接受消息的时候,建立一个新的对列就行了,就是 接受消息的位置 MapMQConstant.VSE_FACE_DEVICE_MANAGE_QUEUE
每次都是不同的队列,就可以在不同位置接收了
如果我想在另外一个位置接收,如下处理:
/**
* @Description 设备消息
* @Author FL
* @Date 13:54 2021/10/26
* @Param [message]
**/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MapMQConstant.VSE_DEVICE_MANAGE_ENV_QUEUE),
exchange = @Exchange(value = MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, type = "topic"),
key = MapMQConstant.VSE_DEVICE_MANAGE_BINDING_KEY))
@RabbitHandler
public void deviceConsumer(Message message) {
List<MessageMqBean> messageMqBeans = getMessageMqBeans(message);
if (CollectionUtil.isNotEmpty(messageMqBeans)) {
messageMqBeans.forEach(messageMqBean -> {
if (MapMessageTypeEnum.DELETE_DEVICE.name().equals(messageMqBean.getType())) {
envDeviceChannelSensorService.deleteSensorMq(messageMqBean);
}
});
}
log.info("传感器设备消息处理完成");
}
这里的队列变为了 MapMQConstant.VSE_DEVICE_MANAGE_ENV_QUEUE 绑定的key不变