一. 一些基本概念
消息队列
RabbitMQ
RabbitMQ-简书
柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍
AMQP
二.依赖和配置
引入rabbitmq相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5276
spring.rabbitmq.username=guest
speing.rabbitmq.password=guest
三、使用样例3(Topic Exchange)
配置Topic Exchange
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "yzh-topic";
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPICNAME,true,false);
}
@Bean
Queue queueUsa(){ return new Queue("queue-usa"); }
@Bean
Queue queueNews(){ return new Queue("queue-news"); }
@Bean
Queue queueWeather(){ return new Queue("queue-weather"); }
@Bean
Queue queueEurope(){ return new Queue("queue-europe"); }
@Bean
Binding usaBinding(){
return BindingBuilder.bind(queueUsa()).to(topicExchange()).with("usa.#");
}
@Bean
Binding newsBinding(){
return BindingBuilder.bind(queueNews()).to(topicExchange()).with("#.news");
}
@Bean
Binding weatherBinding(){
return BindingBuilder.bind(queueWeather()).to(topicExchange()).with("#.weather");
}
@Bean
Binding europeBinding(){
return BindingBuilder.bind(queueEurope()).to(topicExchange()).with("europe.#");
}
}
配置消费者
@Component
public class TopicReceiver {
@RabbitListener(queues = "queue-usa")
public void usa(String msg){
System.out.println("queue-usa:" + msg);
}
@RabbitListener(queues = "queue-news")
public void news(String msg){
System.out.println("queue-news:" + msg);
}
@RabbitListener(queues = "queue-weather")
public void weather(String msg){
System.out.println("queue-weather:" + msg);
}
@RabbitListener(queues = "queue-europe")
public void europe(String msg){
System.out.println("queue-europe:" + msg);
}
}
测试
注入RabbitTemplate
来发送消息
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void test2(){
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"usa.news","This is a news about usa");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"usa.weather","This is a weather about usa");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"europe.news","This is a news about europe");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"europe.weather","This is a weather about europe");
}
启动项目,运行测试方法
四 使用样例4(Header Exchange)
配置Header Exchange
@Configuration
public class RabbitHeaderConfig {
public static final String HEADERNAME = "yzh-header";
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADERNAME,true,false);
}
@Bean
Queue nameQueue(){ return new Queue("name-queue"); }
@Bean
Queue ageQueue(){return new Queue("age-queue");}
@Bean
Binding bindingName(){
Map<String,Object> map = new HashMap<>();
map.put("name","yzh");
return BindingBuilder.bind(nameQueue())
.to(headersExchange())
.whereAny(map)// 消息只要有一个Header匹配上map中的key/value,便路由到这个Queue上
.match();
}
@Bean
Binding bindingAge(){
return BindingBuilder.bind(ageQueue())
.to(headersExchange())
.where("age") //只要消息Header中包含age(无论值多少),便路由到这个Queue上
.exists();
}
}
配置消费者
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void name(byte[] msg){
System.out.println("HeaderReceiver>>name:" + new String(msg,0,msg.length));
}
@RabbitListener(queues = "age-queue")
public void age(byte[] msg){
System.out.println("HeaderReceiver>>age:" + new String(msg,0,msg.length));
}
}
测试
@Test
void test3(){
Message nameMsg = MessageBuilder
.withBody("Hello , queue-name".getBytes())
.setHeader("name","yzh").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,nameMsg);
Message ageMsg = MessageBuilder
.withBody("Hello , queue-age".getBytes())
.setHeader("age","18").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,ageMsg);
}