一、前言:
上一篇简书说到三种RabbitMq的模型,接下来这篇简书介绍剩下的三种。
二、Routing:
在Fanout模式中,⼀条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct
模型下:
- 队列与交换机的绑定,不能是任意绑定了,⽽是要指定⼀个
Routing Key
- 消息的发送方在向
Exchange
发送消息时,也必须指定消息的Routing Key
。Exchange
不再把消息交给每⼀个绑定的队列,而是根据消息的Routing Key
进行判断,只有队列的Routing key
与消息的Routing key
完全⼀致,才会接收到消息。·
1.生产者
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.GetConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String key = "error";
//发布消息
channel.basicPublish(EXCHANGE_NAME, key, null, ("指定的route key:" + key + "的消息").getBytes());
channel.close();
connection.close();
}
2.消费者1
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = ConnectionUtil.GetConnection().createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机,对"error","info","warn"的消息进行消费
channel.queueBind(queue, EXCHANGE_NAME, "error");
channel.queueBind(queue, EXCHANGE_NAME, "info");
channel.queueBind(queue, EXCHANGE_NAME, "warn");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: " + new String(body));
}
});
}
3.消费者2
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.GetConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queue, EXCHANGE_NAME, "error");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: " + new String(body));
}
});
}
运行两个消费者,再运行生产者,分别发送
Routing key
为"info","warn","error"
的信息,结果如下:
消费者1可以接收到
key
为"info","warn","error"
的消息,消费者2只能接收到key
为"error"
的消息。
三、Topics
Topic
类型的Exchange
与Direct
相比,都是可以根据Routing Key
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!这种模型Routing key
⼀般都是由⼀个或多个单词组成,多个单词之间以”.”
分割,例如:item.insert
1.通配符
*(star) can substitute for exactly one word. 匹配不多不少恰好1个词。
#(hash) can substitute for zero or more words. 匹配⼀个或多个词。
如:
audit.#
匹配audit.irs.corporate
或者audit.irs
等
audit.*
只能匹配audit.irs
或者audit.corporate
等
2.生产者
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.GetConnection();
Channel channel = connection.createChannel();
//声明交换机和交换机类型 topic 使⽤动态路由 (通配符⽅式)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String key = "user.save.delete";//动态路由key
//发布消息
channel.basicPublish(EXCHANGE_NAME, key, null, ("这是Topic模型,route key:" + key + "的消息").getBytes());
channel.close();
connection.close();
}
3.消费者1
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] argv) throws Exception {
Channel channel = ConnectionUtil.GetConnection().createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queue, EXCHANGE_NAME, "user.*");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: " + new String(body));
}
});
}
4.消费者2
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.GetConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queue, EXCHANGE_NAME, "user.#");
//消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: " + new String(body));
}
});
}
运行两个消费者,再运行生产者,分别发送
Routing key
为"user.save","user.save.delete"
的信息,结果如下:
消费者2接收的
Routing key
为user.#
,因此它可以接收到key
为:"user.save"
,"user.save.delete"
发出的消息。
消费者1接收的Routing key
为user.*
,只能接收到key为:"user.save"
发出的消息。