什么是rabbitmq?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
第一种方式:点对点
- P:生成者
- C:消费者
- 红色方块代表信道
生成者
void producerSendMessing() {
//生产者代码
//创建一个rabbitmq的连接工厂
ConnectionFactory connectionFactory =new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
//开启一个server连接
Connection connection = connectionFactory.newConnection();
//server连接创建一个信道
Channel channel = connection.createChannel();
//通过信道绑定一个队列,两个重载方法
/*
* @param queue :队列的名称
* @param durable 是否持久化,
* @param exclusive 是否独占这个队列
* @param autoDelete 是否消费之后,删除队列
* @param arguments 其他参数设置,是一个map
* */
channel.queueDeclare("hello",true,false,false,null);
/**
* 三个重载方法,
*
* @param exchange 交换器
* @param routingKey 路由密钥
* @param支持消息的其他属性-路由标头等
* @param正文消息正文
*/
channel.basicPublish("","hello",null,"hello world!".getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
消费者
void consumerReceivingMessing() {
//生产者代码
//创建一个rabbitmq的连接工厂
ConnectionFactory connectionFactory =new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setConnectionTimeout(1000);
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
//开启一个server连接
Connection connection = connectionFactory.newConnection();
//server连接创建一个信道
Channel channel = connection.createChannel();
//通过信道绑定一个队列,两个重载方法
/*
* @param queue :队列的名称
* @param durable 是否持久化,
* @param exclusive 是否独占这个队列
* @param autoDelete 是否消费之后,删除队列
* @param arguments 其他参数设置,是一个map
* */
channel.queueDeclare("hello",true,false,false,null);
/**
* @param queue队列名称
* @param autoAck如果服务器应考虑消息,则为true。自动确认机制
*交付后确认; 如果服务器应该期望,则返回false
*明确的确认
* @param回调用户对象的接口Consumer,默认实现类DefaultConsumer,需要传入信道
*/
String hello = channel.basicConsume("hello", true,new DefaultConsumer(channel){
//最后一个参数是消息体
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
System.out.println(new String(body));
}
});
System.out.println(hello);
//连接关闭
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
总结
- 无论是生成者还是消费者都需要连接到rabbitmq的server。通过信道操作消息
work模型
Work queues, 也被称为(Task queues), 任务模型。 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费, 就会消失,因此任务是不会被重复执行的。
角色:
●P:生产者:任务的发布者
●C1:消费者,领取任务并且完成任务,假设完成速度较慢
●C2:消费者2:领取任务并完成任务,假设完成速度快
总结
- 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
为了避免消费者1处理消息的业务慢,消费者2处理消息的业务快。但是因为轮询机制,导致消息被消费者1拿了过去进行堵塞,从而导致系统宕机。消息确认机制相应出现。
消费者需要拿到消息之后回复队列已处理才会拿到下一条消息,从而实现能者多劳
- 消费者1:假设完成速度较慢
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//信道每次只传递1个消息
channel.basicQos(1);
//绑定到一个work队列
channel.queueDeclare("work",true,false,false,null);
//获取消息
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//模拟消费者处理慢的场景
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2:--->"+new String(body));
//参数1:根据标签回复那条消息,参数2:是否回复多条消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
- 消费者2:领取任务并完成任务,假设完成速度快
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//信道每次只传递1个消息
channel.basicQos(1);
//绑定到一个work队列
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:--->"+new String(body));
//参数1:根据标签回复那条消息,参数2:是否回复多条消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
注意
代码中在消费者里面设置了信道里面只消费1条消息,并且处理业务之后会回复消息已被消费
fanout广播模型
在广播模式下,消息发送流程是这样的:
●可以有多个消费者
●每个消费者有自己的queue (队列)
●每个队列都要绑定到Exchange (交换机)
●生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
●交换机把消息发送给绑定过的所有队列
●队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者代码
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//声明一个交换机。参数1:交换机名称,参数2:选择交换机模式。fanout:广播模式
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null,("fanout条消息").getBytes());
//关闭连接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消费者代码
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//绑定一个交换机
channel.exchangeDeclare("logs","fanout");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"logs","");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:--->"+new String(body));
}
});
}
Routing模式:1direct(直连模型)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
●队列与交换机的绑定,不能是任意绑定了,而是要指定-个RoutingKey (路由key)
●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
●Exchange不再把消息交给每- 个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
生产者代码
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//创建一个交换器,参数1:交换器名称,参数2:交换器模式:路由
channel.exchangeDeclare("log_direct","direct");
//声明一个routingKey
String routingKey="error";
//发送消息
channel.basicPublish("log_direct",routingKey,null,("这是direc模型发送的消息:"+routingKey).getBytes());
//关闭链接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消费者1代码
public static void main(String[] args) throws IOException {
//获取链接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建信道
Channel channel = mqConnection.createChannel();
//信道绑定交换器
channel.exchangeDeclare("log_direct","direct");
//获取临时队列名称
String queue = channel.queueDeclare().getQueue();
String routingKey="info";
//信道绑定交换器,路由键,队列
channel.queueBind(queue,"log_direct",routingKey);
//获取消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
消费者2代码
public static void main(String[] args) throws IOException {
//创建一个连接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建一个信道
Channel channel = mqConnection.createChannel();
//绑定一个交换机
channel.exchangeDeclare("log_direct","direct");
//获取信道的一个临时队列
String queue = channel.queueDeclare().getQueue();
//定义三个路由键
String routingKey="error";
String routingKeyWarring="warring";
String routingKeyInfo="info";
//信道绑定队列,交换机和路由键
channel.queueBind(queue,"log_direct",routingKey);
channel.queueBind(queue,"log_direct",routingKeyWarring);
channel.queueBind(queue,"log_direct",routingKeyInfo);
//获取消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:消费了"+new String(body));
}
});
}
Routing模式:topic(订阅模型)
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配
符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以"分割,例如: item. insert
- 通配符:
* 代表只匹配一个单词,比如user.*的路由键可以接受user.add,user.delete,user.update
#代表只匹配多个单词,比如user.#的路由键可以接受user.add.all,user.delete.all
生产者代码
public static void main(String[] args) throws IOException {
//创建连接
Connection mqConnection = RabbitMqUtils.getMqConnection();
//创建信道
Channel channel = mqConnection.createChannel();
//绑定交换机
channel.exchangeDeclare("topic","topic");
//路由
String routingKey="user.save";
//发送消息
channel.basicPublish("topic",routingKey,null,("生产了"+routingKey+"消息").getBytes());
//关闭连接
RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
}
消费者1代码是 * 的通配符
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
//路由键匹配一个单词比如user.save
channel.queueBind(queue,"topic","user.*");
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
消费者2代码是#号的通配符
public static void main(String[] args) throws IOException {
Connection mqConnection = RabbitMqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
//路由键匹配多个单词比如user.id.delete
channel.queueBind(queue,"topic","user.#");
//消费消息
channel.basicConsume(queue,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}