exchanges 做什么用的?
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。exchange有下列四种类型,不同类型的exchange转发消息的策略有所区别。
direct
消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果队列绑定到交换机要求路由键为“dog”,则只转发routing key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。他是完全匹配、单播的模式。
famout
每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由见,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,没太子网内的主机都获得一份复制消息。fanout类型转发消息是最快的。
headers
headers匹配AMQP消息的header而不是路由键,headers交换器和direct交换器完全一致,但是性能差很多,目前几乎用不到了。
topic
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:#
,*
。#
匹配0个或多个单词, *
匹配一个单词。
代码块
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
com.sz.blog.config.RabbitConfig
创建交换器与队列,并使交换器与队列绑定
package com.sz.blog.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//创建队列
@Bean
public Queue fanout(){
return new Queue("fanout.all");
}
//创建交换器
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
//队列与交换器绑定
@Bean
public Binding bindingExchange(){
return BindingBuilder.bind(fanout()).to(fanoutExchange());
}
}
com.sz.blog.mq.FanoutConsumer
监听队列消息(队列消费者)
package com.sz.blog.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="fanout.all")
public class FanoutConsumer {
//处理队列消息
@RabbitHandler
public void received(String msg){
System.out.println("fanout.all :" + msg);
}
}
com.sz.blog.mq.RabbitProducer
往队列中插入消息(消息提供者)
package com.sz.blog.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFanout(){
rabbitTemplate.convertAndSend("fanoutExchange","","fanout data string");
}
}
测试用例代码
package com.sz.blog;
import com.sz.blog.mq.RabbitProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
@SpringBootTest
class BlogApplicationTests {
@Autowired
private RabbitProducer rabbitProducer;
@Test
void rabbitTest(){
rabbitProducer.sendDemoQueue();
rabbitProducer.sendFanout();
}
}