RabbitMQ是什么?
rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输,即接收和发送消息。
RabbitMQ工作原理
这个系统架构图版权属于sunjun041640。
下面来介绍RabbitMQ里的一些基本定义,主要如下:
RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
Exchange:一边从发布者方接收消息,一边把消息推送到队列。
producer只能将消息发送给exchange。而exchange负责将消息发送到queues。Procuder Publish的Message进入了exchange,exchange会根据routingKey处理接收到的消息,判断消息是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。
queue也是通过这个routing keys来做的绑定。交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
Queue:消息队列。接收来自exchange的消息,然后再由consumer取出。exchange和queue可以一对一,也可以一对多,它们的关系通过routingKey来绑定。
Producer:Client A & B,生产者,消息的来源,消息必须发送给exchange。而不是直接给queue
Consumer:Client 1,2,3消费者,直接从queue中获取消息进行消费,而不是从exchange中获取消息进行消费。
还有一些在上图中没有显示,但在应用程序中会用到的定义。
Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channels: **虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
Bindings:绑定(binding)是指交换机(exchange)和队列(queue)进行关联。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。
Hello,World
下面将从最简单的应用开始简单了解一下RabbitMQ的使用。
生产者
import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//声明一个TCP连接
Connection connection = factory.newConnection();
//声明一个channel
Channel channel = connection.createChannel();
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
boolean flag = true;
while(flag){
System.out.print("往交换机中推送消息:");
String message = input.readLine();
if (message == "quit"){
flag = false;
continue;
}
//往exchange中推送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
消费者
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
首先列举一下几个重要声明的参数
- 队列声明queueDeclare
queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
- 生产者发送消息basicPublish
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 消费者消费basicConsume
消费者到队列中获取消息进行消费。autoAck=true表示消费者收到这个消息就会向队列发送确认ack,表示自己收到了,队列接到反馈后,就会把这条消息从队列中删除,callback表示消费者收到消息后,对消息进行的处理。
basicConsume(String queue, boolean autoAck, Consumer callback)
上述两段代码,有一下几点要说明。
1、从Send.java 代码中可以看出并没有声明架构图中的exchange,而是方法basicPublish中使用了空字符串 (""),这代码使用了默认/匿名交换机。
2.默认交换机有一个特殊的地方,那就是生产者和消费者会公用一个队列。 在Send.java的basicPublish方法中,routingKey参数的位置我们给了一个队列名,默认交换机会把推给他的消息直接再推到相应名称的队列中。在消费者中,声明了队列后,也不需要和交换机进行绑定。消费者会直接到相应队列获取消息。所以我们的设计看起来如下所示(注意只是看起来):
运行生产者和消费者,运行结果如下:
轮询分发机制
在上面的运行结果中,只有一个消费者在消费队列中的消息,如果有多个消费者同时从这个队列中获取消息进行处理,那么结果会是怎样呢?
同时运行两个消费者,运行结果如下:
生产者
第一个消费者
第二个消费者
从运行结果中可以看出,RabbitMQ 会顺序循环的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。
消息确认
这里就会有一个问题,在上述消费者代码中,我们设置autoAck=true,即消费者收到这条消息就通知队列将其删除。然后才会去处理这条消息。
如果consumer处理消息的时候突然挂了,消息还没有被完成,那么这条消息就会消失。为了演示信息丢失,将消费者代码改成如下
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Created by sunyan on 17/8/1.
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
Thread.sleep(10000);
System.out.println(" [x] Received '" + message + "'");
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
在消息输出前,sleep5秒,下面,运行两个消费者,并在发送第一条消息后,将第一个消费者关闭,
从运行结果可以看出,一共丢失了两条消息。为了不让消息丢失,RabbitMQ
提供了消息确认机制,consumer在接收到,执行完消息后会发送一个ack给RabbitMQ告诉它可以从queue中移除消息了。如果没收到ack。Rabbitmq会重新发送此条消息,如果有其他的consumer在线,将会接收并消费这条消息。消息确认机制是默认打开的。如果想关闭只需要按如下设置 即可。
channel.basicConsume(QUEUE_NAME, false, consumer);
从上图可以看出,第一个消费者退出后,原本轮询分发给他的数据又给了第二个消费者,并没有信息丢失。
如果处理完成后忘记发送ack,那么即使已经被处理的信息,RabbitMQ不会将它删除,且在消费者退出后,将消息继续发送给其他消费者进行重复处理。
把上段代码中的相应内容注释掉。
channel.basicAck(envelope.getDeliveryTag(), false);
从上面三张图可以看出,已经处理过的消息h1,h2又被处理了一遍。忘记basicAck是一个常见的错误,但后果是严重的。因为RabbitMQ无法删除任何消息,将会消耗越来越多的内存。
公平派遣
看到这里,你可能已经觉得轮询分发有点问题了,如果一个消费者处理一个问题要10s,另一个处理只需瞬间,那么一个消费者将不断忙碌,另一个消费者几乎不会做任何工作。但RabbitMQ不知道什么,还会平均分配消息。这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。我们可以使用basicQos方法与 prefetchCount = 1设置。这告诉RabbitMQ在消费者返回一个处理消息的ack之前不要再给他发送新消息。相反,它将发送到下一个还不忙的消费者。
一个消费者设置
Thread.sleep(10000);
另一个消费者不设置,让其立即处理消息。运行结果如下
从图中可以看出,原本应该有Recv处理的消息“hello3”,由Recv1处理了。
消息持久化
在消息确认中讲了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,消息仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,需要两件事来确保消息不会丢失:我们需要将队列和消息标记为耐用。
首先,我们需要确保RabbitMQ不会失去我们的队列。为了这样做,我们需要将其声明为持久的:
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。这是因为我们已经定义了一个 不耐用的名为hello的队列。RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 用不同的名称声明一个队列。
channel.queueDeclare("hello_world",durable,false,false,null);
消息持久化
需要通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN来标记我们的消息。
总结一下,为了数据不丢失,我们采用了:
- 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
- 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
- 持久化Message,理由同上。
总结
在本篇中,主要介绍了一下默认交换机,在默认交换机下,一条消息只能分发给一个消费者。那如果有很多不同的消费者都对这条消息的话,默认交换机就无法实现了?那么具体该怎么实现,将在RabbitMQ--交换机中进行说明。