RocketMQ源码之长轮询实现

Push or Pull

MQ中消息传递的模式有Push和Pull两种。
Pull: 消费者主动从Broker拉取
Push: Broker主动推送给消费者

在RocketMQ虽然有对于消费者有DefaultMQPullConsumer和DefaultMQPushConsumer两个Api可供选择,但是底层其实都使用PullAPIWrapper这个类进行消息拉取,也就是说,RocketMQ使用的消费传递模型是Pull模型。

为什么pull?

push的最大好处的实时,但是也有以下缺点

  1. 在Broker端需要维护Consumer的状态,不利于Broker去支持大量的Consumer的场景
  2. Consumer的消费速度是不一致的,由Broker进行推送难以处理不同的Consumer的状况
  3. Broker难以处理Consumer无法消费消息的情况(Broker无法确定Consumer的故障是短暂的还是永久的)
  4. 大量的推送消息会加重Consumer的负载或者冲垮Consumer

pull很好的解决了上面的问题,但是也丢失了实时性。为了保证实时,我们可以把拉取消息的间隔设置的短一点,但这也带来了一个另外一个问题,在没有消息的时候时候会有大量pull请求,为了解决这个问题,就采用了本文讲解的长轮询技术。

什么是长轮询

轮询是以固定间隔请求服务器,它不在乎这次请求是否能拉取到消息。而长轮询,它请求的服务端,会等待一会儿时间,然后将等待时间内的消息返回。如果超时了,那么也返回空。有效的避免了无效的请求。

但是对于每次都能拉取到消息的情况下,长轮询也就退化成了轮询。

能不能在优化?

还有一种动态轮询的设计,我们的消息一般都是存在缓存区里的,比如RocketMQ存在processqueue里面,因此请求服务端的时候,我们可以把缓冲区剩余的空间告诉服务端,服务端不会返回确认拉取结束的Response,他会根据缓冲区剩余空间先把对应数量的消息分次返回,最后返回确认结束的Response。

在RocketMQ中采用了长轮询模式,动态轮询虽然好,但是也需要考虑一些极端场景的处理,比如丢包。

在H5与服务器的交互模式中,可以使用websocket协议来优化长轮询。websocket是一种基于tcp的全双工协议。

RocketMQ中的设计

从这里开始,建议阅读过一部分RocketMQ源码的读者消化

本文关注于长轮询拉取消息,就直接从DefaultMQPullConsumer和DefaultMQPushConsumer中用于拉取消息的PullAPIWrapper开始讲起。消息消费可以关注我后续文章。

PullAPIWrapper

PullAPIWrapper的pullKernelImpl方法内组装了拉取消息的逻辑

public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //获取broker
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        //如果查找不到broker 更新一下 再次获取
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        //找到broker的情况下
        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            //如果拉取消息的是slave broker,不进行消息确认
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            //
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            // remote invoke
            // 这边根据communicationMode不同,有不同的拉取模式
            // 如果是异步 pullResult直接返回null
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

大致逻辑如下

  1. 通过mq的brokerName获取broker地址
  2. 获取broker地址,更新下路由信息,再次获取
  3. 构造requestHeader,通过this.mQClientFactory.getMQClientAPIImpl().pullMessage发送到broker
  4. 返回pullResult

在与broker长轮询交互中,有几个参数比较重要,我们一一介绍。

第一个参数是requestHeader中的sysFlag,它作为pullKernelImpl的参数传入,它的构造代码如下

int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, 
            true, 
            subExpression != null, 
            classFilter 
        );
        
public static int buildSysFlag(
    final boolean commitOffset, //是否确认消息
    final boolean suspend, //是否长轮询
    final boolean subscription, //是否过滤消息
    final boolean classFilter//是否类过滤
    )

sysFlag的第二个bit位会告诉broker是否应该长轮询

第二个和长轮询有关的参数是brokerSuspendMaxTimeMillis,长轮询不可能无限期等待下去,因此需要传递这个长轮询时间给到broker,如果超过这个时间还没有消息到达,那么直接返回空的Response。

第三个重要的参数是timeoutMillis,broker在长轮询的时候,客户端也需要阻塞等待结果,单也不能无限制等待下去,如果超过timeoutMillis还没收到返回,那么我本地也需要做对应处理,。

brokerSuspendMaxTimeMillis会小于timeoutMillis,这个原因大家都懂的吧。

怕大家不理解,讲下在DefaultMQPullConsumerImpl的pullSyncImpl中如何使用到pullKernelImpl方法。

pullSyncImpl用来同步拉取消息,它支持选择是否使用长轮询模式。其他用到pullKernelImpl的地方,使用方式大同小异。

    //同步拉取消息
    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        //第一个参数为false 代表 pull不会顺带确认消息
        //第二个参数 block代表是否开启长轮询
        //第三个参数 表示broker需要处理消息过滤
        //第四个参数为false 表示不需要通过过滤类过滤消息
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        //如果是长轮询使用系统设置的长轮询等待时间
        //如果不是长轮序,使用调用者配置的超时时间
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        //同步拉取
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),//broker长轮询时间
            timeoutMillis,//客户端阻塞等待时间
            CommunicationMode.SYNC,//这个同步只与本地netty客户端请求方式有关 和远程实现无关
            null
        );
        // 处理拉取结果
        // 本地再次进行消息过滤
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        // 一些其他操作
        // 执行一些回调
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }

