上一篇我们介绍了 RabbitMQ 的工作流程,以及常用的交换机,接下里我们结合具体的例子来看一下具体的应用。
使用 Java client 操作 RabbitMQ 可以参考以下步骤来实现:
- 创建连接工厂(ConnectionFactory),设置 RabbitMQ 服务信息、账号、密码等
- 使用连接工厂建立连接(Connection)
- 使用连接创建数据通道(Channel)
- 创建交换机(Exchange)、队列(Queue),绑定两者
- 使用数据通道发送、接收消息
- 释放数据通道、连接
本文的例子会结合 Fanout Exchange
、Direct Exchange
、 Topic Exchange
这三种常用的交换机来实现 。
一、准备工作
创建一个 Maven 项目,添加 RabbitMQ 依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
考虑到代码的复用,我们先将一些通用的步骤封装一下:
public class RabbitMQConnection {
ConnectionFactory connectionFactory;
public RabbitMQConnection() {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
// 设置 RabbitMQ 服务地址
connectionFactory.setHost("localhost");
// 设置 RabbitMQ 服务端口
connectionFactory.setPort(5672);
// 设置账号
connectionFactory.setUsername("admin");
// 设置密码
connectionFactory.setPassword("123456");
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
}
public void create(String connectionName, RabbitMQTask rabbitMQTask) {
Connection connection = null;
Channel channel = null;
try {
// 创建连接
connection = connectionFactory.newConnection(connectionName);
// 创建数据通道
channel = connection.createChannel();
// 执行消息的发送、接收等业务
rabbitMQTask.execute(channel);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放资源
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public interface RabbitMQTask {
void execute(Channel channel) throws IOException;
}
在RabbitMQConnection
中已经实现了资源的连接以及释放,第4、5步骤,需要在RabbitMQTask
接口里,根据具体的业务去实现execute()
方法。
简单起见,我们将生产者和消费者定义在同一个项目里。
二、Fanout Exchange
生产者代码如下:
public class Producer {
public void work() {
RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
rabbitMQConnection.create("生产者", new RabbitMQTask() {
public void execute(Channel channel) throws IOException {
// 交换机名称
String exchangeName = "fanout-example-exchange";
// 交换机类型
String exchangeType = "fanout";
// 创建交换机,true表示持久化交换机,一般都为true
channel.exchangeDeclare(exchangeName, exchangeType, true);
// 创建消息队列
/**
* 参数1:队列名称
* 参数2:是否需要持久化,非持久化队列在服务重启后,队列中的消息会丢失,一般都为true
* 参数3:排它性,是否是一个独占队列
* 参数4:队列中的消息被消费完后是否自动删除队列
* 参数5:附加参数,Headers Exchange 的参数可以在这里传递
*/
String queueName1 = "queue1";
channel.queueDeclare(queueName1, true, false, false, null);
String queueName2 = "queue2";
channel.queueDeclare(queueName2, true, false, false, null);
String queueName3 = "queue3";
channel.queueDeclare(queueName3, true, false, false, null);
// 绑定队列和交换机,不指定 routingKey
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
channel.queueBind(queueName3, exchangeName, "");
String routingKey = "";
// 准备消息内容
String message = "hello rabbitmq";
// 发送消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
System.out.println("生产者发送的消息是:" + message);
}
});
}
public static void main(String[] args) {
new Producer().work();
}
}
生产者核心的业务都是基于Channel
对象实现的,包括创建Fanout
类型的交换机,创建队列,将交换机和队列绑定,由于使用了Fanout
类型的交换机所以绑定时不用指定routingKey
,发送消息时需要携带交换机名称和一个空的routingKey
。
消费者代码如下:
public class Consumer implements Runnable {
private String queueName;
public Consumer(String queueName) {
this.queueName = queueName;
}
@Override
public void run() {
RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
rabbitMQConnection.create("消费者", new RabbitMQTask() {
public void execute(Channel channel) throws IOException {
// 接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + "收到的消息是:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
});
System.out.println(queueName + "开始接收消息");
System.in.read();
}
});
}
public static void main(String[] args) {
new Thread(new Consumer("queue1")).start();
new Thread(new Consumer("queue2")).start();
new Thread(new Consumer("queue3")).start();
}
}
消费者的实现比较简单,用多线程模拟三个消费者,分别接收三个队列中的消息。
分别启动生产者和消费者,结果符合Fanout
类型交换机的效果,消息分别进入到三个队列中,最终被消费者掉:
三、Direct Exchange
掌握 Fanout Exchange 的用法,学习 Direct Exchange 就很简单了。
修改一下生产者的代码:
public void work() {
RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
rabbitMQConnection.create("生产者", new RabbitMQTask() {
public void execute(Channel channel) throws IOException {
// 交换机名称
String exchangeName = "direct-example-exchange";
// 交换机类型
String exchangeType = "direct";
// 创建交换机,true表示持久化交换机,一般都为true
channel.exchangeDeclare(exchangeName, exchangeType, true);
// 创建消息队列
String queueName1 = "queue1";
channel.queueDeclare(queueName1, true, false, false, null);
String queueName2 = "queue2";
channel.queueDeclare(queueName2, true, false, false, null);
String queueName3 = "queue3";
channel.queueDeclare(queueName3, true, false, false, null);
// 绑定队列和交换机,指定routingKey
channel.queueBind(queueName1, exchangeName, "red");
channel.queueBind(queueName2, exchangeName, "green");
channel.queueBind(queueName3, exchangeName, "blue");
String routingKey = "red";
String routingKey2 = "blue";
// 准备消息内容
String message = "hello rabbitmq";
String message2 = "hello amqp";
// 发送消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
System.out.println("生产者发送的消息是:" + message);
System.out.println("生产者发送的消息是:" + message2);
}
});
}
使用 Direct 类型的交换机时,绑定交换机和队列时需要指定routingKey
,发送消息时也要携带上routingKey
去匹配消息队列。消费者代码不需要修改。按照预期生产者的消息最终会分别进入queue1
、queue3
,最终被消费掉。
运行程序,结果符合预期:
四、Topic Exchange
上一篇我们已经了解到,Topic Exchange 和 Direct Exchange 的差别就是 Topic Exchange 的routingKey
支持通配符模糊匹配,更像一种精细化的 Direct Exchange。
只需要修改生产者的代码:
public void work() {
RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
rabbitMQConnection.create("生产者", new RabbitMQTask() {
public void execute(Channel channel) throws IOException {
// 交换机名称
String exchangeName = "topic-example-exchange";
// 交换机类型
String exchangeType = "topic";
// 创建交换机,true表示持久化交换机,一般都为true
channel.exchangeDeclare(exchangeName, exchangeType, true);
// 创建消息队列
String queueName1 = "queue1";
channel.queueDeclare(queueName1, true, false, false, null);
String queueName2 = "queue2";
channel.queueDeclare(queueName2, true, false, false, null);
String queueName3 = "queue3";
channel.queueDeclare(queueName3, true, false, false, null);
// 绑定队列和交换机,指定routingKey
channel.queueBind(queueName1, exchangeName, "*.red.#");
channel.queueBind(queueName2, exchangeName, "green.*");
channel.queueBind(queueName3, exchangeName, "#.blue.#");
String routingKey = "green.red";
// 准备消息内容
String message = "hello rabbitmq";
// 发送消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
System.out.println("生产者发送的消息是:" + message);
}
});
}
按照模糊匹配规则,消息会进入queue1
、queue2
,最终被消费掉: