Spring AMQP
先对本片博客进行总结,可直接跳过总结看下面的博客正文。
总结:
spring-amqp二个核心类RabbitAdmin和RabbitTemplate类
1.RabbitAdmin类完成对Exchange,Queue,Binging的操作,在容器中管理了RabbitAdmin类的时候,可以对Exchange,Queue,Binging进行自动声明。
2.RabbitTemplate类是发送和接收消息的工具类。(下一篇博客具体讲解)
简介
Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。 它提供了一个“模板”(template)作为发送和接收消息的高级抽象。 它还通过“侦听器容器(listener container)”为消息驱动的POJO提供支持。 这些库促进AMQP资源的管理,同时促进使用依赖注入和声明式配置。 在所有这些情况下,您将看到与Spring框架中的JMS支持的相似之处。
Spring AMQP包括两个部分;spring-amqp是对amqp的一些概念的一些抽象。spring-rabbit是对AMQP的实现RabbitMQ的实现。
特征
- 异步处理消费消息的一个监听容器(
Listener container
) - 使用
RabbitTemplate
类的实例来发送和接收消息。 - 使用
RabbitAdmin
去自动声明队列(queues
),交换机(exchanges
),绑定(bindings
)
spring-amqp
模块是对AMQP协议的一个抽象和封装。所以说对所有的AMQP的实现都进行的抽象和封装,比如
org.springframework.amqp.core.Binding
:绑定的封装,类型有QUEUE
和EXCHANGE
。
org.springframework.amqp.core.Exchange
:其有基本的四种实现
org.springframework.amqp.core.Message
:消息是由属性和body构成,将属性也封装成一个对象MessageProperties。
org.springframework.amqp.core.MessageProperties
:对消息属性进行了抽象。
org.springframework.amqp.core.Queue
:队列的封装。
还有对消息的转换进行了封装,相关的类在org.springframework.amqp.support.converter
包下面。(下面的博客会专门讲解消息转换converter的一些实现)。
spring-rabbit
模块是建立在spring
,spring-amqp
,amqp-client
(rabbitmq java client)之上的,是具体操作RabbitMQ的,底层对Rabbitmq
的操作是使用amqp-client
的。
二个核心类,一个是org.springframework.amqp.rabbit.core.RabbitAdmin
和org.springframework.amqp.rabbit.core.RabbitTemplate
spring-rabbit
对日志进行了扩展,可以将日志发送到mq中。
Demo
加入spring-amqp依赖:
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>
</dependencies>
RabbitmqAdmin使用
容器中纳入ConnectionFactory和RabbitAdmin管理
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
应用类,使用RabbitAdmin进行Exchange,Queue,Binding操作
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.HashMap;
import java.util.Map;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
System.out.println(rabbitAdmin);
//创建四种类型的Exchange,可重复执行
rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false));
rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false));
rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false));
//删除Exchange
//rabbitAdmin.deleteExchange("zhihao.header.exchange");
//定义队列
rabbitAdmin.declareQueue(new Queue("zhihao.debug",true));
rabbitAdmin.declareQueue(new Queue("zhihao.info",true));
rabbitAdmin.declareQueue(new Queue("zhihao.error",true));
//删除队列
//rabbitAdmin.deleteQueue("zhihao.debug");
//将队列中的消息全消费掉
rabbitAdmin.purgeQueue("zhihao.info",false);
//绑定,指定要绑定的Exchange和Route key
rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.hehe",new HashMap()));
rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.haha",new HashMap()));
rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.welcome",new HashMap()));
//绑定header exchange
Map<String,Object> headerValues = new HashMap<>();
headerValues.put("type",1);
headerValues.put("size",10);
//whereAll指定了x-match: all参数
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match());
//whereAll指定了x-match: any参数
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")).
to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match());
//进行解绑
rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")).
to(new TopicExchange("zhihao.direct.exchange")).with("zhihao.info"));
//声明topic类型的exchange
rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false));
//exchange与exchange绑定
rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE,
"zhihao.miao.exchange","zhihao",new HashMap()));
//使用BindingBuilder进行绑定
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao"));
//rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE,
//"amq.rabbitmq.log","zhihao",new HashMap()));
context.close();
}
}
Exchange ,Queue,Binding的自动声明
直接把要自动声明的组件Bean纳入到spring容器中管理即可。
自动声明发生的rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。自定声明支持单个和多个。
自动声明Exchange:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeclareConfig {
//声明direct类型的Exchange
@Bean
public Exchange directExchange(){
return new DirectExchange("zhihao.direct.exchange",true,false);
}
//声明topic类型的Exchange
@Bean
public Exchange topicExchange(){
return new TopicExchange("zhihao.topic.exchange",true,false);
}
//声明fanout类型的Exchange
@Bean
public Exchange fanoutExchange(){
return new FanoutExchange("zhihao.fanout.exchange",true,false);
}
//声明headers类型的Exchange
@Bean
public Exchange headersExchange(){
return new HeadersExchange("zhihao.header.exchange",true,false);
}
}
配置类,在spring容器中纳入ConnectionFactory实例和RabbitAdmin实例
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
启动应用类,自动声明发生的rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客户端第一次连接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
队列的自动声明
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeclareConfig {
@Bean
public Queue debugQueue(){
return new Queue("zhihao.debug",true);
}
@Bean
public Queue infoQueue(){
return new Queue("zhihao.info",true);
}
@Bean
public Queue errorQueue(){
return new Queue("zhihao.error",true);
}
}
上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台的队列生成。
绑定的自动生成
DeclareConfig类中,
import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class DeclareConfig {
@Bean
public Binding binding(){
return new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.debug",new HashMap());
}
@Bean
public Binding binding2(){
return new Binding("zhihao.info",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.info",new HashMap());
}
@Bean
public Binding binding3(){
return new Binding("zhihao.error",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.error",new HashMap());
}
}
上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台的Binding生成。
一次性生成多个queue,exchange,binding
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Configuration
public class DeclareConfig {
@Bean
public List<Queue> queues(){
List<Queue> queueList = new ArrayList<>();
queueList.add(new Queue("chao.wang.debug",true));
queueList.add(new Queue("chao.wang.info",true));
queueList.add(new Queue("chao.wang.error",true));
return queueList;
}
@Bean
public List<Exchange> exchanges(){
List<Exchange> exchangeList = new ArrayList<>();
exchangeList.add(new TopicExchange("chao.wang.debug.topic.exchange",true,false));
exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false));
exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false));
return exchangeList;
}
@Bean
public List<Binding> bindings(){
List<Binding> bindingList = new ArrayList<>();
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#"));
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*"));
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*"));
return bindingList;
}
}
上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台Exchange,Queue,Binding都已经生成。
注意
当声明队列是以amp开头的时候,队列是不能创建声明的。
@Bean
public Queue amqQueue(){
return new Queue("amp.log",true);
}
总结
自动声明的一些条件
- 要有连接(对rabbitmq的连接)
- 容器中要有
org.springframework.amqp.rabbit.core.RabbitAdmin
的实例RabbitAdmin
的autoStartup
属性必须为true。- 如果
ConnectionFactory
使用的是CachingConnectionFactory
,则cacheMode
必须是CachingConnectionFactory.CacheMode.CHANNEL
(默认)。- 所要声明的组件(
Queue
,Exchange
和Binding
)的shouldDeclare
必须是true
(默认就是true
)Queue
队列的名字不能以amq.
开头。
注意:Queue
,Exchange
和Binding
都直接或者间接的继承Declarable
,而Declarable
中定义了shouldDeclare
的方法。
自动声明源码分析
org.springframework.amqp.rabbit.core.RabbitAdmin
实现InitializingBean
接口,在BeanFactory
设置完所有属性之后执行特定初始化(afterPropertiesSet
方法)
RabbitAdmin
的afterPropertiesSet
方法,
@Override
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
//autoStartup属性的值为false的时候,直接return
if (this.running || !this.autoStartup) {
return;
}
//connectionFactory实例如果是CachingConnectionFactory,并且CacheMode是CacheMode.CONNECTION也会return下面不执行了。
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
}
//连接的监听器
this.connectionFactory.addConnectionListener(new ConnectionListener() {
// Prevent stack overflow...
private final AtomicBoolean initializing = new AtomicBoolean(false);
@Override
public void onCreate(Connection connection) {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
//执行这个方法
initialize();
}
finally {
initializing.compareAndSet(true, false);
}
}
@Override
public void onClose(Connection connection) {
}
});
this.running = true;
}
}
RabbitAdmin
的initialize
方法,声明所有exchanges
, queues
和 bindings
/**
* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
* (but unnecessary) to call this method more than once.
*/
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
//得到容器中所有的Exchange
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
//得到容器中所有的Queue
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
//得到容器中所有的Binding
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
//获取容器中所有的Collection,如果容器中所有元素是Exchange,Queue或者Binding的时候将这些实例也加入到spring容器中。
@SuppressWarnings("rawtypes")
Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
.values();
for (Collection<?> collection : collections) {
if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
for (Object declarable : collection) {
if (declarable instanceof Exchange) {
contextExchanges.add((Exchange) declarable);
}
else if (declarable instanceof Queue) {
contextQueues.add((Queue) declarable);
}
else if (declarable instanceof Binding) {
contextBindings.add((Binding) declarable);
}
}
}
}
//进行了filter过滤,
final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
final Collection<Queue> queues = filterDeclarables(contextQueues);
final Collection<Binding> bindings = filterDeclarables(contextBindings);
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
//声明exchange,如果exchange是默认的exchange那么也不会声明。
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
//声明队列,如果队列名以amq.开头的也不会进行声明
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
}
});
this.logger.debug("Declarations finished");
}
filterDeclarables
方法过滤一些Exchange
,Queue
,Binding
,因为这三个类都是继承Declarable这个类
,
private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) {
Collection<T> filtered = new ArrayList<T>();
for (T declarable : declarables) {
Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
//shouldDeclare属性必须是true,否则就会被过滤掉了
if (declarable.shouldDeclare() &&
(adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
filtered.add(declarable);
}
}
return filtered;
}
声明Exchanges
private void declareExchanges(final Channel channel, final Exchange... exchanges) throws IOException {
for (final Exchange exchange : exchanges) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
}
//不是默认的Exchange
if (!isDeclaringDefaultExchange(exchange)) {
try {
//是否是delayed类型的Exchange
if (exchange.isDelayed()) {
Map<String, Object> arguments = exchange.getArguments();
if (arguments == null) {
arguments = new HashMap<String, Object>();
}
else {
arguments = new HashMap<String, Object>(arguments);
}
arguments.put("x-delayed-type", exchange.getType());
//调用exchangeDeclare进行声明
channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
exchange.isAutoDelete(), exchange.isInternal(), arguments);
}
else {
//调用exchangeDeclare进行声明
channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(),
exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
}
}
catch (IOException e) {
logOrRethrowDeclarationException(exchange, "exchange", e);
}
}
}
}
声明Queue队列
private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
for (int i = 0; i < queues.length; i++) {
Queue queue = queues[i];
//队列不以amq.开头的队列才能进行声明
if (!queue.getName().startsWith("amq.")) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Queue '" + queue.getName() + "'");
}
try {
try {
//进行队列声明
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
declareOks.add(declareOk);
}
catch (IllegalArgumentException e) {
if (this.logger.isDebugEnabled()) {
this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
}
try {
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
}
}
catch (TimeoutException e1) {
}
throw new IOException(e);
}
}
catch (IOException e) {
logOrRethrowDeclarationException(queue, "queue", e);
}
}
this.logger.debug("Queue with name that starts with 'amq.' cannot be declared.");
}
return declareOks.toArray(new DeclareOk[declareOks.size()]);
}
binding声明:
private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
for (Binding binding : bindings) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
+ ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
+ "]");
}
try {
//QUEUE类型的绑定
if (binding.isDestinationQueue()) {
//并且不是绑定到默认的Default Exchange
if (!isDeclaringImplicitQueueBinding(binding)) {
//绑定队列
channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
}
else {
//Exchange类型的绑定
channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
}
catch (IOException e) {
logOrRethrowDeclarationException(binding, "binding", e);
}
}
}