Springboot整合RabbitMQ

1、简介

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

2、创建一个springboot的项目

3、添加RabbitMQ依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4、在application.yml中配置RabbitMQ

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    virtual-host: /

5、创建一个rabbitMQ配置类(这个一定要看明白)

/**
 * 说明:〈该类初始化创建队列、转发器,并把队列绑定到转发器〉
 */
@Configuration
public class ApplicationConfig {
    private static Logger log = LoggerFactory.getLogger(ApplicationConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;
    final static String queueName = "helloQuery";

    @Bean
    public Queue helloQueue() {
        return new Queue(queueName);
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    @Bean
    public Queue dirQueue() {
        return new Queue("direct");
    }
    
    //===============以下是验证topic Exchange的队列========== 
    // Bean默认的name是方法名
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //===============以上是验证topic Exchange的队列===========
    
    
    //===============以下是验证Fanout Exchange的队列==========
    @Bean(name="AMessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是验证Fanout Exchange的队列==========

    /**
     * exchange是交换机交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.
     *
     * Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
     *
     * topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
     *
     * headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.
     *
     * Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
     */
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    @Bean
    TopicExchange exchange() {
        // 参数1为交换机的名称
        return new TopicExchange("exchange");
    }
    /**
     * //配置广播路由器
     * @return FanoutExchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 参数1为交换机的名称
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue, DirectExchange directExchange){
        return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
    }

    /**
     * 将队列topic.message与exchange绑定,routing_key为topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    // 如果参数名和上面用到方法名称一样,可以不用写@Qualifier
    Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列topic.messages与exchange绑定,routing_key为topic.#,模糊匹配
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

    /**
     * rabbitTemplate是thread safe的,主要是channel不能共用,但是在rabbitTemplate源码里channel是threadlocal的,所以singleton没问题。
     * 但是rabbitTemplate要设置回调类,如果是singleton,回调类就只能有一个,所以如果想要设置不同的回调类,就要设置为prototype的scope。
     * @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true
        //每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
        //使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //将对象序列化为json串
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        /**
         * 如果消息没有到exchange,则confirm回调,ack=false
         * 如果消息到达exchange,则confirm回调,ack=true
         * exchange到queue成功,则不回调return
         * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    log.info("消息发送成功: correlationData:{}, ack{}, cause:{}", correlationData, ack, cause);
                }else{
                    log.error("消息发送失败: correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("消息丢失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}", exchange, routingKey, replyCode, replyText, new String(message.getBody()));
            }
        });

        return rabbitTemplate;
    }
}

rabbitMQ配置类大约就这些内容,里面我基本上都做了注释。

下面我们就开始写rabbitMQ的用法了

6、单生产者和单消费者

6.1、生产者

@Component 
public class HelloSender1 { 
    /**
     * AmqpTemplate可以说是RabbitTemplate父类,RabbitTemplate实现类RabbitOperations接口,RabbitOperations继承了AmqpTemplate接口
     */ 

    @Autowired 
    private AmqpTemplate rabbitTemplate;

    @Autowired 
    private RabbitTemplate rabbitTemplate1; 

    /**
     * 用于单生产者-》单消费者测试
     */
    public void send() {
        String sendMsg = "hello1 " + new Date();
        System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg);
    }
}

名为helloQueue的队列在配置类创建好了,项目启动的时候会自动创建

6.2、消费者

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver1 {

    @RabbitHandler 
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}

@RabbitListener注解是监听队列的,当队列有消息的时候,它会自动获取。

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

注意

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

  • application/octet-stream:二进制字节数组存储,使用 byte[]

  • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)

  • text/plain:文本数据类型存储,使用 String

  • application/json:JSON 格式,使用 Object、相应类型

6.3、controller

