此文章参考:http://blog.csdn.net/zl18310999566/article/details/54341057
发送端服务器
配置文件信息
@Configuration
public class RabbitConfig {
/*此处静态常量,在原作中为了保证名称统一使用,可以不声明*/
public static final String FOO_EXCHANGE = "callback.exchange.foo";
public static final String FOO_EXCHANGE_TOPIC = "callback.topic.exchange.foo";
public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
public static final String FOO_QUEUE = "callback.queue.foo";
/*end*/
/*显式使用配置文件信息*/
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
/*end*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** 如果要进行消息回调,则这里必须要设置为true */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调 类为最后一次设置 */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
/*
@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)这个是说在每次注入的时候回自动创建一个新的bean实例
@Scope(value=ConfigurableBeanFactory.SCOPE_SINGLETON)单例模式,在整个应用中只能创建一个实例
@Scope(value=WebApplicationContext.SCOPE_GLOBAL_SESSION)全局session中的一般不常用
@Scope(value=WebApplicationContext.SCOPE_APPLICATION)在一个web应用中只创建一个实例
@Scope(value=WebApplicationContext.SCOPE_REQUEST)在一个请求中创建一个实例
@Scope(value=WebApplicationContext.SCOPE_SESSION)每次创建一个会话中创建一个实例
*/
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory()); //如果上面未使用显示调用声明 ConnectionFactory,此处运行会报错:ConnectionFactory is null
return template;
}
消息发送
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback{
private RabbitTemplate rabbitTemplate;
@Autowired
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
public void send(String content) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitConfig.FOO_EXCHANGE_TOPIC, //交换机
RabbitConfig.FOO_ROUTINGKEY,//路由
content, //内容
correlationData
);
}
public void sendTopic(String content){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitConfig.FOO_EXCHANGE_TOPIC, //交换机
RabbitConfig.FOO_ROUTINGKEY + ".topic",//路由
content, //内容
correlationData
);
}
/**
* 回调方法
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
}
接收端服务器
@Component
@Configuration
@RabbitListener(queues = RabbitMQFinal.FOO_QUEUE)
public class MessageReceiver {
/**
* 设置交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
/**
* DirectExchange : 按照routingkey分发到指定队列
* TopicExchange : 多关键字匹配
* FanoutExchange : 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange : 通过添加属性key-value匹配
*/
return new DirectExchange(RabbitMQFinal.FOO_EXCHANGE);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(RabbitMQFinal.FOO_EXCHANGE_TOPIC);
}
/**
* 注册队列
*
* @return
*/
@Bean
public Queue queue() {
return new Queue(RabbitMQFinal.FOO_QUEUE);
}
/**
* 将队列绑定至交换机
*
* @return
*/
@Bean
public Binding binding() {
//绑定队列
return BindingBuilder.bind(queue()) //绑定队列
.to(topicExchange()) //设置交换机
.with(RabbitMQFinal.FOO_ROUTINGKEY + ".#");//设置 routingkey topic采用模糊匹配
}
@RabbitHandler
public void process(@Payload String content) {
System.out.println("Receiver Value : " + content);
}
}
测试类
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = WebClientCoreApplication.class)
@WebAppConfiguration
public class RabbitTest {
@Autowired
MessageSender messageSender;
@Test
public void testRabbit(){
String content = "hello " + new Date();
System.out.println("Sender-message : " + content);
messageSender.send(content);
}
}
//WebClientCoreApplication 为当前服务的启动类
测试过程
- 启动接收服务器
- 执行测试类
期望值
- 发送服务器正常打印
- 接收服务器正常打印
- RabbitMQ 管理端可以正常查阅发送记录
如果接收端未配置可用消息队列,接收端会循环检测。
写在后面(可忽略)
通过查看RabbitMQ 管理端页面
消息通过 ROUTINGKEY 完成绑定
生产者者不需要再声明队列,只需在消息中绑定 ROUTINGKEY 即可。
交换机设置,队列处理交由消息监听者处理。