Spring Boot 集成 RabbitMQ

依赖与配置

在 pom.xml 中引入 RabbitMQ 相关依赖

<!-- AMQP 依赖, RabbitMq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.2.7</version>
</dependency>

在 application.yml 中添加配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

RabbitMQ使用

扇形交换机

扇形交换机,顾名思义,就是像一把扇子一样,一个交换器可以绑定多个队列,只要交换机接收到消息就会发送给所有和它绑定的队列。

假设扇形交换机 fanoutExchange 绑定了队列 fanoutQueue1 和 fanoutQueue2 ,那么我们往 fanoutExchange 发送一条消息,fanoutQueue1 和 fanoutQueue2 都会收到一条相同的消息,如果消息未被消费我们可以在 RabbitMQ 管理端看到这两个队列和队列内积压的一条相同的消息。

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    //声明队列Q1
    @Bean("fanoutQ1")
    public Queue fanoutQ1() {
        return new Queue(FANOUT_QUEUE1);
    }

    //声明队列Q1
    @Bean("fanoutQ2")
    public Queue fanoutQ2() {
        return new Queue(FANOUT_QUEUE2);
    }

    //声明扇形交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    //队列Q1绑定扇形交换机
    @Bean("bindFanoutQ1")
    public Binding bindFanoutQ1() {
        return BindingBuilder.bind(fanoutQ1()).to(fanoutExchange());
    }

    //队列Q2绑定扇形交换机
    @Bean("bindFanoutQ2")
    public Binding bindFanoutQ2() {
        return BindingBuilder.bind(fanoutQ2()).to(fanoutExchange());
    }

}

编写 junit 测试, 发送消息

@Test
public void fanoutTest() {
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", "fantoutTest");
}

发送消息后由于没有消费者,可以在管理端看到积压在队列中的消息。

直连交换机

一个直连交换机可以有多个队列,但每个队列都有一个路由一一匹配,交换机根据路由将消息投递到对应队列中。当同一个队列有多个消费者时,消息不会被重复消费,直连交换机能够轮询公平的将消息分发给每个消费者。

假设直连交换机 directExchange 与队列 directQueue1 通过路由 directRoute1 绑定, 与directQueue2 通过路由 directRoute2 绑定。当生产者发送路由为 directRoute1 的消息给 directExchange 时,消息会被投递到 directQueue1 ,directQueue2 则接收不到消息。

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";

    public static final String DIRECT_ROUTE_KEY1 = "direct.route.key1";

    //声明队列Q1
    @Bean("directQ1")
    public Queue directQ1() {
        return new Queue(DIRECT_QUEUE1);
    }

    //声明队列Q1
    @Bean("directQ2")
    public Queue directQ2() {
        return new Queue(DIRECT_QUEUE2);
    }

    //声明直连交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    //队列Q1绑定直连交换机
    @Bean("bindDirectQ1")
    public Binding bindDirectQ1() {
        return BindingBuilder.bind(directQ1()).to(directExchange()).with(DIRECT_ROUTE_KEY1);
    }

    //队列Q2绑定直连交换机
    @Bean("bindDirectQ2")
    public Binding bindDirectQ2() {
        return BindingBuilder.bind(directQ2()).to(directExchange()).with("");
    }

}

编写 junit 测试,投递消息给交换机。

@Test
public void directTest() {
    //消息被投递给 bindDirectQ2
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, "", "directTest");
    //消息被投递给 bindDirectQ1
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DIRECT_ROUTE_KEY1, "directTest-key1");
    //没有匹配的路由,bindDirectQ1 和 bindDirectQ2 都无法接收
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, "test", "directTest-test");
}

主题交换机

