2022-05-07 Rabbitmq 主题模式

RabbitMq有三种模式:

fanout 模式(广播模式):
个人理解就跟大广播一样的,只要连接到这种交换器,所以列队拿到的消息都是一模一样的,常用于那种需要通知很多服务或者其他多个系统的消息类型。

direct 模式通过 RoutingKey 将消息发送给指定的队列,个人觉得用于那种只有一两个地方需要接受这样消息的地方,使用消息地方不是特别多的场景。

topic 模式,跟 direct 差不多,但更加灵活,支持模式匹配,通配符等 .* 单个字符匹配 .# 0个或者多个字符匹配;
交换器里的消息按照Routing key分门别类,一个Routing key通常对应一类的消息,一般要接受多个key消息的话,可以配置成类似数组的形式,有时候也需要接受只要key中包含特定字符或者以特定字符开始的消息,那么topic模式是你最佳选择!

今天主要说说topic模式,由于平时用的较多,而且主题模式也更加灵活, 先粘贴个例子;
常量类

/**
 * @Description 地图mq常量
 * @Author FL
 * @Date 16:18 2021/10/25
 **/
public interface MapMQConstant {

    //==============================区域======================================//

    /**
     * 区域消息mq交换机
     */
    String VSE_AREA_MANAGE_EXCHANGE = "vse.area.manage.exchange";

    /**
     * 区域消息key
     */
    String VSE_AREA_MANAGE_KEY = "vse.area.manage";
    /**
     * 区域绑定消息key
     */
    String VSE_AREA_MANAGE_BINDING_KEY = "vse.area.manage.#";
    /**
     * 区域消息queue
     */
    String VSE_AREA_MANAGE_QUEUE = "vse.area.manage.basic.queue";

    //==============================设备======================================//

    /**
     * 设备消息mq交换机
     */
    String VSE_DEVICE_MANAGE_EXCHANGE = "vse.device.manage.exchange";

    /**
     * 设备消息key
     */
    String VSE_DEVICE_MANAGE_KEY = "vse.device.manage";
    /**
     * 设备绑定消息key
     */
    String VSE_DEVICE_MANAGE_BINDING_KEY = "vse.device.manage.#";
    /**
     * 设备消息queue
     */
    String VSE_DEVICE_MANAGE_QUEUE = "vse.device.manage.basic.queue";

    /**
     * 环境采集绑定设备queue
     */
    String VSE_DEVICE_MANAGE_ENV_QUEUE = "vse.device.manage.env.queue";

    /**
     * 设备人脸消息queue
     */
    String VSE_FACE_DEVICE_MANAGE_QUEUE = "vse.device.manage.face.queue";

    //==============================租户======================================//

    /**
     * 租户消息mq交换机
     */
    String VSE_TENANT_MANAGE_EXCHANGE = "vse.tenant.manage.exchange";

    /**
     * 租户消息key
     */
    String VSE_TENANT_MANAGE_KEY = "vse.tenant.manage";
    /**
     * 租户绑定消息key
     */
    String VSE_TENANT_MANAGE_BINDING_KEY = "vse.tenant.manage.#";
    /**
     * 租户消息queue
     */
    String VSE_TENANT_MANAGE_MAP_QUEUE = "vse.tenant.manage.map.dismiss.queue";

    String VSE_TENANT_MANAGE_UPMS_QUEUE = "vse.tenant.manage.upms.dismiss.queue";

    String VSE_TENANT_MANAGE_MESSAGE_QUEUE = "vse.tenant.manage.message.dismiss.queue";

    String VSE_TENANT_MANAGE_EVENT_QUEUE = "vse.tenant.manage.event.dismiss.queue";

    String VSE_TENANT_MANAGE_BASIC_QUEUE = "vse.tenant.manage.basic.dismiss.queue";

    String VSE_TENANT_MANAGE_DECODER_QUEUE = "vse.tenant.manage.decoder.dismiss.queue";

    String VSE_TENANT_MANAGE_FACE_QUEUE = "vse.tenant.manage.face.dismiss.queue";

    String VSE_TENANT_MANAGE_CASCADE_QUEUE = "vse.tenant.manage.cascade.dismiss.queue";

    String VSE_TENANT_MANAGE_QUEUE = "vse.tenant.manage.basic.queue";

    //==============================人脸布控======================================//

    /**
     * 人脸布控消息mq交换机
     */
    String VSE_FACE_IMAGE_EXCHANGE = "vse.face.image.exchange";

    /**
     * 人脸布控消息key
     */
    String VSE_FACE_IMAGE_KEY = "vse.face.image";
    /**
     * 人脸布控绑定消息key
     */
    String VSE_FACE_IMAGE_BINDING_KEY = "vse.face.image.#";
    /**
     * 人脸布控消息queue
     */
    String VSE_FACE_IMAGE_QUEUE = "vse.face.image.face.queue";

}

