nsqd

nsqd

main函数在nsq/apps/nsqd/中

func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
    }
}

主要运行内容在Start()中

func (p *program) Start() error {
    opts := nsqd.NewOptions()

    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    rand.Seed(time.Now().UTC().UnixNano())

    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
        fmt.Println(version.String("nsqd"))
        os.Exit(0)
    }

    var cfg config
    configFile := flagSet.Lookup("config").Value.String()
    if configFile != "" {
        _, err := toml.DecodeFile(configFile, &cfg)
        if err != nil {
            logFatal("failed to load config file %s - %s", configFile, err)
        }
    }
    cfg.Validate()

    options.Resolve(opts, flagSet, cfg)
    nsqd, err := nsqd.New(opts)
    if err != nil {
        logFatal("failed to instantiate nsqd - %s", err)
    }
    p.nsqd = nsqd

    err = p.nsqd.LoadMetadata()
    if err != nil {
        logFatal("failed to load metadata - %s", err)
    }
    err = p.nsqd.PersistMetadata()
    if err != nil {
        logFatal("failed to persist metadata - %s", err)
    }

    go func() {
        err := p.nsqd.Main()
        if err != nil {
            p.Stop()
            os.Exit(1)
        }
    }()

    return nil
}

这段代码前面都是关于配置项和已有数据的处理,核心在与最后一段单其了一个gorountine运行p.nsqd.Main(),nsqd定义在/nsq/nsqd下,nsqd的主要内容均在此文件夹下,nsqd的main函数定义在nsqd.go中:

func (n *NSQD) Main() error {
    ctx := &context{n}

    exitCh := make(chan error)
    var once sync.Once
    exitFunc := func(err error) {
        once.Do(func() {
            if err != nil {
                n.logf(LOG_FATAL, "%s", err)
            }
            exitCh <- err
        })
    }
    
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
    })
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })
    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }

    n.waitGroup.Wrap(n.queueScanLoop)
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    err := <-exitCh
    return err
}

这段通过waitGroup.Wrap(定义在nsq/internal/util/wait_group_wrapper.go,对sync.WaitGroup做了一层包装,wrap另起了一个进程并等待执行func并在exit时等待进程结束)启动了5个任务,前三个分别是tcp、http、https server用来接收处理用户请求,n.queueScanLoop用来定时循环处理消息的发送。lookupLoop和lookupd进行交互。

nsqd的tcp server

tcpSercer定义在/internal/protocal/tcp_server.go中,

type TCPHandler interface {
Handle(net.Conn)

}

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

for {
    clientConn, err := listener.Accept()
    if err != nil {
        if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
            logf(lg.WARN, "temporary Accept() failure - %s", err)
            runtime.Gosched()
            continue
        }
        // theres no direct way to detect this error because it is not exposed
        if !strings.Contains(err.Error(), "use of closed network connection") {
            return fmt.Errorf("listener.Accept() error - %s", err)
        }
        break
    }
    go handler.Handle(clientConn)
}

logf(lg.INFO, "TCP: closing %s", listener.Addr())

return nil

}
这段逻辑非常简单,监听tcp端口,对每个连接另起一个gorountine来处理

go handler.Handle(clientConn)

但nsqd中定义的tcpServer实际对应TCPHandler,是一个接口,其对应的实现在nsq/nsqd/tcp.go中,

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
        clientConn.Close()
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

这段代码前半段主要是从tcp连接中读取4字节的字符串如果==" V2"就继续处理,否则抛错。
继续处理的逻辑定义在/nsq/nsqd/protocol_v2.go中的IOLoop中。

messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan

这段代码首先会给客户段发送连接成功建立成功的返回,然后进入循环等待用户发送请求

