前言
在之前介绍了可靠性投递方案和项目搭建::https://juejin.im/post/5c3f43dae51d45731470a18c
唯一ID:https://juejin.im/post/5c3eb6e7518825253b5ea545
前期的一些准备:https://juejin.im/post/5c3f4bafe51d4551ec60b521
先给出完整代码:
https://github.com/hmilyos/common.git
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git available 分支
先来回顾一下我们可靠性投递的流程
流程的示意图如上所示,比如我下单成功了,这时进行 step1,对我的业务数据进行入库,业务数据入库完毕(这里要特别注意一定要保证业务数据入库)再对要发送的消息进行入库,图中采用了两个数据库,可以根据实际业务场景来确定是否采用两个数据库,如果采用了两个数据库,有人可能就像到了采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。
对业务数据和消息入库完毕就进入 setp2,发送消息到 MQ 服务上,按照正常的流程就是消费者监听到该消息,就根据唯一 id 修改该消息的状态为已消费,并给一个确认应答 ack 到 Listener。如果出现意外情况,消费者未接收到或者 Listener 接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从 msg 数据库抓取那些超时了还未被消费的消息,重新发送一遍。重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。例如重试三次还是失败的,就把消息的 status 设置成 发送失败,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。
下面就让我们用代码来实现这个方案吧。
1. 简单的创建订单接口
@RestController
public class OrderController {
@Autowired
private IMessageService messageService;
@GetMapping("/createOrder")
public ServerResponse createOrder(long userId){
return messageService.createOrder(userId);
}
}
2. IMessageService 接口以及实现类
public interface IMessageService {
@Transactional
ServerResponse createOrder(long userId);
}
@Service
public class MessageServiceImpl implements IMessageService {
private final static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
private ISnowFlakeService snowFlakeService;
@Autowired
private MessageMapper messageMapper;
@Autowired
private RabbitOrderSender rabbitOrderSender;
@Override
public ServerResponse createOrder(long userId) {
// 首先是针对业务逻辑,进行下单的业务,保存到数据库后
// 业务落库后,再对消息进行落库,
long msgId = snowFlakeService.getSnowFlakeID();
Message message = new Message(msgId, TypeEnum.CREATE_ORDER.getCode(), userId + "创建订单:" + msgId,
0, MSGStatusEnum.SENDING.getCode(), DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
int row = messageMapper.insertSelective(message);
if (row == 0){
throw new CustomException(500, "消息入库异常");
}
// 消息落库后就可以发送消息了
try {
rabbitOrderSender.sendOrder(message);
} catch (Exception e) {
// 因为业务已经落库了
// 所以 即使发送失败也不影响,因为可靠性投递,我回去再次尝试发送消息
log.error("sendOrder mq msg error: ", e);
messageMapper.updataNextRetryTimeForNow(message.getMessageId());
}
return ServerResponse.createBySuccess();
}
}
注意了,我这里是直接拿消息的实体当做业务去落库了,实际上应该是先对订单实体落库,然后再对消息实体落库,最后发送消息!
3. 发送消息的具体实现 RabbitOrderSender
具体代码如下,注释也写得很详细了:
@Component
public class RabbitOrderSender {
private static final Logger log = LoggerFactory.getLogger(RabbitOrderSender.class);
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageMapper messageMapper;
@Value("${order.rabbitmq.listener.order.exchange.name}")
private String exchangeName;
@Value("${order.rabbitmq.send.create.key}")
private String routingKey;
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Message message) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData(message.getMessageId() + "");
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
// throw new CustomException("--test--");
}
//回调函数: confirm确认
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData: {}", correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 则进行更新
messageMapper.changeMessageStatus(Long.parseLong(messageId), MSGStatusEnum.SEND_SUCCESS.getCode());
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
log.error("消息发送失败,需要进行异常处理...");
messageMapper.updataNextRetryTimeForNow(Long.parseLong(messageId));
}
}
};
//回调函数: return返回, 这里是预防消息不可达的情况,比如 MQ 里面没有对应的 exchange、queue 等情况,
// 如果消息真的不可达,那么就要根据你实际的业务去做对应处理,比如是直接落库,记录补偿,还是放到死信队列里面,之后再进行落库
// 这里脱开实际业务场景,不大好描述
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
String replyText, String exchange, String routingKey) {
log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
exchange, routingKey, replyCode, replyText);
}
};
}
4. 消费端代码 RabbitOrderReceiver
@Component
public class RabbitOrderReceiver {
private static final Logger log = LoggerFactory.getLogger(RabbitOrderReceiver.class);
@Autowired
private MessageMapper messageMapper;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${order.rabbitmq.listener.order.queue.name}",
durable="${order.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${order.rabbitmq.listener.order.exchange.name}",
durable="${order.rabbitmq.listener.order.exchange.durable}",
type= "${order.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${order.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload com.hmily.rabbitmq.rabbitmqcommon.entity.Message msg,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
log.info("-----------------RabbitOrderReceiver---------------------");
log.info("消费端 order msg: {} ", msg.toString());
msg.setStatus(MSGStatusEnum.PROCESSING_IN.getCode());
int row = messageMapper.updateByPrimaryKeySelective(msg);
if (row != 0) {
Thread.sleep(2000L);
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
// 接着去执行你对应的业务逻辑,
// 注意,这是可靠性投递,执行业务逻辑一定要做幂等性
}
}
}
注意:我这里并没有做幂等性去重,实际业务时必须做幂等性!!
6.我们的定时重试机制 SendMessageTask
@Component
public class SendMessageTask {
private static final Logger log = LoggerFactory.getLogger(SendMessageTask.class);
@Autowired
private MessageMapper messageMapper;
@Autowired
private IMessageFailedService messageFailedService;
@Autowired
private RabbitOrderSender rabbitOrderSender;
@Scheduled(initialDelay = 3000, fixedDelay = 10000)
public void reSend(){
log.info("---------------定时任务开始---------------");
List<Message> msgs = messageMapper.getNotProcessingInByType(TypeEnum.CREATE_ORDER.getCode(), null,
new int[]{MSGStatusEnum.SENDING.getCode()});
msgs.forEach(msg -> {
if (msg.getTryCount() >= Constants.MAX_TRY_COUNT) {
// 如果重试次数大于最大重试次数就不再重试,记录失败
msg.setStatus(MSGStatusEnum.SEND_FAILURE.getCode());
msg.setUpdateTime(new Date());
messageMapper.updateByPrimaryKeySelective(msg);
MessageFailed failed = new MessageFailed(msg.getMessageId(), "消息发送失败", "已达到最大重试次数,但是还是发送失败");
messageFailedService.add(failed);
} else {
// 未达到最大重试次数,可以进行重发消息
// 先改一下消息记录,保存好再发送消息
msg.setNextRetry(DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
int row = messageMapper.updateTryCount(msg);
try {
rabbitOrderSender.sendOrder(msg);
} catch (Exception e) {
log.error("sendOrder mq msg error: ", e);
messageMapper.updataNextRetryTimeForNow(msg.getMessageId());
}
}
});
}
}
至此,我们的可靠性投递就完成了。