1 发送mq

  log.info("通道关闭发送mq:{} ", JSONObject.toJSONString(messageBean));
                    rabbitTemplate.convertAndSend(MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, MapMQConstant.VSE_DEVICE_MANAGE_KEY, messageBean,
                            new CorrelationData(UUID.randomUUID().toString()));

2 接收mq消息

    /**
     * @Description 设备删除, 设备上线
     * @Author FL
     * @Date 10:26 2021/12/25
     * @Param [message]
     **/
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MapMQConstant.VSE_FACE_DEVICE_MANAGE_QUEUE),
            exchange = @Exchange(value = MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = MapMQConstant.VSE_DEVICE_MANAGE_BINDING_KEY))
    @RabbitHandler
    public void deviceConsumer(Message message) {
        List<MessageMqBean> messageMqBeans = getMessageMqBeans(message);
        if (CollectionUtil.isNotEmpty(messageMqBeans)) {
            messageMqBeans.forEach(messageMqBean -> {

                String tenantId = messageMqBean.getTenantId();
                String type = messageMqBean.getType();

                JSONArray jsonArray = JSONUtil.parseArray(messageMqBean.getInfo());
                List<DeleteDeviceMq> deviceMqList = jsonArray.toList(DeleteDeviceMq.class);
                // 删除所有该设备下的子任务
                if (MapMessageTypeEnum.DELETE_DEVICE.name().equals(type)) {
                    for (DeleteDeviceMq deleteDeviceMq : deviceMqList) {
                        faceControlService.deviceDeleteControl(deleteDeviceMq.getSn(), tenantId);
                    }
                }
                // 是否有该设备的 离线失败 子任务
                // 子任务放入mq(重新执行)
                if (MapMessageTypeEnum.CHANNEL_ONLINE.name().equals(type)) {
                    for (DeleteDeviceMq deleteDeviceMq : deviceMqList) {
                        faceControlService.offlineFailedTask(deleteDeviceMq.getSn(), tenantId);
                    }
                }
            });
        }
    }

这里面接收mq ,其实可以有很多,为啥呢? 因为,咱们写入mq的时候的路由key 虽然是固定的,但是接收绑定的key可以是匹配模糊接收,什么意思呢?
比如如上, 发送mq的时候,路由key为 vse.device.manage
接收mq的时候 我们的 绑定key 为 vse.device.manage.#

. # 为模糊匹配规则, 匹配0 个或者多个字符串 这里就是0 个字符串,接收到了消息
然后其实还有另一个就是.* 单个字符串匹配,例如
接收mq的时候 我们的 绑定key 改为 vse.device.* 也是可以接受到消息的

如果一个消息,多个地方同时接收,唯一不同的,就是接受消息的时候,建立一个新的对列就行了,就是 接受消息的位置 MapMQConstant.VSE_FACE_DEVICE_MANAGE_QUEUE
每次都是不同的队列,就可以在不同位置接收了

如果我想在另外一个位置接收,如下处理:

    /**
     * @Description 设备消息
     * @Author FL
     * @Date 13:54 2021/10/26
     * @Param [message]
     **/
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MapMQConstant.VSE_DEVICE_MANAGE_ENV_QUEUE),
            exchange = @Exchange(value = MapMQConstant.VSE_DEVICE_MANAGE_EXCHANGE, type = "topic"),
            key = MapMQConstant.VSE_DEVICE_MANAGE_BINDING_KEY))
    @RabbitHandler
    public void deviceConsumer(Message message) {
        List<MessageMqBean> messageMqBeans = getMessageMqBeans(message);
        if (CollectionUtil.isNotEmpty(messageMqBeans)) {
            messageMqBeans.forEach(messageMqBean -> {
                if (MapMessageTypeEnum.DELETE_DEVICE.name().equals(messageMqBean.getType())) {
                    envDeviceChannelSensorService.deleteSensorMq(messageMqBean);
                }
            });
        }
        log.info("传感器设备消息处理完成");
    }

这里的队列变为了 MapMQConstant.VSE_DEVICE_MANAGE_ENV_QUEUE 绑定的key不变

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 一 概念 AMQP 即Advanced Message Queuing Protocol,一个提供统一消息服务的应...
    weizz5阅读 842评论 0 0
  • RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...
    2205阅读 3,480评论 1 11
  • AMQP 0.9.1介绍 AMQP 是什么 AMQP(高级消息队列协议)是一个网络协议,他支持符合要求的客户端应用...
    呆弱鸡阅读 345评论 0 0
  • linux资料总章2.1 1.0写的不好抱歉 但是2.0已经改了很多 但是错误还是无法避免 以后资料会慢慢更新 大...
    数据革命阅读 12,131评论 2 34
  • 系统管理与维护命令 date date(选项)(参数) | 选项 | 说明 | | :-------- | ...
    蓓蓓的万能男友阅读 3,862评论 0 5