 /**
  * 最简单的hello生产和消费实现(单生产者和单消费者)
  */ 
@RequestMapping("/hello") 
public void hello() { 
    helloSender1.send(); 
} 

6.4、结果

控制台的结果:

Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019

2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

Receiver1  : hello1 Mon Feb 18 10:13:35 CST 2019

7、单生产者对多消费者

7.1、生产者

/**
 * 用于单/多生产者-》多消费者测试
 */
public void send(String msg) {
    String sendMsg = msg + new Date();
    System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

7.2、消费者

消费者1

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver1 {

    @RabbitHandler 
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}

消费者2

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver2 {

    @RabbitHandler public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }
}

7.3、controller

/**
 * 单生产者-多消费者
 */
@RequestMapping("/oneToMany")
public void oneToMany() {
    for(int i=0;i<10;i++){
        helloSender1.send("hellomsg:"+i);
    }
}

7.4、结果:

Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019

Receiver2  : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:9Mon Feb 18 10:19:10 CST 2019

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)

8、实体类的传输,必须格式化

8.1、实体类

public class User implements Serializable { 

    private String name; 

    private String pass; 

    public String getName() { 
        return name;
    } 
   
    public void setName(String name) { 
        this.name = name;
    } 
    public String getPass() { 
        return pass;
    } 
    public void setPass(String pass) { 
        this.pass = pass;
    }

    @Override 
    public String toString() { 
        return "User{" +

                "name='" + name + '\'' +

                ", pass='" + pass + '\'' +

                '}';
    }
}

8.2、生产者

/**
 * 实体类的传输(springboot完美的支持对象的发送和接收,不需要格外的配置。实体类必须序列化)
 * @param user
 */
public void send(User user) {
    System.out.println("user send : " + user.getName()+"/"+user.getPass());     
    this.rabbitTemplate.convertAndSend("userQueue", user);
}

8.3、消费者

@Component
@RabbitListener(queues = "userQueue") 
public class HelloReceiver3 {

    @RabbitHandler 
    public void process(User user){
        System.out.println("user receive  : " + user.getName()+"/"+user.getPass());
    }
}

8.4、controller

/**
 * 实体列的传输
 */ 
@RequestMapping("/userTest") 
public void userTest(){
    User user=new User();
    user.setName("黄义波");
    user.setPass("123456");
    userSender.send(user);
}

8.5、结果

user send : 黄义波/123456
2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
user receive  : 黄义波/123456

9、directExchange

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

9.1、在rabbitMQ配置类中添加内容

@Bean 
public Queue dirQueue() { 
    return new Queue("direct");
}

@Bean
DirectExchange directExchange(){ 
    return new DirectExchange("directExchange");
} 

/**
 * 将队列dirQueue与directExchange交换机绑定,routing_key为direct
 * @param dirQueue
 * @param directExchange
 * @return
 */ 
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){ 
    return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}

9.2、生产者

@Component 
public class DirectSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {
        String msgString="directSender :hello i am hzb";
        System.out.println(msgString);   
        this.rabbitTemplate.convertAndSend("direct", msgString);
    }
}

9.3、消费者

@Component
@RabbitListener(queues = "direct") 
public class DirectReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("directReceiver  : " + msg);
    }
}

9.4、controller

@RequestMapping("/directTest") 
public void directTest() { 
    directSender.send(); 
}

9.5、结果

directSender :hello i am hyb
directReceiver  : directSender :hello i am hyb
2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)

10、topicExchange

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

10.1、在rabbitMQ配置类中添加内容

// Bean默认的name是方法名 
@Bean(name="message") 
public Queue queueMessage() { 
    return new Queue("topic.message");
}

@Bean(name="messages") 
public Queue queueMessages() { 
    return new Queue("topic.messages");
}

@Bean
TopicExchange exchange() { 
    // 参数1为交换机的名称
    return new TopicExchange("exchange");
} 

/**
 * 将队列topic.message与exchange绑定,routing_key为topic.message,就是完全匹配
 * @param queueMessage
 * @param exchange
 * @return
 */ 
@Bean 
// 如果参数名和上面用到方法名称一样,可以不用写@Qualifier 
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) { 
    return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");

}

/**
 * 将队列topic.messages与exchange绑定,routing_key为topic.#,模糊匹配
 * @param queueMessages
 * @param exchange
 * @return
 */ 
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) { 
    return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");

}

10.2、生产者

@Component 
public class TopicSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {

