基本概念
broker
: 实体服务器
VirtualHost
:缩小版的RabbitMq服务器,拥有自己独立的交换器、消息队列和相关的对象
Exchange
:接受生产者的消息,并将这些消息路由到具体的Queue中
Binding
:Exchange和Queue之间的关联
Queue:
用来保存消息直到发送给消费者,它是消息的容器。
Channel
:多路复用连接中的独立的双向数据流通道,因为建立和销毁TCP的Connection开销
rabbitmq对消息的保存方式
- disk 后缀.rdp
a. 在发送时指定需要持久化或者服务器内存紧张时会将部分中的内存消息保存到磁盘中
b. 单个文件增加到16M后会生成新的文件
c. 文件中的消息被标记删除的比例达到阈值时会触发文件的合并,提高磁盘的利用率
Exchange 类型
订阅模式(Fanout Exchange):
会将消息放到所有绑定到该exchange的队列上
public class ConnectionUtils {
private static Connection connection;
private static String lock = "aaa";
/**
* 获取rabbitmq连接
* @return
*/
public static Connection getConnection() {
if (null != connection) {
return connection;
}
synchronized (lock) {
if (null != connection) {
return connection;
}
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("host");
connectionFactory.setUsername("userName");
connectionFactory.setPassword("password");
connectionFactory.setVirtualHost("/vhost");
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
throw new RuntimeException("IoException", e);
} catch (TimeoutException e) {
throw new RuntimeException("timeOutException", e);
}
return connection;
}
}
}
public class FanOutProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,fanout类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "fanout", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, "", null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭连接
channel.close();
connection.close();
}
}
public class FanOutConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout";
public static void main(String[] args) throws IOException {
//建立链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "fanout");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, "");
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
public class FanOutConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout02";
....
}
两个消费者都能打印如下
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
Direct Exchange
只会把消息routingkey一致的queue中
public class DirectProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,direct类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "direct", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭连接
channel.close();
connection.close();
}
}
public class DirectConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException {
//建立链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "direct");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
public class DirectConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct01";
private static final String ROUTING_KEY = "direct_routing_key02";
....
}
运行完成,只有DirectConsumer01能收到消息
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
Topic Exchange
对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”
public class TopicProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String ROUTING_KEY = "test.routingkey.01";
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,topic类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "topic", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME,ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭连接
channel.close();
connection.close();
}
}
public class TopicConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_01";
private static final String ROUTING_KEY = "test.routingkey.#";
public static void main(String[] args) throws IOException {
//建立链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "topic");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
为了节省篇幅,
public class TopicConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_02";
private static final String ROUTING_KEY = "test.routingkey.*";
......
}
public class TopicConsumer03 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_03";
private static final String ROUTING_KEY = "test.routingkey";
....
}
输出结果只有TopicConsumer01和TopicConsumer02有日志输出
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
内容的持久化durable 表示持久化
- Queue的持久化
com.rabbitmq.client.Channel
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
- Message的持久化
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
其中deliveryMode=2表示持久化
- Exchange的持久化
com.rabbitmq.client.Channel
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
就算上面三个的持久化方式都开启了,也不能保证消息在使用过程中完全不丢失,如果消费者autoAck=true,在收到消息之后自动确认了,但是处理时服务崩了,就会导致消息的丢失,所以需要确认机制的支持
消息投递的确认模式
- 默认情况下,生产者投递消息后,broker时不会做出任何返回的
- 解决方式如下:
- 使用Amqp协议中的事务机制
效率低,影响吞吐量
- 将信道channel设置成确认模式
- 使用Amqp协议中的事务机制
channel信道的确认模式
channel设置成确认模式之后,所有提交的消息都会被分配一条唯一的ID,当消息被投递到匹配的队列中,信道会向生产者发出确认消息,并且消息中带上这个Id。确认模式是异步的,生产者可以发送完一条消息后继续发送下一条消息。调用channel的confirmSelect方法开启确认模式
- 普通方式,发送完成之后调用waitForConfirms
- 异步回调模式,addConfirmListener注册回调函数
消息消费的应答模式
- autoAck,如果等于true,会在消息发送过来之后自动响应--队列会将该消息删除,可能会导致消息消费失败了,但是消息已经被删除的情况
- autoAck=false,需要业务逻辑在处理完成之后,调用channel.basicAck做显示的响应
消费者获取消息时,可以指定预取的消息数量
通过channel的basicQos方法设置
rabbitmq 的死信队列相当于为一个队列设置一个备用的队列,在出现以下情况的时候将所谓的死亡信息推送到死亡信息队列
- 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递
requeue=false
- 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
- 队列超载
具体内容参考:https://my.oschina.net/u/2948566/blog/1626278