消费端的手工 ACK 和 NACK
消费端进行消费的时候,如果由于业务异常导致失败了,返回 NACK 达到最大重试次数,此时我们可以进行日志的记录,然后手动 ACK 回去,最后对这个记录进行补偿。
或者由于服务器宕机等严重问题,导致 ACK 和 NACK 都没有,那我们就需要手工进行 ACK 保障消费端消费成功,再通过补偿机制补偿。
消费端的重回队列
消费端的重回队列是为了对没有处理成功的消息,把消息重新递给 broker。但是在我们的实际生产,一般都会关闭重回队列,
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
生产端的代码基本没什么变化
@Slf4j
public class Procuder {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i < 5; i++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
log.info("生产端发送:{}", msg);
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
接着是消费端的代码
注意看消费端的代码, autoack 一定要设置为 false,要不然不会生效的
@Slf4j
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//使用自定义消费者
//1 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
log.info("消费端启动成功");
}
}
消费端的具体消费代码:
/**
* 自定义消费者
*/
@Slf4j
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消费者标签
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//是否为批量的,是否重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
先启动消费端,再启动生产端
注意看消费端的日志,发现按 0-4 消费完后,0 的重回队列了,符合我们的目标
自此,重回队列演示完毕。