文章内容:
- 第1-4章:发送方确认机制(Publisher Confirms),推荐使用异步确认(ConfirmCallback接口)。
- 第5章:Alternate Exchange。
- 第6章:实现ReturnCallback接口,用来接收没有Queue的退回消息。
关于如何确保发送方数据安全的问题,官网也作了详细的解释:
- Publisher-side data safety topics (connection recovery, publisher confirms) :https://www.rabbitmq.com/publishers.html#data-safety
- Publisher Confirms:https://www.rabbitmq.com/confirms.html#publisher-confirms
Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.
AMQP协议提供的一个事务机制,虽然还能确保消息正确送达,但比较笨重(性能没有很好),在此基础上引入了发送方确认机制。
1. 那么如何实现发送方确认机制?
- 首先要将信道(channel)设成confirm模式(事务信道不能设成confirm模式,而conform模式的信道不具有事务性)。
- 一旦一个channel设成confirm模式后,Broker和其Producer都开始计数(从1开始计数)。
- Broker在收到消息后进行消息确认——在这个信道中给生产者发送一个确认(basic.act)——消息确认包含内容:delivery-tag(即计数)和multiple field。
2. 发送方确认机制(publisher confirm)有三种方式:
a. 串行confirm模式(Publishing Messages Individually)
b. 批量confirm模式(Publishing Messages in Batches)
c. 异步confirm模式(Handling Publisher Confirms Asynchronously)
3. 如何用代码实现
官网文章参考:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
官网基于原始的amqp-client.jar写的代码:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/PublisherConfirms.java
3.1 首先是串行模式(Publishing Messages Individually)
- 配置:
publisher-confirm-type默认是NONE,也就是确认机制是disabled。这里我们要把它set为SIMPLE模式。
Publisher确认机制的方式是simple,意味着Producer发布一条消息后,需要同步等待Broker的basic.act,官网例子用的是amqp-client.jar,我这里用的是Spring Boot集成RabbitMQ后的方式。
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-confirm-type: simple
- Producer端代码:
Producer发送一条消息,然后使用方法waitForConfirms(ms)等待,这个方法会阻塞等待到Broker的消息确认。如果在规定时间内没有确认,就会报错。
- boolean waitForConfirms(long timeout) throws AmqpException;
- void waitForConfirmsOrDie(long timeout) throws AmqpException;
上述方法区别:http://www.yayihouse.com/yayishuwu/chapter/2341
值得一提的是,如果Producer向一个不存在的exchange中发送消息,那么在执行rabbitOperations. waitForConfirms的时候不会抛AmqpTimeoutException错误,而是会抛出异常:com.rabbitmq.client.ShutdownSignalException: channel error;
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void singleConfirm() {
try {
rabbitTemplate.invoke(rabbitOperations -> {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
// 等待Broker确认时间:1ms,超过1ms报错
return rabbitOperations.waitForConfirms(1);
});
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
也可以用waitForConfirmsOrDie(ms)来确认:
rabbitTemplate.invoke(rabbitOperations -> {
rabbitOperations.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
rabbitOperations.waitForConfirmsOrDie(100000);
return true;
});
由于是发布者确认机制(发生在Publisher和Broker之间),消费端的代码没有改动,这里就不贴了,详细看 【RabbitMQ的那点事】与Spring boot集成:https://www.jianshu.com/p/4a21a7fce14c
上述方法测试结果会报错(1ms太短了,Broker来不及确认):
也可以通过Thread name=main看出是同步(阻塞等待)的,这里始终是主线程在执行。另外虽然Broker确认失败了,因为Broker其实是好的,只是我们设的等待时间太短了,所以消息依然是发送出去了。
2022-05-07 17:43:30.104 ERROR 63048 --- [ main] ProducerConfirmServiceTest : met timeout exception:
org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:73) ~[spring-rabbit-2.3.12.jar:2.3.12] at org.springframework.amqp.rabbit.core.RabbitTemplate.waitForConfirms(RabbitTemplate.java:2320) ~[spring-rabbit-2.3.12.jar:2.3.12]
...
3.2 其次是批量confirm确认(Publishing Messages in Batches)
在#3.1示例是单条publish后Producer就开始等待Broker的确认,当然我们也可以在发布一定数量的消息后再开始确认,比如100条。
这样做的好处是可以提高吞吐量。缺点是如果收不到Broker的确认,我们不知道这一批中哪一个消息开始出了问题,所以可能需要将这100条都重新发送,可能会造成重复发的情况。
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void batchConfirm() {
try {
rabbitTemplate.invoke(rabbitOperations -> {
for (int i = 0; i < 10; i ++) {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
}
return rabbitOperations.waitForConfirms(10000);
});
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
3.3 最后是异步confirm确认(Handling Publisher Confirms Asynchronously)
同步确认的配置是publisher-confirm-type: simple
publisher-confirm-type: 还有另外一个配置项即:correlated,如果使用该配置项,说明发送方也需要消息确认,并且可以通过CorrelationData来回传额外的信息。这个分类方法与串行或批量无关,只是confirm能否回传数据的分类方式。
以下是示例:
- 配置:publisher-confirm-type: correlated
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-confirm-type: correlated
- 新建一个ConfirmCallBack类,需要实现RabbitTemplate.ConfirmCallback接口 ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause:
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
log.info("MsgSendConfirmCallBack , 回调id: {}", correlationData);
if(ack) {
log.info("消息发送成功");
} else {
log.info("消息发送失败: {}", cause);
}
}
}
- Producer类:
在发送消息前需要先set一个ConfirmCallback,发送消息的时候可以带上CorrelationData,在callback中可以接收该data:
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void confirmAsync() {
try {
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
CorrelationData correlationData = new CorrelationData();
log.info("开始发送消息");
correlationData.setId("100");
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
测试结果:可以看出回调方法用的是自己的线程,即异步。并且能收到发送时带的CorrelationData类:
2022-05-07 18:41:32.112 INFO 75420 --- [ main] ProducerConfirmServiceTest : 开始发送消息
2022-05-07 18:41:32.133 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=100]
2022-05-07 18:41:32.135 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : 消息发送成功
也可以发送到错误的exchange上来测试发送callback:
rabbitTemplate.convertAndSend("wrong.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);
测试结果:callback会检测到错误,也就是说使用ConfirmCallBack无论消息是否正确送到Broker,都会进入该回调函数类中。
2022-05-07 19:03:37.792 INFO 80121 --- [ main] ProducerConfirmServiceTest : 开始发送消息
2022-05-07 19:03:37.802 ERROR 80121 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)
2022-05-07 19:03:37.804 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=100]
2022-05-07 19:03:37.806 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)
4. 总结
发送方确认机制是保证消息可靠环节的第1步。三种方式总结如下:
a. 发送消息串行(逐条)确认——同步等待,简单,但会限制吞吐量。
b. 批量发送消息后再确认——同步等待,简单,能提高吞吐量,但极端情况下会造成消息的重复发送(无法精确定位到单条错误消息)。
c. 发送消息后异步等待确认,效率高,但需要正确的实现接口方法。
以下是官方的例子测评结果(官网代码在#3一开始有贴),发送消息的总条数都是50000条,Brokder和producer都在同一台机器上:
测试用例 | 花费时间 |
---|---|
串行确认(同步逐条确认) | 5,549 ms |
批量(按100条一批次,同步确认) | 2,331 ms |
异步确认 | 4,054 ms |
生产环境往往Broker是单独的机器,所以官网又做了以下的测试,同样是发送50000条消息,但这次是远程发送:
测试用例 | 花费时间 |
---|---|
串行确认(同步逐条确认) | 231,541 ms |
批量(按100条一批次,同步确认) | 7,232 ms |
异步确认 | 6,332 ms |
可以看到逐条发送后确认的效率是惊人的低。批量确认和异步确认的效率差不太多。批量确认的代码容易实现,而异步确认的实现会比较复杂一些。
结束了吗?还没有!!!
上述串行、批量确认以及异步确认,都是为了解决:让Producer知道信息有没有成功的发送到Broker的Exchange交换机上,但如果消息从Exchange 到 Queue投递失败(或者Exchange没有匹配的Queue的话),那么消息也会丢失,这时候要怎么办?
- 当发布者发布消息到Exchange上,但Exchange没有绑定的Queue时,默认情况下发布的消息会丢掉。当然这时候我们也可以启用Alternate Exchange,将没有目的地的消息统一转到这个Alternative Exchange上来。
- 或者在发送消息的时候,将参数mandatory置为true,那么message就会退回到Producer方,Producer方需要实现ReturnCallback接口(https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ReturnCallback.html),也能将退回的消息取到。
针对上述两种方式,具体来演示:
5. Alternate Exchange
关于Alternate Exchange, 参见官网:https://www.rabbitmq.com/ae.html
以下是具体思路:
- 首先创建一个Exchange(Fanout类型),叫backup.exchange。
- 再创建一个Queue,叫noBinding.queue,并绑定到backup.exchange上(因为该exchange是fanout type,所以routingKey为空)。
- 在创建正常要使用的Exchange时(比如叫direct.exchange),可以将backup.exchange作为参数名为alternate-exchange的值,传入direct.exchange中。
以下是代码示例:
先是Alternate Exchange的创建:
@Configuration
public class AlternateExchangeConfig {
@Bean
public Queue noRoutedQueue() {
return new Queue("noBinding.queue", true);
}
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange("backup.exchange");
}
@Bean
public Binding noBinding(Queue noRoutedQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(noRoutedQueue).to(backupExchange);
}
@RabbitListener(queues = "noBinding.queue")
public void listen(String in) {
System.out.println("[noBinding.queue]: " + in);
}
}
再是正常业务处理的Exchange
可以看到这里有新加arguments,key是alternate-exchange, value是上述创建的backup.exchange:
@Configuration
public class DirectExchangeConfig {
@Bean
public Queue directqueue() {
return new Queue("direct.queue", true);
}
@Bean
public DirectExchange directExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "backup.exchange");
DirectExchange directExchange = new DirectExchange("direct.exchange", true, false, arguments);
return directExchange;
}
@Bean
public Binding directBinding(Queue directqueue, DirectExchange directExchange) {
return BindingBuilder.bind(directqueue).to(directExchange).with("direct-routing-key");
}
@RabbitListener(queues = "direct.queue")
public void listen(String in) {
System.out.println("Direct Message Listener: " + in);
}
}
测试
将消息发送到direct.exchange,但是routingKey是错误的,也就是这个消息没有目的地:
@Slf4j
@SpringBootTest
public class ProducerServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessageToDirectExchangeWrongly() {
rabbitTemplate.convertAndSend("direct.exchange", "wrong.routing-key", "hello, i am direct message!");
}
}
测试印机结果:[noBinding.queue]: hello, i am direct message!
也就是由于routingKey是错的,消息并没有从direct.exchange正确的发送到direct.queue上,而是转发到了backup.exchange上,通过广播模式被noBinding.queue监听到。
6. 实现ReturnCallback接口来接收退回的消息
在rabbitmq原生的API中,需要在发送的时候将参数mandatory置为true,然后通过实现ReturnCallback接口来接收退回的消息。
如果是和Spring Boot结合,以下是示例:
配置:
首先需要先设置publisher-returns = true
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-returns: true
Producer类
尝试往错误的routingKey中发消息,即topic.exchange通过a.wrong,找不到正确的Queue,由于publisher-returns为true,所以消息就被ReturnCallback捕捉到了。
在高版本的RabbitTemplate中的ReturnCallback是@Deprecated,理由是提倡我们使用lamda表达式去实现,取而代之的是FunctionalInterface ReturnsCallback,这个接口其实就是ReturnCallback的子接口。
所以我们不需要单独创建类,而是在rabbitTemplate setReturnsCallback的时候直接使用lamda表达式,一般里面的实现可以是发送邮件等。
@Slf4j
@SpringBootTest
public class ReturnCallbackServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void returnCallback() {
rabbitTemplate.setReturnsCallback((message) -> {
log.info("getMessage: {}", message.getMessage());
log.info("getRoutingKey: {}", message.getRoutingKey());
log.info("getExchange: {}", message.getExchange());
log.info("getReplyCode: {}", message.getReplyCode());
log.info("getReplyText: {}", message.getReplyText());
});
rabbitTemplate.convertAndSend("topic.exchange", "a.wrong", "important message!");
log.info("Finished for sending message...");
}
}
测试结果:
2022-05-10 12:46:50.280 INFO 58740 --- [ main] ReturnCallbackServiceTest : Finished for sending message...
2022-05-10 12:46:50.282 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getMessage: (Body:'important message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2022-05-10 12:46:50.284 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getRoutingKey: a.wrong
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getExchange: topic.exchange
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyCode: 312
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyText: NO_ROUTE
- 关于Alternate Exchange和ReturnCallback的优先级:如果同时设置了,那么Alternate Exchange的优先级更高,也就是退回的消息会首先转到设置的Alternate Exchange中,从而不会调用ReturnCallback了。
- 如果是延迟队列(delayed exchange),那么ReturnCallback会一直报ReplyCode=312的错,也就是延迟队列不适合使用ReturnCallback功能。