最近在上云计算,上课讲到RabbitMQ,先试下水(重庆大学2017 刘 李 杨 谢)
Demo 0.环境配置:
- 服务端(MQ服务)
下载安装erlang 22.3
,下载安装 RabbitMQ 3.8
,无需进一步操作
- 客户端(代码)
本小组使用Java语言,基于Maven配置:
pom.xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
此时Maven自动导入包,可以在项目中使用MQ了
Demo1.实现Hello World
- 发送方,消息生产者:
核心代码:
//step 1
ConnectionFactory factory = new ConnectionFactory();
//step 2
factory.setHost("localhost");
//step 3
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//step 4
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
核心逻辑:
- 工厂模式产生一个channel作为通讯的工具
- 为其绑定主机号,localhost表示本地
- 制造一个channel,并在其中声明一个名叫 QUEUE_NAME 字符串值得队列
- 发布这个消息,这个消息位于消息队列中了
- 接收方,消息的消费者:
核心代码:
//step 1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//step 2
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//step 3
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//step 4
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
核心逻辑:
- 同发送方,创建工厂和channel;
- channel声明队列,这个和发送方代码调用相同,但此时是找到已经创建的MQ而发送方是创建一个MQ
- 创建一个接收消息后的逻辑的回调,但是不在这里执行;
- 通过basicConsume循环接收消息,执行回调逻辑处理消息
Demo 2. 公平分摊,ACK,持久化
基于demo1的代码,产生如下问题:
消息存在内存中,如果停止服务,重启丢失怎么办?
如果有多个消费者,那么如何决定将消息发给谁?
如何确保消费者处理完消息了呢? && 如何确保公平分摊?
demo2实现的工作队列将用于解决这些问题,具体如下
Q1:持久化问题:
一般情况下,发布消息:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("",QUEUE_NAME,
null,
message.getBytes("UTF-8"));
如果需要将消息持久化:
channel.queueDeclare(TASK_QUEUE_NAME,
true, false, false, null);//第二个参数为是否持久化
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
Q2.多消费者问题
如果有多个消费者:
默认情况下,会按一定的顺序轮流分配给不同的消费者;也就是说,多个消费者轮流消费队列里的消息
Q3. 确保处理完成&&公平分配
那么,每个消费者会设置一个channel.basicQos(n);
代表消费者能最大n个队列待处理;如果超过这个值,那么消息队列将不会往其中分配;
而MQ是不会主动监视消费者还有多少个消息没处理,通常,都是由消费者处理完了主动通知MQ:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
消费者的这段代码告诉队列已经成功处理了消息,如果队列持久化了消息,这个时候就可以放心销毁了;否则消息会一直存在,那么也会造成开销甚至存储溢出;
Demo 3.exchange "fanout" 实现发布订阅
当发送方发送一条消息,订阅了该类型消息的所有消费者都会收到。
场景:
用户上传了自己的头像,这个时候图片需要清除缓存,同时用户应该得到积分奖励,你可以把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片需要处理的需求的时候,原来的代码可以不变,只需要添加一个订阅消息即可,这样发送方和消费者的代码完全解耦,并可以轻而易举的添加新功能了。
原理在于,发送方不再直接通过消息队列发送消息,而是通过exchange,而消费者会通过在所在的exchange绑定自己的消息队列;当发送方在exchange发布的时候,exchange会把消息发送给所有的绑定在其上的消息队列发送消息。(下图X为exchange)
- 发送方:
核心代码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
核心逻辑
相比之前的demo,发送方不再通过channel.queueDeclare()
声明一个queue,取而代之的是声明一个exchange,fannout是广播的模式,还有其他类型:direct topic等:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //"fanout"模式代表广播发送
- 接收方:
核心代码
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
//...
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Consumer Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
核心逻辑
同样,接收方也通过exchangedeclare连接到exchange;这个时候通过channel会为其分配一个随机命名的MQ,并通过queueBind()
绑定到exchange,该MQ为了在最后一行basicConsume()
中获取的这个MQ执行回调函数,通过getQueue()
获取到该随机队列名,随后正常消费队列消息
Demo 4. exchange "direct" 选择性发布订阅
对于demo3, exchange的fanout将发送方每一个消息都发送给所有的消费者,这是一种无差别的分配,但有的时候是需要合理过滤的:
场景:
一个日志产生器会产生日志,而多个消费者会消费日志,但是有的消费者只会对【warn】级别的感兴趣,而有的只负责处理【info】【error】级别的日志;这个情况下,如果使用fanout,消费者就会收到自己不感兴趣的消息,exchange对这些消费者不感兴趣的MQ根本没必要发送消息。
- 发送方:发送 info warn error三种类型的日志
核心代码:
//前面照常创建channel、 factory
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner sc=new Scanner(System.in);
while(sc.hasNext()){
String severity = sc.nextLine(); //日志级别
String message = severity+" message";
channel.basicPublish(EXCHANGE_NAME,
severity, //发往对此级别感兴趣的管道
null, message.getBytes("UTF-8"));
}
}
核心逻辑:
- 声明exchange的时候指定类型"direct"
- 输入感兴趣的级别 warn error info等
-
发往对应感兴趣的MQ
- 接收方:选择性接收自己感兴趣的log类型
核心代码
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
String [] interestLog=new String[]{"info"};//对info类型感兴趣
for (String severity : interestLog) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
核心逻辑
同样声明,找到同名exchange,类型也是“direct”
根据感兴趣类型,绑定对应的队列(多个感兴趣事件就绑定多个MQ)
设置future回调函数处理感兴趣的事件:
对info感兴趣的消费者,只能收到Info:
对 error,warn感兴趣的消费者,只能收到error 和 warn
Demo. 5 exchange "Topic"更灵活的匹配发布订阅
尽管使用direct类型的exchange对fanout进行了改进,但它仍然存在局限性:它不能基于多个条件进行路由。
场景:
日志系统并不只会产生[warn info error]等级信息,还会涉及作者[user admin guest],以及别的信息,这多个属性往往是不相干的,如果用direct,会有很大的限制
Topic 类型的exchange使用 xx.xx.xx.xx的点段式路由key对管道进行划分,发布的消息的key为该格式的,而消费者则只需要care其中某几个自己感兴趣的字段,别的通过*****进行忽略某一段,或者使用#进行模糊匹配:
例如发布了一个“cn.mq.rabbit.error”的消息;
能匹配上的路由键:cn.mq.rabbit.* ; cn.mq.rabbit.#;#.error ; cn.mq.#;#
不能匹配上的路由键:cn.mq.;.error;*
原理图如下,可以看到一个消费者可以匹配多个topic:
代码上:
- 发送方
和DEMO4 基本结构相同,只是对exchange声明为“topic”类型:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
此外就是发布的时候routingKey变成了点段式字符串:
channel.basicPublish(EXCHANGE_NAME,
routingKey, //如“a.b.c” “error.admin.2020”
null, Rmessage.getBytes("UTF-8"));
接收方发布消息:
- 接收方
接收方同样相比demo4只有两处改动:
topic类型的exchange声明
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
匹配的bindKey变成了点段式字符串
String [] keys=new String[]{"*.admin.*" , "#.guest.#"};
for (String bindingKey : keys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
对于上述发送方的输入: 对第二个字段为“admin”和“guset”感兴趣的接收方收到:
对第一个字段为“Error”感兴趣的接收方收到:
Topic实现了相比direct更灵活的匹配选择性订阅;
Demo .6 RPC远程调用
RPC远程调用是MQ的一个用法示例,其基本原理还是在于字符串消息的传递,在官方示例中,请求服务方只负责通过RequestQueue
简单发送一个数字字符串;
而RPC服务方收到RequestQueue
字符串之后进行斐波拉契数列的调用,随后通过ReplyQueue
返回结果;
- 如何精准返回?
我们知道,通常服务端只有几个实例,负责响应所有的调用请求,客户端数量远远大于服务端;所以,通常都是多个客户端用一个requestQueue
,而自己和服务端之间独享一个replyQueue
;
所以客户端在发送请求的时候,会通过设置property
中的属性来告诉服务方将返回结果送往自己所在的replyQueue
:
客户端
String replyQueueName =
channel.queueDeclare().getQueue();//和服务方建立一个随机名字的MQ
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)//告诉服务方返回结果的队列
.build();
同时我们也看到通过在prop
中设置了一个随机的id
,可以在接收的时候验证,确保返回来的结果就是当初发出去的:
String ctag = channel.basicConsume(replyQueueName
, true, (consumerTag, delivery) -> {
//equals函数确保发送的和接收的结果的id相同
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
//如果reply有返回值了,那么就往response阻塞队列放置
response.offer(new String(delivery.getBody(), "UTF-8"));
}java
}, consumerTag -> {
});
String result = response.take();//阻塞获取RPC结果
通过将response
初始化为阻塞队列,当结果没有返回的时候,就会阻塞。