一个主题交换机可以有多个绑定队列,支持路由模糊匹配,可以使用星号(*)和井号(#)作为通配符进行匹配。其中,* 可以代替一个单词,# 可以代替任意个单词。

假设主题交换机 topicExchange 通过路由 topic.route.* 绑定队列 topicQueue1 , 通过路由 topic.route.# 绑定队列 topicQueue2。当生产者通过路由 topic.route.1 和 topic.route.1.1 投递消息给 topicExchange 时, topicQueue2 会接收到两条不同路由的消息, 而 topicQueue1 仅能接收到路由为 topic.route.1 的消息。

@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    public static final String TOPIC_ROUTE_KEY1 = "topic.route.*";
    public static final String TOPIC_ROUTE_KEY2 = "topic.route.#";

    //声明队列Q1
    @Bean("topicQ1")
    public Queue topicQ1() {
        return new Queue(TOPIC_QUEUE1);
    }

    //声明队列Q1
    @Bean("topicQ2")
    public Queue topicQ2() {
        return new Queue(TOPIC_QUEUE2);
    }

    //声明主题交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    //队列Q1绑定主题交换机
    @Bean("bindTopicQ1")
    public Binding bindTopicQ1() {
        return BindingBuilder.bind(topicQ1()).to(topicExchange()).with(TOPIC_ROUTE_KEY1);
    }

    //队列Q2绑定主题交换机
    @Bean("bindTopicQ2")
    public Binding bindTopicQ2() {
        return BindingBuilder.bind(topicQ2()).to(topicExchange()).with(TOPIC_ROUTE_KEY2);
    }

}

编写 junit 测试,投递两条不同路由的消息给主题交换机

@Test
public void topicTest(){
    //消息被投递给 topicQ1 和 topicQ2
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.route.1", "topicTest-*");
    //消息仅投递给 topicQ2
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.route.1.1", "topicTest-#");
}

可以从管理端看到 topicQueue2 由于绑定的是通配符#,故此两条消息都有被投递到队列中,topicQueue1 由于绑定的是通配符* 只匹配到一条消息,故此只有一条消息被投递到队列中。

首部交换机

首部交换机通过设置消息的头部信息来进行绑定队列的分发,它不依赖于路由键的匹配规则来分发消息,而是根据发送的消息内容中的headers属性进行匹配。当消息投递到首部交换器时,RabbitMQ会获取到该消息的headers(一个键值对的形式),并且对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则消息会路由到该队列,否则不会路由到该队列。

@Configuration
public class HeadExchangeConfig {

    public static final String HEADER_QUEUE1 = "header.queue1";
    public static final String HEADER_QUEUE2 = "header.queue2";
    public static final String HEADER_QUEUE3 = "header.queue3";
    public static final String HEADER_EXCHANGE = "header.exchange";

    public static final String HEADER_KEY1 = "headerKey1";
    public static final String HEADER_KEY2 = "headerKey2";

    //声明queue
    @Bean("headerQueue1")
    public Queue headerQueue1() {
        return new Queue(HEADER_QUEUE1);
    }

    @Bean("headerQueue2")
    public Queue headerQueue2() {
        return new Queue(HEADER_QUEUE2);
    }

    @Bean("headerQueue3")
    public Queue headerQueue3() {
        return new Queue(HEADER_QUEUE3);
    }

    //声明首部交换机
    @Bean
    public HeadersExchange headerExchange() {
        return new HeadersExchange(HEADER_EXCHANGE);
    }

    //声明Binding,绑定Header(消息头部参数)中 HEADER_KEY1 = a的队列。header的队列匹配可以用mathces和exisits
    @Bean
    public Binding bindHeaderQueue1() {
        return BindingBuilder.bind(headerQueue1()).to(headerExchange()).where(HEADER_KEY1).matches("a");
    }

    //绑定Header中 HEADER_KEY2 =1的队列。
    @Bean
    public Binding bindHeaderBusTyp1() {
        return BindingBuilder.bind(headerQueue2()).to(headerExchange()).where(HEADER_KEY2).matches("b");
    }

    //绑定Header中 HEADER_KEY1 = a 或者 HEADER_KEY2 = b 的队列。
    @Bean
    public Binding bindHeaderTxBusTyp1() {
        Map<String, Object> condMap = new HashMap<>();
        condMap.put(HEADER_KEY1, "a");
        condMap.put(HEADER_KEY2, "b");
        return BindingBuilder.bind(headerQueue3()).to(headerExchange()).whereAny(condMap).match();
    }

}

