消息生产和消费
- ConnectionFactory : 获取连接工厂
- Connection :通过连接工厂获取一个连接
- Channel : 通过连接创建 数据通信信道,可发送和接收消息
- Queue: 具体的消息存储队列
- Producer 和 Consumer 生产者和消费者
生产端
/**
* 生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建一个ConnectionFactory并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3、通过connection创建一个Channel :网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个代表一个会话任务。
Channel channel = connection.createChannel();
//4、通过channel发送数据
String message = "hello rabbitMq!";
//发送5条
for (int i = 5; i > 0; i--) {
//exchange 交换机 |routingKey 路由key |props 配置文件
//不指定Exchange时,交换机默认是AMQP default,此时就看RoutingKey,RoutingKey要等于队列名才能被路由,否则消息会被删除。
channel.basicPublish("", "test001", null, message.getBytes());
}
//5、关闭连接
channel.close();
connection.close();
}
}
*消费者
/**
* 消费者
*
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1、创建一个ConnectionFactory并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPort(5672);
connectionFactory.setHost("");
connectionFactory.setVirtualHost("/");
//2、 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3、通过connection创建一个Channel :网络信道,几乎所有操作都在Channel中进行,是进行消息读写的通道。客户端可建立多个Channel,每个代表一个会话任务。
Channel channel = connection.createChannel();
//4、创建一个队列 (声明)
/*
queueName: 队列名称
durable: 持久化。true 即使服务重启也不会删除这个队列
exclusive: 独占、 true 队列只能使用一个连接。连接断开,队列删除
autoDelete: 自动删除: true 脱离了Exchange(连接断开) 即队列没有Exchange关联时。自动删除
arguments : 扩展参数
*/
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5、创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6、设置channel
channel.basicConsume(queueName, true, queueingConsumer);
//7、获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
Envelope envelope = delivery.getEnvelope();
// long deliveryTag = envelope.getDeliveryTag();
}
}
}
Exchange 交换机
exchange :接收消息,并根据路由键转发消息所绑定的队列
交换机的属性
- name: 交换机名称
- type: 交换机类型。 direct,topic、fanout、headers
- durability: 是否需要持久化,true为持久化
- autoDelete: 当最后一个绑定到Exchange上的队列删除后。自动删除该Exchange
- internal: 当前Exchange是否用于RabbitMQ内部使用。 默认false
- arguments: 扩展参数。 用于扩展AMQP协议。自己定制使用
direct : 直接
direct Exchange
所有发送到Direct Exchange 的消息被转发到RouteKey中指定的Queue
注意 : Direct模式可以使用RabbitMQ自带的Exchange :default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收、否则该消息会被抛弃。
Direct Exchange 生产者
/**
* 直接 交换机
* direct Exchange模式生产者
*
*/
public class Producer4DirectExchange {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建通道 channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test-direct-exchange";
String routingKey = "test.direct";
//5 发送
String msg = "hello word RabbitMQ For Direct Exchange Message .... ";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
channel.close();
connection.close();
}
}
Direct Exchange 消费者
/**
* 直接 交换机
* direct Exchange 消费者
*
* @author yangHX
* createTime 2019/3/10 22:02
*/
public class Consumer4DirectExchange {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
//支持重连
connectionFactory.setAutomaticRecoveryEnabled(true);
//3秒
connectionFactory.setNetworkRecoveryInterval(3000);
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建通信信道
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test-direct-exchange";
String exchangeType = "direct";
String queueName = "test-direct-queue";
String routingKey = "test.direct";
//声明exchange(交换机) declare (宣布。声明)
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//声明队列
channel.queueDeclare(queueName, false, false, false, null);
//建立一个绑定关系。
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化。
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//参数。队列名称 是否自动ACK Consumer
//ACK 自动签收
channel.basicConsume(queueName, true, queueingConsumer);
//循环获取消息
while (true) {
//获取消息。如果没有消息。这一步将会阻塞
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String s = new String(delivery.getBody());
System.out.println("收到消息:" + s);
}
}
}
Topic Exchange
所有转发到Topic Exchange的消息被转发到所有关心RokeKey中指定Topic的Queue
Exchange 将 RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic
Topic Exchange 生产者
/**
* topic exchange 生产者
*
* @author yangHX
* createTime 2019/3/10 23:13
*/
public class Producer4TopicExchange {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建通道 channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送
String msg = "hello word RabbitMQ For Topic Exchange Message .... ";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
channel.close();
connection.close();
}
}
Topic Exchange 消费者
/**
* topic exchange 消费者
*
* @author yangHX
* createTime 2019/3/10 23:13
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey="user.*";
// String routingKey = "user.#";
//声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//声明队列
//durable 是否持久化
channel.queueDeclare(queueName, false, false, false, null);
//建立交换机和队列的绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//参数。队列名称。 是否自动ACK consumer
channel.basicConsume(queueName, true, queueingConsumer);
//循环获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String s = new String(delivery.getBody());
System.out.println("收到 topic 消息: " + s);
}
}
}
Fanout Exchange
- 不处理路由键。只需要简单的将队列绑定到交换机上
- f发送到交换机的消息都会被转发到于该交换机绑定的所有队列上。
- Fanout 交换机转发消息是最快的
Fanout Exchange 生产者
/**
* fanout 交换机 生产者
*
* @author yangHX
* createTime 2019/3/10 23:45
*/
public class Producer4FanoutExchange {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建通道 channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
//5 发送
String msg = "hello word RabbitMQ For Fanout Exchange Message .... ";
for (int i = 10; i > 0; i--) {
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
channel.close();
connection.close();
}
}
Fanout Exchange 消费者
/**
* Fanout Exchange 消费者
*
* @author yangHX
* createTime 2019/3/10 23:46
*/
public class Consumer4FanoutExchange {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "";
//声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//声明队列
//durable 是否持久化
channel.queueDeclare(queueName, false, false, false, null);
//建立交换机和队列的绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//参数。队列名称。 是否自动ACK consumer
channel.basicConsume(queueName, true, queueingConsumer);
//循环获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String s = new String(delivery.getBody());
System.out.println("收到 fanout 消息: " + s);
}
}
}
Binding-绑定
Exchange
和Exchange
、Queue
之间的连接关系
Binding中可以包含RoutingKey或者参数
*Queue-消息队列
消息队列,实际存储消息数据
Durability
: 是否持久化,Durablity :是 Transient: 否
Auto delete
如选yes,代表当最后一个监听被移除后,该Queue会自动被删除
*Message-消息
服务器和应用程序之间传送的数据
本质上就是一段数据,由Properties , Payload(Body)
组成
常用属性: delivery mode。 headers(自定义属性)
Message-其他属性
content_type 、content_encoding 、priprity
correlation_id、 reply_to、expriation、 message_id
timestamp、type、user_id、app_id、cluster_id
设置消息属性
读取消息属性
Virtual Host -虚拟主机
虚拟地址,用于逻辑隔离,最上层的消息路由
一个Virtual Host 里面可以有若干个Exchange 和Queue
同一个Virtual Host里面不能有相同名称的Exchange或Queue