rabbitmq 可靠性投递(四)之实现可靠性投递

前言

在之前介绍了可靠性投递方案和项目搭建::https://juejin.im/post/5c3f43dae51d45731470a18c

唯一ID:https://juejin.im/post/5c3eb6e7518825253b5ea545

前期的一些准备:https://juejin.im/post/5c3f4bafe51d4551ec60b521

先给出完整代码:
https://github.com/hmilyos/common.git 
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git       available 分支

先来回顾一下我们可靠性投递的流程


image

流程的示意图如上所示,比如我下单成功了,这时进行 step1,对我的业务数据进行入库,业务数据入库完毕(这里要特别注意一定要保证业务数据入库)再对要发送的消息进行入库,图中采用了两个数据库,可以根据实际业务场景来确定是否采用两个数据库,如果采用了两个数据库,有人可能就像到了采用分布式事务来保证数据的一致性,但是在大型互联网中,基本很少采用事务,都是采用补偿机制。
对业务数据和消息入库完毕就进入 setp2,发送消息到 MQ 服务上,按照正常的流程就是消费者监听到该消息,就根据唯一 id 修改该消息的状态为已消费,并给一个确认应答 ack 到 Listener。如果出现意外情况,消费者未接收到或者 Listener 接收确认时发生网络闪断,接收不到,这时候就需要用到我们的分布式定时任务来从 msg 数据库抓取那些超时了还未被消费的消息,重新发送一遍。重试机制里面要设置重试次数限制,因为一些外部的原因导致一直发送失败的,不能重试太多次,要不然会拖垮整个服务。例如重试三次还是失败的,就把消息的 status 设置成 发送失败,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。

下面就让我们用代码来实现这个方案吧。

1. 简单的创建订单接口
@RestController
public class OrderController {

    @Autowired
    private IMessageService messageService;

    @GetMapping("/createOrder")
    public ServerResponse createOrder(long userId){
        return messageService.createOrder(userId);
    }
}
2. IMessageService 接口以及实现类
public interface IMessageService {

    @Transactional
    ServerResponse createOrder(long userId);
    
}
@Service
public class MessageServiceImpl implements IMessageService {

    private final  static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);

    @Autowired
    private ISnowFlakeService snowFlakeService;

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Override
    public ServerResponse createOrder(long userId) {
//        首先是针对业务逻辑,进行下单的业务,保存到数据库后
//        业务落库后,再对消息进行落库,
        long msgId = snowFlakeService.getSnowFlakeID();
        Message message = new Message(msgId, TypeEnum.CREATE_ORDER.getCode(), userId + "创建订单:" + msgId,
                0, MSGStatusEnum.SENDING.getCode(), DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
        int row = messageMapper.insertSelective(message);
        if (row == 0){
            throw new CustomException(500, "消息入库异常");
        }
//        消息落库后就可以发送消息了
        try {
            rabbitOrderSender.sendOrder(message);
        } catch (Exception e) {
//          因为业务已经落库了
//          所以 即使发送失败也不影响,因为可靠性投递,我回去再次尝试发送消息
            log.error("sendOrder mq msg error: ", e);
            messageMapper.updataNextRetryTimeForNow(message.getMessageId());
        }
        return ServerResponse.createBySuccess();
    }

}

注意了,我这里是直接拿消息的实体当做业务去落库了,实际上应该是先对订单实体落库,然后再对消息实体落库,最后发送消息!

3. 发送消息的具体实现 RabbitOrderSender

具体代码如下,注释也写得很详细了:


@Component
public class RabbitOrderSender {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderSender.class);

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageMapper messageMapper;

    @Value("${order.rabbitmq.listener.order.exchange.name}")
    private String exchangeName;

    @Value("${order.rabbitmq.send.create.key}")
    private String routingKey;

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Message message) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData(message.getMessageId() + "");
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
//        throw new CustomException("--test--");
    }

    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("correlationData: {}", correlationData);
            String messageId = correlationData.getId();
            if(ack){
                //如果confirm返回成功 则进行更新
                messageMapper.changeMessageStatus(Long.parseLong(messageId), MSGStatusEnum.SEND_SUCCESS.getCode());
            } else {
                //失败则进行具体的后续操作:重试 或者补偿等手段
                log.error("消息发送失败,需要进行异常处理...");
                messageMapper.updataNextRetryTimeForNow(Long.parseLong(messageId));
            }
        }
    };

    //回调函数: return返回, 这里是预防消息不可达的情况,比如 MQ 里面没有对应的 exchange、queue 等情况,
//    如果消息真的不可达,那么就要根据你实际的业务去做对应处理,比如是直接落库,记录补偿,还是放到死信队列里面,之后再进行落库
//    这里脱开实际业务场景,不大好描述
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                    String replyText, String exchange, String routingKey) {
            log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                    exchange, routingKey, replyCode, replyText);
        }
    };
}
4. 消费端代码 RabbitOrderReceiver
@Component
public class RabbitOrderReceiver {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderReceiver.class);

    @Autowired
    private MessageMapper messageMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${order.rabbitmq.listener.order.queue.name}",
                    durable="${order.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${order.rabbitmq.listener.order.exchange.name}",
                    durable="${order.rabbitmq.listener.order.exchange.durable}",
                    type= "${order.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${order.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload com.hmily.rabbitmq.rabbitmqcommon.entity.Message msg,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        log.info("-----------------RabbitOrderReceiver---------------------");
        log.info("消费端 order msg: {} ",  msg.toString());
        msg.setStatus(MSGStatusEnum.PROCESSING_IN.getCode());
        int row = messageMapper.updateByPrimaryKeySelective(msg);
        if (row != 0) {
            Thread.sleep(2000L);
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
//            接着去执行你对应的业务逻辑,
//            注意,这是可靠性投递,执行业务逻辑一定要做幂等性
        }

    }
}
注意:我这里并没有做幂等性去重,实际业务时必须做幂等性!!

6.我们的定时重试机制 SendMessageTask
@Component
public class SendMessageTask {
    private static final Logger log = LoggerFactory.getLogger(SendMessageTask.class);

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private IMessageFailedService messageFailedService;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Scheduled(initialDelay = 3000, fixedDelay = 10000)
    public void reSend(){
        log.info("---------------定时任务开始---------------");
        List<Message> msgs = messageMapper.getNotProcessingInByType(TypeEnum.CREATE_ORDER.getCode(), null, 
                new int[]{MSGStatusEnum.SENDING.getCode()});
        msgs.forEach(msg -> {
            if (msg.getTryCount() >= Constants.MAX_TRY_COUNT) {
//              如果重试次数大于最大重试次数就不再重试,记录失败
                msg.setStatus(MSGStatusEnum.SEND_FAILURE.getCode());
                msg.setUpdateTime(new Date());
                messageMapper.updateByPrimaryKeySelective(msg);
                MessageFailed failed = new MessageFailed(msg.getMessageId(), "消息发送失败", "已达到最大重试次数,但是还是发送失败");
                messageFailedService.add(failed);
            } else {
//              未达到最大重试次数,可以进行重发消息
//              先改一下消息记录,保存好再发送消息
                msg.setNextRetry(DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
                int row = messageMapper.updateTryCount(msg);
                try {
                    rabbitOrderSender.sendOrder(msg);
                } catch (Exception e) {
                    log.error("sendOrder mq msg error: ", e);
                    messageMapper.updataNextRetryTimeForNow(msg.getMessageId());
                }
            }
        });
    }
}

至此,我们的可靠性投递就完成了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容