编写 junit 测试, 往首部交换机投递信息

@Test
public void headerTest(){
    MessageProperties properties = new MessageProperties();
    properties.setHeader(HeadExchangeConfig.HEADER_KEY1, "a");
    
    //消息被投递到 headerQueue1 和 headerQueue3
    rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE, "", new Message("headerTest-a".getBytes(), properties));
    properties.setHeader(HeadExchangeConfig.HEADER_KEY1, "");
    properties.setHeader(HeadExchangeConfig.HEADER_KEY2, "b");
    
    //消息被投递到 headerQueue2 和 headerQueue3
    rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE, "", new Message("headerTest-b".getBytes(), properties));

}

备份交换机

通过设置交换机的alternate-exchange的参数设置备份交换机,当消息路由无法在当前交换机匹配到合适的队列投递时,将消息转到备份交换机,分发到其绑定的备份队列中。

@Configuration
public class BackupExchangeConfig {

    public static final String BACKUP_QUEUE = "backup.queue";
    public static final String BACKUP_EXCHANGE = "backup.exchange";
    public static final String BACKUP_ROUTE_KEY = "backup.key";

    public static final String NON_BACKUP_QUEUE = "nonbackup.queue";
    public static final String NON_BACKUP_EXCHANGE = "nonbackup.exchange";
    public static final String NON_BACKUP_ROUTE_KEY = "nonbackup.key";

    @Bean("backupQueue")
    public Queue backupQueue(){
        return new Queue(BACKUP_QUEUE, true, false, false);
    }

    @Bean("nonBackupQueue")
    public Queue nonBackupQueue(){
        return new Queue(NON_BACKUP_QUEUE, true, false, false);
    }

    @Bean("nonBackupExchange")
    public DirectExchange nonBackupExchange(){
        Map<String, Object> args = new HashMap<>(2);
        args.put("alternate-exchange", BACKUP_EXCHANGE);
        return new DirectExchange(NON_BACKUP_EXCHANGE,true,false, args);
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE,true,false);
    }

    @Bean("bindNonBackupQueue")
    public Binding bindNonBackupQueue(){
        return BindingBuilder.bind(nonBackupQueue()).to(nonBackupExchange()).with(NON_BACKUP_ROUTE_KEY);
    }

    @Bean("bindBackupQueue")
    public Binding bindBackupQueue(){
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }

}

编写测试用例, 投递消息给备份交换机

@Test
public void backupTest() {
    //路由正确匹配,消息投递到非备份队列中
    rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE, BackupExchangeConfig.NON_BACKUP_ROUTE_KEY, "nonBackupTest");
    //路由无法匹配,消息投递到备份队列中
    rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE, BackupExchangeConfig.NON_BACKUP_ROUTE_KEY + "123", "backupTest");
}

可以看到备份队列和非备份队列中各有一条消息。

死信交换机

死信交换机其实可以理解成一个拥有特殊意义的直连交换机,正常队列通过设置队列中的x-dead-letter-exchangex-dead-letter-routing-key 参数来设置绑定死信交换机,当消费者拒绝消费、消息积压队列达到最大长度或者消息过期时,消息从正常队列转到死信队列。

死信在转移到死信队列时,它的路由也会保存下来。但是如果配置了x-dead-letter-routing-key参数的话,路由就会被替换为配置的这个值。另外,死信在转移到死信队列的过程中,是没有经过消息发送者确认的,所以并不能保证消息的安全性。

消息被作为死信转移到死信队列后,会在Header当中增加一些消息。比如时间、原因(rejected,expired,maxlen)、队列等。另外header中还会加上第一次成为死信的三个属性(x-first-death-reasonx-first-death-queuex-first-death-exchange),并且这三个属性在以后的传递过程中都不会更改。

死信队列也可以向其它队列一样被消费者正常订阅消费。

@Configuration
public class DeadLetterExchangeConfig {

    public static final String DEAD_QUEUE = "dead.queue";
    public static final String DEAD_EXCHANGE = "dead.exchange";
    public static final String DEAD_ROUTE_KEY = "dead.key";

