SpringBoot RabbitMQ 七种工作模式入门

RabbitMQ 工作模式

简单模式
  • 简单:一个生产者、一个队列和一个消费者,生产者发送消息至队列,消费者监听队列并消费消息
Work 模式
  • Work:一个生产者、一个队列和多个消费者,生产者发送消息至队列,多个消费者监听同一队列消费消息
发布/订阅模式
  • 发布/订阅:publish/subscribe 模式包含一个生产者、一个交换机、多个队列及多个消费者,交换机(Exchange)和队列直接绑定,生产者通过交换机(Exchange)将消息存储在与交换机绑定的队列中,消费者监听队列并进行消费
路由模式
  • 路由:routing 模式可以根据 routing key 将消息发送给指定队列,交换机(Exchange)和队列通过routing key 进行绑定,生产者通过交换机(Exchange)和 routing key 将消息精准发送至队列,消费者监听队列并消费消息
主题模式
  • 主题:Topics 模式在路由模式的基础上支持通配符操作,交换机会根据通配符将消息存储在匹配成功的队列中,消费者监听队列并进行消费
Header 模式
  • Header:header 模式取消了 routing key,而是使用 header 中的 key/value 键值对来进行匹配,匹配成功后消息会通过交换机发送给队列,消息者才能获取到消息并消费
RPC 模式
  • RPC:RPC 模式主要针对需要获取消费者处理结果的情况,通常是生产者将消息发送给了消费者,消费者接收到消息并进行消费后返回给生产者处理结果

SpringBoot 集成 RabbitMQ

  • 首先创建一个SpringBoot 项目,pom.xml 文件加入以下依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  • 配置文件修改,加入以下 RabbitMQ 配置
server:
  port: 8888  # 设置端口号

spring:
  rabbitmq:
    host: 127.0.0.1  # 设置RabbitMQ所在主机
    port: 5672       # 设置RabbitMQ服务端口
    username: guest  # 设置RabbitMQ用户名
    password: guest  # 设置RabbitMQ密码
  • 新增公共常量类
public interface RabbitConstant {

    /**
     * 简单模式
     */
    String SIMPLE_QUEUE_NAME = "simple_queue";

    /**
     * work 模式
     */
    String WORK_QUEUE_NAME = "work_queue";

    /**
     * 发布/订阅(publish/subscribe)模式
     */
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
    String PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
    String PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";

    /**
     * 路由(routing)模式
     */
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    String ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
    String ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
    String ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
    String ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
    String ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
    String ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";

    /**
     * 主题(topics)模式
     */
    String TOPICS_EXCHANGE_NAME = "topics_exchange";
    String TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
    String TOPICS_SECOND_QUEUE_NAME = "topics_second_queue";
    String TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
    String TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
    String TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
    String TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*";

    /**
     * header 模式
     */
    String HEADER_EXCHANGE_NAME = "header_exchange";
    String HEADER_FIRST_QUEUE_NAME = "header_first_queue";
    String HEADER_SECOND_QUEUE_NAME = "header_second_queue";

    /**
     * rpc 模式
     */
    String RPC_QUEUE_NAME = "rpc_queue";

}
  • 新增 Controller 请求类(用于验证结果,可最后新增)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/simple")
    public void simple() {
        rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
    }

    @GetMapping(value = "/work")
    public void work() {
        rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
    }

    @GetMapping(value = "/pubsub")
    public void pubsub() {
        rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
    }

    @GetMapping(value = "/routing")
    public void routing() {
        // 给第一个队列发送消息
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
    }

    @GetMapping(value = "/topics")
    public void topics() {
        // 给第一个队列发送消息,此时队列能接受到消息,因为队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
        // 给第二个队列发送消息,此时队列也能接受到消息,因为队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
        // 给第三个队列发送消息,此时队列无法接受到消息,因为队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }

    @GetMapping(value = "/header")
    public void header() {
        // 这条消息应该能被两个队列都接收到,第一个队列 all 匹配成功,第二个队列 hello-value any匹配成功
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("matchAll", "YES");
        messageProperties.setHeader("hello", "world");
        Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

        // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功
        MessageProperties messagePropertiesSecond = new MessageProperties();
        messagePropertiesSecond.setHeader("matchAll", "NO");
        Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
    }

    @GetMapping(value = "/rpc")
    public void rpc() {
        Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
        System.out.println("rabbit rpc response message: " + responseMsg);
    }

}
SpringBoot RabbitMQ 简单模式
  • 生产者声明队列,并向队列发送消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitSimpleProvider {

    @Bean
    public Queue simpleQueue() {
        return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
    }

}
  • 消费者监听队列,并消费消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitSimpleConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
    public void simpleListener(String context) {
        System.out.println("rabbit receiver: " + context);
    }

}
  • 单元测试