        String msg1 = "I am topic.mesaage msg======";

        System.out.println("sender1 : " + msg1); 
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg########";

        System.out.println("sender2 : " + msg2); 
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }
}

10.3、消费者

消费者1

@Component
@RabbitListener(queues = "topic.message") 
public class TopicMessageReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("topicMessageReceiver  : " +msg);
    }
}

消费者2

@Component
@RabbitListener(queues = "topic.messages") 
public class TopicMessagesReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("topicMessagesReceiver  : " +msg);
    }
}

10.4、controller

/**
 * topic exchange类型rabbitmq测试
 */ 
@RequestMapping("/topicTest") 
public void topicTest() { 
    topicSender.send(); 
} 

10.5、结果

sender1 : I am topic.mesaage msg======

sender2 : I am topic.mesaages msg########

topicMessageReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaages msg########

2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)

11、fanoutExchange

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

11.1、在rabbitMQ配置类中添加内容

//===============以下是验证Fanout Exchange的队列==========
@Bean(name="AMessage") 
public Queue AMessage() { 
    return new Queue("fanout.A");
}

@Bean 
public Queue BMessage() { 
    return new Queue("fanout.B");
}

@Bean 
public Queue CMessage() { 
    return new Queue("fanout.C");
}

@Bean
FanoutExchange fanoutExchange() { 
    // 参数1为交换机的名称
    return new FanoutExchange("fanoutExchange");
}

@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(AMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(BMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(CMessage).to(fanoutExchange);
}

11.2、生产者

@Component 
public class FanoutSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {
        String msgString="fanoutSender :hello i am hzb";
        System.out.println(msgString); // 参数2被忽略
        this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString);
    }
}

11.3、消费者

消费者A

@Component
@RabbitListener(queues = "fanout.A") 
public class FanoutReceiverA {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }
}

消费者B

@Component
@RabbitListener(queues = "fanout.B") 
public class FanoutReceiverB {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }
}

消费者C

@Component
@RabbitListener(queues = "fanout.C") 
public class FanoutReceiverC {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }
}

11.4、controller

/**
 * fanout exchange类型rabbitmq测试
 */ 

@RequestMapping("/fanoutTest") 
public void fanoutTest() { 
    fanoutSender.send(); 
} 

11.5、结果

fanoutSender :hello i am hzb

FanoutReceiverA  : fanoutSender :hello i am hyb

FanoutReceiverC  : fanoutSender :hello i am hyb

FanoutReceiverB  : fanoutSender :hello i am hyb

2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)

12、配置类中的rabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(){
    //若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true
    //每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
    //使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);

    /**
     * 如果消息没有到exchange,则confirm回调,ack=false
     * 如果消息到达exchange,则confirm回调,ack=true
     * exchange到queue成功,则不回调return
     * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
     */
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }else{
                log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        }
    });

    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
        }
    });

    return rabbitTemplate;
}

好好看看注释

13、不在配置类中配置callback

方法一:

13.1、配置一个接口

/**
 * 说明:〈定义一个名为SendMessageService 的接口,这个接口继承了RabbitTemplate.ConfirmCallback,

 * ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,

 * 会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知
 */
public interface SendMessageService extends RabbitTemplate.ConfirmCallback{ 
    void sendMessage(String exchange,String routekey,Object message);

}

13.2、实现这个接口

/**
 * 说明:〈该类注入了RabbitTemplate,RabbitTemplate封装了发送消息的方法,我们直接使用即可。

 * 可以看到我们构建了一个回调返回的数据,并使用convertAndSend方法发送了消息。同时实现了confirm回调方法,

 * 通过判断isSendSuccess可以知道消息是否发送成功,这样我们就可以进行进一步处理。
 */ 
@Service 
public class SendMessageServiceImpl implements SendMessageService{  
    