for {
    if client.HeartbeatInterval > 0 {
        client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
    } else {
        client.SetReadDeadline(zeroTime)
    }

    // ReadSlice does not allocate new space for the data each request
    // ie. the returned slice is only valid until the next call to it
    line, err = client.Reader.ReadSlice('\n')
    if err != nil {
        if err == io.EOF {
            err = nil
        } else {
            err = fmt.Errorf("failed to read command - %s", err)
        }
        break
    }

    // trim the '\n'
    line = line[:len(line)-1]
    // optionally trim the '\r'
    if len(line) > 0 && line[len(line)-1] == '\r' {
        line = line[:len(line)-1]
    }
    params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)
    if err != nil {
        ctx := ""
        if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
            ctx = " - " + parentErr.Error()
        }
        p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

        sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
        if sendErr != nil {
            p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
            break
        }

        // errors of type FatalClientErr should forceably close the connection
        if _, ok := err.(*protocol.FatalClientErr); ok {
            break
        }
        continue
    }

    if response != nil {
        err = p.Send(client, frameTypeResponse, response)
        if err != nil {
            err = fmt.Errorf("failed to send response - %s", err)
            break
        }
    }
}

这段的核心逻辑在于

    line, err = client.Reader.ReadSlice('\n')
    if err != nil {
        if err == io.EOF {
            err = nil
        } else {
            err = fmt.Errorf("failed to read command - %s", err)
        }
        break
    }

    // trim the '\n'
    line = line[:len(line)-1]
    // optionally trim the '\r'
    if len(line) > 0 && line[len(line)-1] == '\r' {
        line = line[:len(line)-1]
    }
    params := bytes.Split(line, separatorBytes)

    p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

    var response []byte
    response, err = p.Exec(client, params)

前面主要是对用户的请求进行按行读取并分割成参数,然后传入p.Exec中处理
Exec定义在同一文件中,主要根据传入的第一个参数来决定是什么请求,从而调用不同的func来处理

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    if bytes.Equal(params[0], []byte("IDENTIFY")) {
        return p.IDENTIFY(client, params)
    }
    err := enforceTLSPolicy(client, p, params[0])
    if err != nil {
        return nil, err
    }
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("RDY")):
        return p.RDY(client, params)
    case bytes.Equal(params[0], []byte("REQ")):
        return p.REQ(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("MPUB")):
        return p.MPUB(client, params)
    case bytes.Equal(params[0], []byte("DPUB")):
        return p.DPUB(client, params)
    case bytes.Equal(params[0], []byte("NOP")):
        return p.NOP(client, params)
    case bytes.Equal(params[0], []byte("TOUCH")):
        return p.TOUCH(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
        return p.SUB(client, params)
    case bytes.Equal(params[0], []byte("CLS")):
        return p.CLS(client, params)
    case bytes.Equal(params[0], []byte("AUTH")):
        return p.AUTH(client, params)
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

这些func也定义在同一文件中,只分析PUB和SUB行为

PUB的内容比较简单,主要是对参数做了一些检查,比如topic是否合法、msg的size检查,然后通过topic.PutMessage将消息加入这个topic,同时返回给客户段消息

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    var err error

    if len(params) < 2 {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
    }

    topicName := string(params[1])
    if !protocol.IsValidTopicName(topicName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
            fmt.Sprintf("PUB topic name %q is not valid", topicName))
    }

    bodyLen, err := readLen(client.Reader, client.lenSlice)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
    }

    if bodyLen <= 0 {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB invalid message body size %d", bodyLen))
    }

    if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
            fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))
    }

    messageBody := make([]byte, bodyLen)
    _, err = io.ReadFull(client.Reader, messageBody)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
    }

    if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
        return nil, err
    }

    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
    }

    client.PublishedMessage(topicName, 1)

    return okBytes, nil
}

topic.PutMessage定义在topic.go中,主要的任务对topic进行了一些统计量的处理,然后将msg插入topic.memoryMsgChan,这是一个chan,大小为: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)

func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
    return nil
}