讲好了,客户端是如何设置长轮询,现在来看下服务端是怎么进行长轮询的。

我们从处理消息拉取的PullMessageProcessor的processRequest方法看起。

在这个方法中,首先会解析出我们刚才讲的那两个和长轮询有关的参数

        //是否允许长轮询
        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());

        //长轮询等待时间
        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

然后通过客户端的请求去拉取消息

//从messageStore获取消息
        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

如果拉取到消息,那么直接返回到客户端就行了。如果拉取不到,那么就触发我们的长轮询了。

            case ResponseCode.PULL_NOT_FOUND:
                //注意这个brokerAllowSuspend 长轮序后的拉取这个默认为false
                if (brokerAllowSuspend && hasSuspendFlag) {
                    //如果pull不到消息 应该会等待pollingTimeMills时间
                    // 专业术语 长轮询
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        //如果配置不支持长轮询 改为一个比较小的时间 1秒
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    //注意这边3,4参数 用于计算长轮询间隔
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    //这边应该会阻塞等待下
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    //这次请求在这里先不返回 suspendPullRequest那边会返回
                    response = null;
                    break;
                }

接下来的事情就比较关键了,长轮询在broker是如何实现的。

  1. 如果一直没有消息达到,我需要等待pollingTimeMills时间,然后返回客户端
  2. 如果在第一步的等待中有消息到达,立刻返回客户端

第一步的等待由PullRequestHoldService实现,在上述代码中我们可以看到

PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//这边应该会阻塞等待下
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);

构造PullRequest,然后放入到PullRequestHoldService中,在PullRequestHoldService会定期判断pullRequest是否可以唤醒

    @Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //阻塞时间根据是否支持长轮询不同
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    //这边固定休息5秒。。应该有其他地方会激活吧
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                //唤醒
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

在run方法中会每隔5秒,调用checkHoldRequest方法,检查是否有消息打到,并不是直接阻塞pollingTimeMills时间后返回。

checkHoldRequest逻辑如下

private void checkHoldRequest() {
        //对pullRequestTable中每个pullRequest进行notify
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

checkHoldRequest会对每个在pullRequestTable的pullRequest进行检查,检查逻辑在notifyMessageArriving方法中

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            //注意这边会清除
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    //如果newestOffset小于客户端拉取的offset 更新下
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }

                    if (newestOffset > request.getPullFromThisOffset()) {
                        // 如果tagcode=null 默认返回true 来自上一个方法的调用 相当于这边=true
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        //同上
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }

                        if (match) {
                            try {
                                //下面这个调用会再次触发拉取消息
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }

                    // 如果超过了长轮询时间 直接去拉取消息 拉取不到也返回
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                //这边将拉取不到的消息重新放入pullRequestTable 等待下次调度
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }

如果newestOffset > request.getPullFromThisOffset()的话,也就是对应consumequeue内有新消息到达,那么会通过this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand())再次触发拉取消息并且返回客户端

    //用于长轮询拉取消息
    public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //再次从messagestore拉取消息
                    //注意第三个参数为false 在消息拉取不到的情况下不在进行长轮询
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                    //拉取结果 返回客户端
                    //可能这边还是没拉取到
                    if (response != null) {
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        log.error("processRequestWrapper response to {} failed",
                                            future.channel().remoteAddress(), future.cause());
                                        log.error(request.toString());
                                        log.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            log.error("processRequestWrapper process request over, but response failed", e);
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }

可以看到在executeRequestWhenWakeup中还是调用了PullMessageProcessor的processRequest方法。
不过需要注意第三个参数为false了 ,也就是长轮询不能循环触发长轮询,这个细节上的设计读者可以细细品味下。

上面讲解了,broker端在长轮询时间段中,定时检查是否有消息到达,然后返回客户端。

但是现在有一种场景,在PullRequest休眠的5秒钟,如果有消息到达,也需要等待下次调度。
RocketMQ在这边做了优化,在上面是通过notifyMessageArriving来做消息是否达到的处理以及再次触发消息拉取。因此可以在消息达到的时候直接触发notifyMessageArriving,来拉取消息返回到客户端。

这个逻辑封装在NotifyMessageArrivingListener中

public class NotifyMessageArrivingListener implements MessageArrivingListener {
    private final PullRequestHoldService pullRequestHoldService;

    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
        this.pullRequestHoldService = pullRequestHoldService;
    }

    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        //消息一到达 这边也会触发长轮询返回
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
            msgStoreTime, filterBitMap, properties);
    }
}

而这个Listener会在消息做reput的时候触发

reput是啥?
简单的来讲,我们生产的消息落到broker之后,先是持久化到commitlog,然后在通过reput持久化到consumequeue和index。也正因为持久化到consumequeue,我们的客户端才能感知到这条消息的存在。然后在reput这个操作中顺带激活了长轮询休眠的PullRequest。

好了,RocketMQ长轮询的实现讲解完了,还是不熟悉的话,多看几遍源码,我也是看了好久。

魔鬼在于细节

参考

Push or Pull?
这个哥们的MQ系列文章很不错,从架构的角度讲解MQ的设计

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343