    private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);

    @Autowired private RabbitTemplate rabbitTemplate;

    @Override 
    public void sendMessage(String exchange,String routekey,Object message) { 
        //设置回调对象 
        //rabbitTemplate.setConfirmCallback(this);           
        //rabbitTemplate.setMandatory(true); 
        //构建回调返回的数据
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
        
        //rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
        rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
        logger.info("SendMessageServiceImpl() >>> 发送消息到RabbitMQ, 消息内容: " + message);

    }

    /**
     * 消息回调确认方法
     * @param correlationData 回调数据
     * @param isSendSuccess   是否发送成功
     * @param
     */ 
    @Override 
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
        logger.info("confirm回调方法>>>>>>>>>>>>>回调消息ID为: " + correlationData.getId()); 
        if (isSendSuccess) {
            logger.info("confirm回调方法>>>>>>>>>>>>>消息发送成功");
        } else {
            logger.info("confirm回调方法>>>>>>>>>>>>>消息发送失败" + s);
        }
    }
}

方法二:

直接在生产者发送信息的时候修改rabbitTemplate

@Service
public class SendMessage1 {
    
    private static Logger log = LoggerFactory.getLogger(SendMessage1.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routekey, Object message) {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    log.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
    }
}

13、有时候消费者出现错误,需要人工处理

//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData); 

//rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData); 

// 将 CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,例如放到缓存中,然后人工处理 

// 当confirm或return回调时,根据ack类别等,分别处理. 例如return或者ack=false则说明有问题,报警, 如果ack=true则删除关系 

// (因为return在confirm前,所以一条消息在return后又ack=true的情况也是按return处理)
Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build();

rabbitTemplate.send(exchange, routekey, message1, correlationData);

将 CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,例如放到缓存中,然后人工处理

image

我们可以看到,这两条消息关联起来了。

14、事务消息

消息确认机制

RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction机制,但后者的优势在于强一致性。如果没有特别的要求,建议使用confrim机制。

  • 1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
  • 2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
  • 3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
    • basicAck:成功消费,消息从队列中删除
    • basicNack:requeue=true,消息重新进入队列,false被删除
    • basicReject:等同于basicNack
    • basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer

RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制。RabbitMQ中,与事务机制有关的方法有三个:

  • txSelect():将当前channel设置成transaction模式。
  • txCommit():提交事务。
  • txRollback():回滚事务。
  • channel.basicPublish:发送消息,可以是多条,可以是消费消息提交ack。
  • autoAck=false,手动提交ack,以事务提交或回滚为准。
  • autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。

RabbitConfig

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Data
public class RabbitConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

}

RabbitUtil

  • 通过@PostConstruct注解,在依赖注入之后,初始化ConnectionFactory。
  • 提供创建ConnectionFactory的方法和创建Connection的方法。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Component
public final class RabbitUtil {

    @Resource
    private RabbitConfig rabbitConfig;

    private ConnectionFactory factory;

    @PostConstruct
    public void init() {
        factory = new ConnectionFactory();
        factory.setHost(rabbitConfig.getHost());
        factory.setPort(rabbitConfig.getPort());
        factory.setUsername(rabbitConfig.getUsername());
        factory.setPassword(rabbitConfig.getPassword());
        factory.setVirtualHost(rabbitConfig.getVirtualHost());
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(60000L);
    }

    public ConnectionFactory newConnectionFactory() {
        return factory;
    }

    public Connection newConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }
}

TransactionProducer

import com.panda.rabbitmq.config.RabbitUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Component
public class TransactionProducer {

    private final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    @Resource
    private RabbitUtil rabbitUtil;


    public void send(String routingKey, String message) throws IOException, TimeoutException {
        Channel channel = null;
        try {
            Connection connection = rabbitUtil.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个direct交换机
            // 开启事务
            channel.txSelect();
            channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

            //int i = 1 / 0;
            
            // 提交事务
            channel.txCommit();
        } catch (Exception e) {
            // 回滚事务
            if (channel != null) {
                channel.txRollback();
            }
        } finally {
            if (channel != null) {
                channel.close();
            }
        }
    }
}

参考:
Springboot整合RabbitMQ

https://zhuanlan.zhihu.com/p/582787505

http://www.uml.org.cn/zjjs/202009082.asp

https://codeleading.com/article/2222792253/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容