JMS如何保证消息的可靠性?
JMS是通过持久化(Persistent)、事务(Transaction)、 签收来(acknowledge)保证的
1、持久化
首先消息的持久化是在消息的生产者中设置的,由消息的生产者告诉消息中间件是否进行消息的持久化,如果生产者端没有设置,默认是持久化的
通过session创建出来的生产者生产的Queue消息为持久性
- 持久化:当服务器宕机,消息依然存在;当服务器再次重启,消息能够被消费者继续消费。
- 非持久化:当服务器宕机,消息不会被保留。
设置通过session创建出来的生产者生产的Queue消息为持久性
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
生产者代码实现:
public class JmsProducer {
public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
public static final String QUEUE_ANME="queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
//2.通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
// 两个参数:事务/签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列或主题topic)
// Destination destination = session.createQueue(QUEUE_ANME);
Queue queue = session.createQueue(QUEUE_ANME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化
// messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
//6 通过使用messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i <=3; i++) {
//7创建消息
// TextMessage textMessage = session.createTextMessage("msg--" + i);
TextMessage textMessage = session.createTextMessage("textMessage msg--" + i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("MQ消息发布成功");
}
}
消费者代码实现:
public class JmsConsumer {
public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
public static final String QUEUE_ANME="queue01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("*****我是2号消费者");
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
//2.通过连接工厂,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
// 两个参数:事务/签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列或主题topic)
// Destination destination = session.createQueue(QUEUE_ANME);
Queue queue = session.createQueue(QUEUE_ANME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage message = (TextMessage) messageConsumer.receive(20000L);
if(null != message){
System.out.println("TX消费者接收到消息:"+message.getText());
// message.acknowledge();
}else {
break;
}
}
// System.in.read();//保证控制台不关闭
messageConsumer.close();
// session.commit();
session.close();
connection.close();
}
}
持久化消息是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
2、事务
消息生产者偏重于事务,开启事务后生产者生产的消息,只有在session执行commit后才会被提交到服务器,不然此次提交记录无效。事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。
如果开启事务,事务优先级>签收;
- 在事务性会话中,当一个事务被成功提交则消息被自动签收。
- 如果事务回滚,则消息会被再次传送
- 对于消费者而言,如果开启事务,createSession设置为 ture,消费消息需要commit,否则会出现多次消费的情况
//这边的第一个参数true表示事务开启
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//事务如果是true: 需要commit
session.commit();
3、签收
消费者偏重于签收
签收主要针对于消费者,而且一般在非事务的情况下生效,因为开启事务后,签收与否取决于事务的提交或回滚,与签收设置的方式无关。
签收方式有4种,但平时常用的有两种:
- 自动签收
Session.AUTO_ACKNOWLEDGE;//自动签收
- 手动签收
Session.CLIENT_ACKNOWLEDGE; //手动签收
message.acknowledge(); // 手动签收需要ack