RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息ExchangeExchange根据路由键消息路由到合适的QueueQueue再将消息推(或消费者主动拉)给消费者

在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如

  1. Exchange没有任何Queue与其绑定,
  2. 或者根据消息的路由键,没有任何一个合适的Queue来投递消息,

从而导致消息路由失败。
对于这些路由失败的消息应该如何处理呢?
有两种方式:

  1. 将消息返回给投递该条消息的生产者。
  2. 使用备份交换机 alternate-exchange(AE)。

方式1:将消息返回给投递该条消息的生产者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
  • 交换机定义与消息发送
@Slf4j
@Component
public class NoMatchQueue {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void send() {
        log.info("发送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
}

@Configuration
class ExchangeDeclare {
    /**
     * 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败
     *
     * @return
     */
    @Bean
    public Exchange noMatchQueueExchange() {
        return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 设置回调函数
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});

方式2:使用备份交换机

使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢: 答案是使用备份交换机

  • 相较于使用回调函数,使用备份交换机只需要给交换机绑定一个备份交换机即可,当消息路由失败之后,消息将投递到备份交换机,再由备份交换机路由消息到备份队列。这样我们只需要关注这个备份队列就能知道/获取到路由失败的消息。通常情况下备份交换的Type应该设置为fanout
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=false
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=false
  • 注意: 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。
  • 正常业务交换机(不绑定队列,使得消息一定会路由失败)
/**
 * 业务交换机
 *
 * @return
 */
@Bean
public Exchange noMatchQueueExchange() {
    return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true)
            // 绑定备份交换机
            .alternate(X_ALTERNATE)
            .build();
}
  • 备份交换机/队列/绑定

/**
 * 备份队列
 *
 * @return
 */
@Bean
public Queue alternateQueue() {
    return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
}

/**
 * 备份交换机
 *
 * @return
 */
@Bean
public Exchange alternateExchange() {
    return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
}

/**
 * 备份绑定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return
 */
@Bean
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {
    return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投递
/**
 * 正常业务交换机
 */
public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE";

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * 发送消息
 */
@PostConstruct
public void send() {
    log.info("发送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 结果是消息被路由到备份交换机的备份队列


    图片.png
  • 且:如果你同时使用了两种方式,即(mandatory为true+Listener监听)和(备份交换机AlternateExchange),消息将只会路由到备份交换机,不会Return回生产者。

  • 在原生RabbitMQ-client中演示这一过程:
@Slf4j
public class AeTest {
    /**
     * 获取Channel
     */
    private static final Channel CHANNEL = MqChannelUtils.getChannel();
    /**
     * 备份交换机
     */
    private static final String X_AE = "X_AE";
    /**
     * 备份交换机绑定的队列
     */
    private static final String Q_AE = "Q_AE";

    /**
     * 正常业务的交换机
     */
    private static final String X_1 = "X_1";

    public static void main(String[] args) throws IOException {
        // 定义备份交换机-其实也是一个正常的交换机
        CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true);
        // 定义备份队列
        CHANNEL.queueDeclare(Q_AE, true, false, false, null);
        // 绑定备份
        CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap<String, Object> arguments = new HashMap<>();
        // 绑定的备份交换机
        arguments.put("alternate-exchange", X_AE);
        // 定义交换机
        CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments);

        // 添加监听器,看看是否还会return消息
        CHANNEL.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        });

        // 尝试向交换机发送消息(无法路由)- mandatory参数无效
        CHANNEL.basicPublish(X_1, "", false, false,
                new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8));
    }
}
  • 两个交换机,正常的交换机X_1和备份交换机X_AE

    图片.png

    图片.png

    图片.png

  • 备份交换机绑定的队列已经接收到了路由失败的消息


    图片.png
  • 其他要注意的点:

    • 备份交换机的Type设置为fanout比较合适,这样可以忽略RoutingKey,避免备份交换机又路由失败。
    • 被投递到备份交换机的RoutingKey为消息投递到MQ时的原始RoutingKey,不会变,这一点在其他场景下也是一样的。
    • 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。

# 源代码

https://gitee.com/FutaoSmile/tech-sharing-mq

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容