NSQ 源码学习笔记(二)

  第一篇笔记中,我们先从总体上了解了NSQ的拓扑结构,和启动时如何和Client进行交互。这一篇学习中,我们尝试从消息的整个生命周期来看NSQ的实现思路。

消息的产生

  NSQ采用的是生产者消费者模式,消息的产生是由客户端主动的进行 publish,我们假定Producer的连接采用的是TCP连接。TCP 连接的协议采用的是V2,可以看一下protocolV2的实现。
  protocolV2中有在IOLoop中有两部分:messagePumpExec,用来保证通信,messagePump是client开启订阅后用来分发Msg的,作为生产者,发布消息是通过Exec中的SUB来实现。

func (p *protocolV2) IOLoop(conn net.Conn) error {
    // ...
    response, err = p.Exec(client, params)
    // ...
}

进入Exec中,能看到一堆方法,其中SUB是用来开启订阅模式

    topic := p.ctx.nsqd.GetTopic(topicName)
    channel := topic.GetChannel(channelName)
    channel.AddClient(client.ID, client)

    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel

PUB函数中,client会将Msg放入对应Topic的消息队列中

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    topicName := string(params[1])
    bodyLen, err := readLen(client.Reader, client.lenSlice)
    messageBody := make([]byte, bodyLen)
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
    err = topic.PutMessage(msg)
    return okBytes, nil
}

Topic的PutMessage方法:

// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    return nil
}

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

  在put方法中,msg会加入memoryMsgChan,如果被阻塞,将会写入Backend中,Backend是磁盘存储
  自此,就完成了一条消息从Client的发出到NSQ的存储。

消息的分发

  同样在Topic的实现中,messagePump函数负责将Topic中的Msg以复制的方式分发到所有的Channel中,Channel在这里就相当于一个二级Topic。
  具体看messagePump的实现,首先加载Topic所有的channel,并初始化内存读取chan和backend读取的chan。

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

  接下来是一个for循环,循环中首先通过memoryMsgChanbackendChan来读取消息,这里可以看到,通过select的方式来读取消息,NSQ的消息是无序的。
  我们注意到<-channelUpdateChan,接收到更新消息后的处理方式和上一步的操作是一致的,都是重新加载Topic的channel状态。这么做的好处是有变动的情况下会去动态加载,不用每次循环的时候都去执行一次加载操作,浪费资源。

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }
    }

  读取到需要分发的消息后,就是将消息分发到所有的Channel中。

    for i, channel := range chans {
            chanMsg := msg
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }

  Channel的PutMesssage和Topic的PutMessage实现基本一致,这里不过多赘述。
  至此,消息就完成了从Topic分发至Channel的过程,从Channel分发至Client的过程是在每个Client启动连接的时候就默认运行的,只要Client启动了SUB操作就会接收对应Channel的消息。回过头来看IOLoop函数的开始部分

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // 相当于标识,下面会阻塞该channel来保证goroutine的初始化的完成
    messagePumpStartedChan := make(chan bool)
    // 如果client订阅了topic,将会收到Msg
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

MessagePump的实现如下

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    for {
        if subChannel == nil || !client.IsReadyForMessages() {
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        // 这里负责执行Client 的各种事件
        select {
        //Client 需要发送一个SUB 请求 来订阅Channel, 并切一个Client只能订阅一个Channel
        case subChannel = <-subEventChan:  // 做了订阅
            subEventChan = nil
        case msg := <-memoryMsgChan:
            // 这里推测NSQ支持按概率读取部分消息,比如读取30%的消息
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            // inflight 队列用来实现“至少投递一次消息”
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()

            // protocol 进行消息格式的打包, 再发送给Client
            // 这里, Message 就发送给了 client
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}

  整理整个流程,Client连接的是Channel,Topic在接收到消息后会分发到左右的Channel,如果多个Client连接同一个Channel,那么从实现上来看,每个消息在由Channel分发到Client的时候实现了负载均衡。每个消息在多个Client中,只会有一个接收到。这么回头看上一篇的消息传递图,就很明了了。

@Topic和channel的关系 | center
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,590评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,701评论 13 425
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,555评论 2 39
  • 经历的好多,不得不写出来和大家分享!
    shyizne阅读 377评论 0 1