    public static final String NON_DEAD_QUEUE = "nondead.queue";
    public static final String NON_DEAD_EXCHANGE = "nondead.exchange";
    public static final String NON_DEAD_ROUTE_KEY = "nondead.key";

    @Bean("deadQueue")
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    @Bean("nonDeadQueue")
    public Queue nonDeadQueue(){
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-message-ttl",10000);
        args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);
        return new Queue(NON_DEAD_QUEUE, true, false, false, args);
    }

    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE,false,false);
    }

    @Bean("nonDeadExchange")
    public DirectExchange nonDeadExchange(){
        return new DirectExchange(NON_DEAD_EXCHANGE,true,false);
    }

    @Bean("bindDeadQueue")
    public Binding bindDeadQueue(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTE_KEY);
    }

    @Bean("bindNonDeadQueue")
    public Binding bindNonDeadQueue(){
        return BindingBuilder.bind(nonDeadQueue()).to(nonDeadExchange()).with(NON_DEAD_ROUTE_KEY);
    }

}

编写 junit 测试, 投递一条10秒过期的消息,刚投递时消息存在于正常队列,10秒过期后转到死信队列。 投递消息和队列同时设置过期时间时,以时间更短的为准。

@Test
public void deadTest() {
    rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE, DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY, "deadTest 1");
}

延时队列

通过TTL+死信队列实现延时队列,与上述死信交换机使用大同小异,核心就是创建队列的时候设置如下三个参数:

  • x-message-ttl (必要) :当前队列消息多久未消费进入死信队列
  • x-dead-letter-exchange (必要):消息过期后进入的死信队列交换机
  • x-dead-letter-routing-key (非必要):消息的路由, 未设置时保留原队列的路由

TTL 消息可以通过以下方式创建

方式一:在队列中设置 x-message-ttl 参数

@Bean("nonDeadQueue")
public Queue nonDeadQueue(){
    Map<String, Object> args = new HashMap<>(2);
    args.put("x-message-ttl",10000);
    args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
    args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);
    return new Queue(NON_DEAD_QUEUE, true, false, false, args);
}

方式二: 在投递消息时设置消息的过期时间

MessageProperties properties = new MessageProperties();
properties.setExpiration("10000");
rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE, DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY, new Message("ttl test".getBytes(), properties));

两种方式同时都有设置时,时间短的设置生效。

动态创建队列、交换机及绑定关系

Spring Boot 封装了一些类用于对 RabbitMQ 的管理

  • AmqpAdmin
    用于管理队列、交换机及绑定关系 。

  • RabbitTemplate
    对消息操作的一些封装。

@Autowired
private AmqpAdmin amqpAdmin;

public void createComponents(){

    String queueName = "amqp.queue";
    String exchangeName = "amqp.exchange";
    
    //声明(创建)队列
    Queue queue = new Queue(queueName, false, false, false, null);
    amqpAdmin.declareQueue(queue);
    
    //声明交换机
    FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, false, false, null);
    amqpAdmin.declareExchange(fanoutExchange);
    
    //声明绑定
    amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null));
}

消息监听

启动类添加@EnableRabbit注解启用RabbitMQ ,通过注解 @RabbitListener@RabbitHandler 进行消息的消费

@Component
@RabbitListener(queues = "fanout.queue1")
public class FanoutQueueRecever {

    @RabbitHandler
    public void handle(String msg){
        System.out.println("收到来自 fanout.queue1 的消息 :" + msg);
    }
}

消息确认

保证发送方消息不丢失

开启生产端确认, 消息发送成功后回调,获得预期结果后才认为消息发送成功。

  • 交换机收到消息进行回调,ConfirmCallback

    spring.rabbitmq.publisher-confirm-type: correlated (高版本Spring使用)
    spring.rabbitmq.publisher-confirms: true(低版本Spring使用)

  • 消息正确抵达队列进行回调,ReturnsCallback

    spring.rabbitmq.publisher-returns: true
    spring.rabbitmq.template.mandatory: true, 只要抵达队列,以异步形式优先发送回调 ReturnCallback

