RabbitMQ
简介
RabbitMQ是一个消息代理:它接收和转发消息。可以将它看做一个邮局:你把邮件放到邮箱里,会有邮递员将邮件送到接收人。这个比喻里,RabbitMQ就是邮箱,邮局和邮递员。
Queue就像是邮箱。虽然消息是从RabbitMQ到程序里,但消息只能放到Queue里。Queue的上限取决于主机的内容和硬盘限制。多个生产者可以将消息发到同一个Queue里,多个消费者可以从同一个Queue里取出消息。
以下为Spring-AMQP里对应的接口或类
Exchange
Exchange接收生产者发送的消息,并将它们放到Queue里。exchange决定如何处理收到的消息。消息是否应该放到指定的Queue里,是否添加到多个Queue里,或者是否被丢弃。这些都由exchange type定义。
可用的exchange type有:direct
, topic
, headers
和fanout
。
- fanout: 不需要routing key,将消息发送到所有的Queue里
- direct: 将Queue绑定到固定的routing key对应的Queue上。
- topic: 支持通配符
*
和#
方式匹配多个routing key上。topic的方式可以满足fanout
和direct
两种类型。
Queue
Queue代表消息消费者接收消息的一个组件。
Binding
@Bean
public Queue someQueue() {
Queue queue = new Queue("someQueue");
return queue;
}
@Bean
public DirectExchange someExchange() {
DirectExchange directExchange = new DirectExchange("someExchange");
return directExchange;
}
上面的方式queue和exchange默认是持久化的
- 绑定Queue到DirectExchange:
new Binding(someQueue, someDirectExchange, "foo.bar")
- 绑定Queue到TopicExchange:
new Binding(someQueue, someTopicExchange, "foo.*")
- 绑定Queue到FanoutExchange
new Binding(someQueue, someFanoutExchange)
spring-amqp还提供了BindingBuilder
创建:
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*")
还有可以通过@RabbitListener
注解直接生成queue和exchange的绑定关系,同时创建了消费者,可谓是一气呵成。
@RabbitListener(bindings = @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue(value = "topicQueue", durable = "true"),
exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC, durable = "true")))
上面代码会自动创建名为topicQueue
的queue和topic
的topicExchange并建立绑定关系 ,这种方式创建的queue和exchange的持久化默认是false
。需要显示指定durable
。
发送消息
@Autowired
private RabbitTemplate template;
@Override
public void sendMessage(String routingKey, Object data) {
template.convertAndSend("exchangeName", routingKey, data);
}
// 发送延迟消息,注意exchange的delay属性要设置成true
@Override
public void sendDelayMessage(String routingKey, Object data, int delay) {
template.convertAndSend("exchangeName", routingKey, data, m -> {
m.getMessageProperties().setDelay(delay);
return m;
});
}
测试当routingKey为空字符串的时候,好像所有的routingkey都能收到。
接收消息
@RabbitListener(bindings = @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue(value = "topicQueue", durable = "true"),
exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC, durable = "true")))
public void receiveMessage(String message) {
}
或者已有创建的queue:
@RabbitListener(queues="someQueue")
public void receiveMessage(String message) {
}
可以创建一个bean,省去强制转换:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
return converter;
}