@Test
public void simple() {
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
}
  • 响应结果
SpringBoot RabbitMQ Work 模式
  • 生产者声明队列,并向队列生产消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitWorkProvider {

    @Bean
    public Queue workQueue() {
        return new Queue(RabbitConstant.WORK_QUEUE_NAME);
    }

}
  • 消费者监听队列,并消费消息(这里有两个消费者监听同一队列)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitWorkConsumer {

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerFirst(String context) {
        System.out.println("rabbit workQueue listener first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerSecond(String context) {
        System.out.println("rabbit workQueue listener second receiver: " + context);
    }

}
  • 单元测试
@Test
public void work() {
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
}
  • 响应结果(由于有两个消费者监听同一队列,消息只能被其中一者进行消费,默认是负载均衡的将消息发送给所有消费者)
SpringBoot RabbitMQ 发布/订阅模式
  • 生产者声明两个队列和一个 fanout 交换机,并将这两个队列和交换机进行绑定
  • 交换机种类一共有四种 fanout、direct、topic、header(文末介绍)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitPublishSubscribeProvider {

    @Bean
    public Queue pubsubQueueFirst() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue pubsubQueueSecond() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 创建fanout类型交换机,表示与此交换机会将消息发送给所有绑定的队列
        return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME);
    }

    @Bean
    public Binding pubsubQueueFirstBindFanoutExchange() {
        // 队列一绑定交换机
        return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange());
    }

    @Bean
    public Binding pubsubQueueSecondBindFanoutExchange() {
        // 队列二绑定交换机
        return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange());
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitPublishSubscribeConsumer {

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueFirst(String context) {
        System.out.println("rabbit pubsub queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueSecond(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

}
  • 单元测试
@Test
public void pubsub() {
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
}
  • 响应结果
SpringBoot RabbitMQ 路由模式
  • 生产者声明三个队列和一个 direct 交换机,将这三个队列和交换机进行绑定并设定交换机与队列之间的路由
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRoutingProvider {

    @Bean
    public Queue rabbitRoutingFirstQueue() {
        return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingSecondQueue() {
        return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingThirdQueue() {
        return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
    }

    @Bean
    public DirectExchange directExchange() {
        // 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
        return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
    }

    @Bean
    public Binding routingFirstQueueBindDirectExchange() {
        // 队列一绑定direct交换机,并设置 routing_key 为 routing_first_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingSecondQueueBindDirectExchange() {
        // 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingThirdQueueBindDirectExchange() {
        // 队列三绑定direct交换机,并设置 routing_key 为 routing_third_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRoutingConsumer {

    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void routingFirstQueueListener(String context) {
        System.out.println("rabbit routing queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void routingSecondQueueListener(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void routingThirdQueueListener(String context) {
        System.out.println("rabbit pubsub queue third receiver: " + context);
    }

}
  • 单元测试
@Test
public void routing() {
    // 给第一个队列发送消息
    rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
}
  • 响应结果
SpringBoot RabbitMQ 主题模式
  • 生产者声明三个队列和一个 topic 交换机,队列分别与 topic 交换机绑定并设置 routing key 统配符,若 routing key 满足交换机与队列间通配符要求则将消息存储至队列
  • # 通配符可以匹配一个或多个单词,* 通配符可以匹配一个单词;假如交换机(Exchange)与队列之间的 routing key 通配符为 #.hello.#,则代表 routing key 中间带有 hello 单词的都满足条件,消息将存储至队列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicProvider {

    @Bean
    public Queue topicFirstQueue() {
        return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue topicSecondQueue() {
        return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue topicThirdQueue() {
        return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange() {
        // 创建topic类型交换机,表示与此交换机会将消息发送给 routing_key 通配符匹配成功的队列
        return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME);
    }

    @Bean
    public Binding topicFirstQueueBindExchange() {
        // 队列一绑定topic类型交换机,并设置 routing_key 通配符为 #.first.#
        return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD);
    }

    @Bean
    public Binding topicSecondQueueBindExchange() {
        // 队列二绑定topic类型交换机,并设置 routing_key 通配符为 *.second.#
        return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD);
    }

    @Bean
    public Binding topicThirdQueueBindExchange() {
        // 队列三绑定topic类型交换机,并设置 routing_key 通配符为 *.third.*
        return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitTopicsConsumer {

    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void topicFirstQueue(String context) {
        System.out.println("rabbit topics queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void topicSecondQueue(String context) {
        System.out.println("rabbit topics queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void topicThirdQueue(String context) {
        System.out.println("rabbit topics queue third receiver: " + context);
    }

}
  • 单元测试
@Test
public void topics() {
    // 给第一个队列发送消息,此时队列能接受到消息,因为队列通配符为 #.first.#,而 routing_key 为 topics.first.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
    // 给第二个队列发送消息,此时队列也能接受到消息,因为队列通配符为 *.second.#,而 routing_key 为 topics.second.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
    // 给第三个队列发送消息,此时队列无法接受到消息,因为队列通配符为 *.third.*,而 routing_key 为 topics.third.routing.key,匹配失败
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }
  • 响应结果
SpringBoot RabbitMQ header模式
  • 生产者声明队列并创建 HeaderExchange 交换机,将队列分别与交换机通过 header 进行绑定
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitHeaderProvider {

    @Bean
    public Queue headerFirstQueue() {
        return new Queue(RabbitConstant.HEADER_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue headerSecondQueue() {
        return new Queue(RabbitConstant.HEADER_SECOND_QUEUE_NAME);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(RabbitConstant.HEADER_EXCHANGE_NAME);
    }

    @Bean
    public Binding headerFirstQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "YES");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerFirstQueue()).to(headersExchange()).whereAll(headersMap).match();
    }

    @Bean
    public Binding headerSecondQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "NO");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerSecondQueue()).to(headersExchange()).whereAny(headersMap).match();
    }

}
  • 交换机与队列绑定详情
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitHeaderConsumer {

    @RabbitListener(queues = RabbitConstant.HEADER_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void headerFirstQueue(String context) {
        System.out.println("rabbit header queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.HEADER_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void headerSecondQueue(String context) {
        System.out.println("rabbit header queue second receiver: " + context);
    }

}
  • 单元测试
@Test
public void header() {
    // 这条消息应该能被两个队列都接收到,第一个队列 all 匹配成功,第二个队列 hello-value any匹配成功
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("matchAll", "YES");
    messageProperties.setHeader("hello", "world");
    Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

    // 这条消息应该只被第二个队列接受,第一个队列 all 匹配失败,第二个队列 matchAll-NO any匹配成功
    MessageProperties messagePropertiesSecond = new MessageProperties();
    messagePropertiesSecond.setHeader("matchAll", "NO");
    Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
}
  • 响应结果
SpringBoot RabbitMQ RPC模式
  • 生产者声明队列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRpcProvider {

    @Bean
    public Queue rpcQueue() {
        return new Queue(RabbitConstant.RPC_QUEUE_NAME);
    }

}
  • 消费者监听队列,并进行消费
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRpcConsumer {

    @RabbitListener(queues = RabbitConstant.RPC_QUEUE_NAME)
    @RabbitHandler
    public String rpcQueue(String context) {
        System.out.println("rabbit rpc queue receiver: " + context);
        return "copy that!";
    }

}
  • 单元测试
@Test
public void rpc() {
    Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
    System.out.println("rabbit rpc response message: " + responseMsg);
}
  • 响应结果

Exchange 交换机

  • fanout 交换机:通过fanout交换机发送消息,则与fanout交换机绑定的所有队列都能接收到该消息
  • direct 交换机:通过direct交换机发送消息,则与direct交换机绑定队列中routing key完全一致的队列能接收到消息
  • topic 交换机:通过topic交换机发送消息,则与topic交换机绑定队列中routing key经过通配符匹配成功的队列能接收到消息

文章还有很多不足之处,欢迎各位兄弟姐妹批评指正,代码仓库已存放至gitee [SpringBoot RabbitMQ 工作模式仓库链接](https://gitee.com/BarryMan/spring-boot-rabbitmq.git)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345