保证消费者消息不丢失

开启消费端确认(保证每个消息都被正确消费,此时才可以删除这个消息)

  • 手动ack消息
    spring.rabbitmq.listener.simple.acknowledge-mode: manual

    • AcknowledgeMode.NONE:自动确认
      自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
      如果消息已经被处理,但后续出现异常导致事务回滚,也同样造成了实际意义的消息丢失。

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认
      如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。
      如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限,不足以处理后续投递的消息。

  • ACK的几种方法

    • channel.basicNack(deliveryTag, multiple, requeue); 拒绝消费。
      deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

      multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

      requeue: 是否重新入队

    • channel.basicAck(deliveryTag, multiple); 确认消费,参数解释同上。

    • channel.basicReject(deliveryTag, requeue); 拒绝消费,不支持批量操作,用法与basicNack()类似。

代码实现

yaml 文件配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 开启发送端确认
    #publisher-confirms: true
    publisher-confirm-type: correlated
    #开启发送端消息抵达确认
    publisher-returns: true
    #只要抵达队列。以异步发送优先回调returnconfirm
    template:
      mandatory: true
    # 手动ack消息
    listener:
      simple:
        acknowledge-mode: manual

配置RabbitMQ, 设置发送者消息确认逻辑

@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发送消息触发confirmCallback回调, 无论是否到达队列,只要有到达交换机都会触发这个回调
     * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
     * @param ack:消息是否成功收到(ack=true,消息抵达Broker)
     * @param cause:失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("发送消息触发confirmCallback回调");
        System.out.println(String.format("correlationData:%s\nack:%s\ncause:%s", correlationData, ack, cause));
    }

    /**
     * 消息未到达队列触发returnCallback回调,只要消息没有投递给指定的队列,就触发这个失败回调
     * @param returnedMessage 返回的消息,包含
     *                        message:投递失败的消息详细信息
     *                        replyCode:回复的状态码
     *                        replyText:回复的文本内容
     *                        exchange:接收消息的交换机
     *                        routingKey:接收消息的路由键
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("消息未到达队列触发returnCallback回调");
        System.out.println(returnedMessage.toString());
    }
}

实现消费者确认

@Component
@RabbitListener(queues = "fanout.queue1")
public class FanoutQueueRecever {

    @RabbitHandler
    public void handle(String msg, Channel channel, @Headers Map<String, Object> headers){
        System.out.println("收到来自 fanout.queue1 的消息 :" + msg);
        System.out.println("tag:" + headers.get(AmqpHeaders.DELIVERY_TAG));
        try {
            if("ack".equalsIgnoreCase(msg)){
                channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
            }
            if("nack".equalsIgnoreCase(msg)){
                channel.basicNack((long)headers.get(AmqpHeaders.DELIVERY_TAG), false, true);
            }
            if("reject".equalsIgnoreCase(msg)){
                channel.basicReject((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Payload 注解的对象需要实现序列化
     * @Headers 获取所有头部信息
     * @Header  获取单个头部信息
     */
    @RabbitHandler(isDefault = true)
    public void handleMap(@Payload MyMessage message, Channel channel, @Headers Map<String, Object> headers){
        System.out.println("收到来自 fanout.queue1 的消息 :" + message.toString());
        System.out.println("tag:" + headers.get(AmqpHeaders.DELIVERY_TAG));
        try {
            channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

编写 junit 测试, 发送测试消息

@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public class MyMessage implements Serializable {

    private String id;

    private String name;

}
@Test
public void confirmTest(){
    //未知交换机, 触发 confirmCallback 回调
    //rabbitTemplate.convertAndSend("unknow", "unknow", "confirmTest");
    //未知路由, 消息到达交换机但是无法到达队列, 触发 confirmCallback 回调和 returnCallback 回调
    //rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic", "topicTest-*");

    //正常到达队列,触发confirmCallback回调
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", "fantoutTest", new CorrelationData("1"));
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", new MyMessage("1", "张三"), new CorrelationData("2"));

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容