SUB内容如下:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    if atomic.LoadInt32(&client.State) != stateInit {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot SUB in current state")
    }

    if client.HeartbeatInterval <= 0 {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot SUB with heartbeats disabled")
    }

    if len(params) < 3 {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "SUB insufficient number of parameters")
    }

    topicName := string(params[1])
    if !protocol.IsValidTopicName(topicName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
            fmt.Sprintf("SUB topic name %q is not valid", topicName))
    }

    channelName := string(params[2])
    if !protocol.IsValidChannelName(channelName) {
        return nil, protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL",
            fmt.Sprintf("SUB channel name %q is not valid", channelName))
    }

    if err := p.CheckAuth(client, "SUB", topicName, channelName); err != nil {
        return nil, err
    }

    // This retry-loop is a work-around for a race condition, where the
    // last client can leave the channel between GetChannel() and AddClient().
    // Avoid adding a client to an ephemeral channel / topic which has started exiting.
    var channel *Channel
    for {
        topic := p.ctx.nsqd.GetTopic(topicName)
        channel = topic.GetChannel(channelName)
        if err := channel.AddClient(client.ID, client); err != nil {
            return nil, protocol.NewFatalClientErr(nil, "E_TOO_MANY_CHANNEL_CONSUMERS",
                fmt.Sprintf("channel consumers for %s:%s exceeds limit of %d",
                    topicName, channelName, p.ctx.nsqd.getOpts().MaxChannelConsumers))
        }

        if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
            channel.RemoveClient(client.ID)
            time.Sleep(1 * time.Millisecond)
            continue
        }
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    // update message pump
    client.SubEventChan <- channel

    return okBytes, nil
}

SUB的代码主要是将client加入到channel中之后的数据发送是由queueScanLoop进行的。

queueScanLoop

func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

先起两个定时器然后监听定时器,在循环中如果定时器到达定时时间,就更新用于处理各个channel的数据gorountine的数量,并通过workCh发送要处理的channel,通过responseCh获取处理结果。

n.resizePool(len(channels), workCh, responseCh, closeCh)

这个进程启动任务如下:

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

首先将goroutine数与理想的goroutine数比较如果不足就启动对应差值数量的goroutine如果超过了通过colseChan来关掉一定数量的goroutine,每个goroutine主要调用c.processInFlightQueue(now)和c.processDeferredQueue(now)来处理数据。

queueScanLoop后面的loop主要是不断的随机选择一定数量的channel通过workChan发给queueScanWorker来处理,

for _, i := range util.UniqRands(num, len(channels)) {
    workCh <- channels[i]
}

这段主要是随机选num个channels,下面的

numDirty := 0
for i := 0; i < num; i++ {
    if <-responseCh {
        numDirty++
    }
}

if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
    goto loop
}

dirty即queueScanWorker处理了某个消息,比例达到一定就直接进入loop

msg从topic到channel

在每个topic建立时会有执行t.messagePump任务:

t.waitGroup.Wrap(t.messagePump)

这个任务主要是不断获取topic memory queue和backend queue中的msg并插入到各个channel中:

// main message loop
for {
    select {
    case msg = <-memoryMsgChan:
    case buf = <-backendChan:
        msg, err = decodeMessage(buf)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
            continue
        }
    case <-t.channelUpdateChan:
        chans = chans[:0]
        t.RLock()
        for _, c := range t.channelMap {
            chans = append(chans, c)
        }
        t.RUnlock()
        if len(chans) == 0 || t.IsPaused() {
            memoryMsgChan = nil
            backendChan = nil
        } else {
            memoryMsgChan = t.memoryMsgChan
            backendChan = t.backend.ReadChan()
        }
        continue
    case <-t.pauseChan:
        if len(chans) == 0 || t.IsPaused() {
            memoryMsgChan = nil
            backendChan = nil
        } else {
            memoryMsgChan = t.memoryMsgChan
            backendChan = t.backend.ReadChan()
        }
        continue
    case <-t.exitChan:
        goto exit
    }

    for i, channel := range chans {
        chanMsg := msg
        // copy the message because each channel
        // needs a unique instance but...
        // fastpath to avoid copy if its the first channel
        // (the topic already created the first copy)
        if i > 0 {
            chanMsg = NewMessage(msg.ID, msg.Body)
            chanMsg.Timestamp = msg.Timestamp
            chanMsg.deferred = msg.deferred
        }
        if chanMsg.deferred != 0 {
            channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
            continue
        }
        err := channel.PutMessage(chanMsg)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                t.name, msg.ID, channel.name, err)
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容