架构师方案-业务角度下保证消息的可靠性的投递

前言:

消息队列的主要作用是实现系统间的解耦、异步处理和削峰填谷。 由于消息队列的异步使用特性,天然的会存在一定概率消息丢失的情况。


image.png

方案1:消息落库

消息落库重发是基于MQ的confirm机制,在消息发送失败后自动重发。

image.png
  • Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)

  • Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)

  • Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

  • Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

  • Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

  • Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。

表结构和代码示例


CREATE TABLE IF NOT EXISTS `message_log`
(
    `message_id`      varchar(30) NOT NULL COMMENT '消息唯一ID',
    `message`         varchar(1000)  DEFAULT '' COMMENT '消息内容',
    `business_id`     varchar(40) NOT NULL COMMENT '业务id,比如记录订单号',
    `try_count`       int(4)       DEFAULT '0' COMMENT '重试次数',
    `status`          tinyint(2)   DEFAULT '0' COMMENT ' 消息投递状态  0:投递中 1:投递成功   2:投递失败',
    `next_retry_time` datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '下一次投递时间',
    `create_time`     datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`     datetime     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次更新时间',
    PRIMARY KEY (`message_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;
创建订单方法
@Service
@RequiredArgsConstructor
public class OrderService {
        
        private final OrderMapper orderMapper;
        
        private final MessageLogMapper messageLogMapper;

        private final RocketMQProducer rocketMQProducer;
        
        
        //创建订单
        public void createOrder(Order order) {
                //插入业务数据
                orderMapper.insert(order);
                //插入消息记录表数据
                MessageLog messageLog = new MessageLog();
                //消息唯一ID
                messageLog.setMessageId(messageId);
                //保存消息整体
                messageLog.setMessage(JSONObject.toJSONString(order));
                //设置消息状态为0 表示发送中
                messageLog.setStatus(0);
                //设置下一次执行时间
                messageLog.setNextRetryTime(nextRetryTime);
                messageLogMapper.insert(brokerMessageLog);
                
                //发送消息
                rocketMQProducer.sendOrder(order);

        }
}
消息生产者
@Component
public class RocketMQProducer {

    public void sendOrder(Order order) {
        //1.创建消息
        Message message = new Message("test_quick_topic",// 主题
                "TagA",// 标签
                "KeyA",// 用户自定义的key,唯一的标识
                FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息内容实体(byte[])

        try {
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
             
                    //如果confirm返回成功 则进行更新
                    messageLogMapper.changeMessageLogStatus();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    //失败则进行具体的后续操作:重试 或者补偿等手段
                   System.err.println("-----------异常处理-----------");
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
定时任务

@Component
public class RetryMessageTasker {

    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend() {
        System.out.println("----------------定时任务开始----------------");
        //pull status = 0 and timeout message 
        List<MessageLog> list = getNeedReSendMsgList();
        for (MessageLog messageLog : list) {
            if (messageLog.getTryCount() > maxTryCount) {
                //update fail message 
                continue;
            }
        
            //更新try_count
            // resend
            try {
                sendOrder(getMessage());
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("-----------异常处理-----------");
            }
            
        }

    }
}
该方案只能保证消息从生产者到MQ之间的可靠性投递,解决办法:

方式1. 在消息表中新增 消费成功状态,下游消费者变更消费状态(要考虑多个业务消费的情况)
方式2. 使用业务正确性校验平台BCP检查上下游业务数据是否一致,进行修复

方案2:二次确认检测

二次确认检测是基于延时投递机制实现的,主要目的是为了减少数据库操作,提高并发量。

image.png
  • Step 1:先将业务数据进行入库,然后上游服务将消息M1发送出去

  • Step 2:在发送消息M1之后,紧接着生产端再次发送一条延迟消息(Second Send Delay Check),即延迟检查投递消息M3

  • Step 3:消费端去监听指定队列,将收到的消息进行处理

  • Step 4:处理完成之后,发送一个confirm消息M2,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中

  • Step 5:下游Callback Check Service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息M2,如果下游Callback Check Service收到下游服务的confirm消息M2,那么就对消息做持久化存储,即将消息持久化到DB中

  • Step 6:10分钟之后MQ Server推送了延迟消息发送M3

  • Step 7:下游Callback Check Service收到延迟消息发送M3后,Check消息后去检查DB中是否存在消息M2,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么下游Callback Check Service就需要主动发起RPC通信给上游服务,上游服务收到信息后就会重新查询业务消息然后将消息M1发送出去

该方案能够保证消息从生成者端到消费者的可靠性投递,消费者都能消费到,生产者也就自然而然是可靠性的投递。

方案对比

方案 优点 缺点
消息落库 实现简单 发送消息前需要2次DB操作,影响并发性能
二次确认检测 减少了数据库操作,提高并发量 不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是使用定时任务去、BCP或人工去做补偿了,

参考:

阿神-RabbitMQ消息可靠性投递解决方案
美团业务正确性校验平台 BCP的设计与实践

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容