nsq源码解读之nsqd——更多细节

nsqd完成的是topic和channel的创建和消息的发布。

1. topic的创建

我们先来看看topic是怎么创建的。创建消息调用的接口是/topic/create。执行以下命令:

# 启动nsqlookupd
nsqlookupd
# 启动一个nsqd,连接上nsqdlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
# 这里从log中可以看到nsqd 会和nsqdlookupd建立一个tcp连接,每隔15sPing一次。
# 调用/topic/create接口创建一个topic
curl -X POST http://127.0.0.1:4151/topic/create?topic=name

那么调用这个接口发生了什么事情呢?我们可以将代码定位到这个url的handler:

router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))

在这个handler中:

  • 首先用了一个nsq自己实现的一个装饰器,简洁明了的实现了日志输出和相应的http header和body格式的规范,前文有过描述,不再重复。
  • 其次,在doCreateTopic中,解析了请求,获得了topic的值,并进行了校验。
  • 最后,真正创建的topic。下文将详细解读一下。

实现的函数是GetTopic,这里有几点需要注意的:

  • 先用读锁,判断这个topic是否已存在,如果存在,返回,否则,用写锁,写入。这里的读写操作是线程安全的。同时通过先用读锁后用写锁的方式,降低了开销。(注意,用写锁的时候,还是要判断以下是否已存在,应为解开读锁和开启写锁之间,可能会有写入)
  • 在创建topic之后,将topic的锁开启,再解开nsqd的锁。这是用来初始化channel的。
  • 创建topic使用了NewTopic函数,如果topic的名字有"#ephemeral"前缀,就认为是临时topic,使用newDummyBackendQueue(),否则使用diskqueue
  • 创建topic的同时,还做了topic数据的持久化,这个不做赘述
  • 通过nsqlookupd的http接口/channels查询这个topic有哪些channel,并一一创建。
// nsqd/nsqd.go
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()

    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }
    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.logf("TOPIC(%s): created", t.name)

    // release our global nsqd lock, and switch to a more granular topic lock while we init our
    // channels from lookupd. This blocks concurrent PutMessages to this topic.
    t.Lock()
    n.Unlock()

    // if using lookupd, make a blocking call to get the topics, and immediately create them.
    // this makes sure that any message received is buffered to the right channels
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
        if err != nil {
            n.logf("WARNING: failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
        }
        for _, channelName := range channelNames {
            if strings.HasSuffix(channelName, "#ephemeral") {
                // we don't want to pre-create ephemeral channels
                // because there isn't a client connected
                continue
            }
            t.getOrCreateChannel(channelName)
        }
    } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
        n.logf("ERROR: no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
    }

    t.Unlock()

    // NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
    // casing the code above so that we can control the locks such that it is impossible
    // for a message to be written to a (new) topic while we're looking up channels
    // from lookupd...
    //
    // update messagePump state
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }
    return t
}
2. channel的创建和消息的发布

topic是生产者侧的东西,为了将消息将生产者发送给消费者,需要建立channel,可以说channel与消费者相关。常用的工具有:

  • nsq_to_file: 消费指定的话题(topic)/通道(channel),并写到文件中。
  • nsq_to_http: 消费指定的话题(topic)/通道(channel)和执行 HTTP requests到指定的端点。
  • nsq_to_nsq: 消费者指定的话题/通道和通过TCP重新发布消息到目的地 nsqd。

这里使用nsq_to_file为例

nsq_to_file --topic=test --output-dir=./ --lookupd-http-address=127.0.0.1:4161
# 同时, 调用nsqd的/pub接口,发布一个消息
curl -d "hello world" http://127.0.0.1:4151/pub?topic=test

通过nsqd和nsqlookupd的日志可以看到

  • nsq_to_file 去nsqlookupd查找了一个topic并获得了这个topic的生产者的信息
  • 和这个生产者建立TCP连接,使用INDENTIFY的TCP包

通过阅读nsq_to_file的源码,可以发现,nsq_to_file创建的消费者连接了nsqd之后,还发送了SUB的TCP包,订阅了"nsq_to_file"这个channel("nsq_to_file"是这个工具默认的channel名。向nsqd订阅时,该channel不存在,所以会创建一个新的channel)。关于nsq_to_file,后面的一篇文章会做一个简单解读。

调用nsqd的/pub,会将消息发布到消息队列memoryMsgChan,如果阻塞(内存队列满了),就会写入backendChan,保证消息不会丢失(这要看具体的队列实现newDummyBackendQueue or diskqueue)。

// nsqd/topic
func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }

        for i, channel := range chans {
            chanMsg := msg
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}

然后,另一边负责消息分发的goroutine,会遍历所有订阅该topic的channel,依次纷发。每个channel处理消息的方式和topic如出一辙。同时,可以看到,同一个channel的消息只会被消费一次。这和topic的消息是不一样的。

// nsqd/channel
func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
        if err != nil {
            c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
                c.name, err)
            return err
        }
    }
    return nil
}

// nsqd/protocol_v2.go
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var buf bytes.Buffer
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel
    // NOTE: `flusherChan` is used to bound message latency for
    // the pathological case of a channel on a low volume topic
    // with >1 clients having >1 RDY counts
    var flusherChan <-chan time.Time
    var sampleRate int32

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // v2 opportunistically buffers data to clients to reduce write system calls
    // we force flush in two cases:
    //    1. when the client is not ready to receive messages
    //    2. we're buffered and the channel has nothing left to send us
    //       (ie. we would block in this loop anyway)
    //
    flushed := true

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // if this case wins, we're either starved
            // or we won the race between other channels...
            // in either case, force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        case <-client.ReadyStateChan:
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            // you can't IDENTIFY anymore
            identifyEventChan = nil

            outputBufferTicker.Stop()
            if identifyData.OutputBufferTimeout > 0 {
                outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
            }

            heartbeatTicker.Stop()
            heartbeatChan = nil
            if identifyData.HeartbeatInterval > 0 {
                heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
                heartbeatChan = heartbeatTicker.C
            }

            if identifyData.SampleRate > 0 {
                sampleRate = identifyData.SampleRate
            }

            msgTimeout = identifyData.MsgTimeout
        case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.ctx.nsqd.logf("PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 1. 介绍 最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基...
    aoho阅读 8,916评论 1 16
  • 以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统)。 本次分享...
    文档随手记阅读 1,881评论 0 28
  • 今天又是忙碌的一天,光试教就花了两节课,第一次试教总感觉想象中和现实相差太远了,第二次稍微找回点感觉,我们二年级的...
    一尘720阅读 233评论 4 1