第一篇笔记中,我们先从总体上了解了NSQ的拓扑结构,和启动时如何和Client进行交互。这一篇学习中,我们尝试从消息的整个生命周期来看NSQ的实现思路。
消息的产生
NSQ采用的是生产者消费者模式,消息的产生是由客户端主动的进行 publish
,我们假定Producer
的连接采用的是TCP连接。TCP 连接的协议采用的是V2,可以看一下protocolV2
的实现。
protocolV2
中有在IOLoop
中有两部分:messagePump
和Exec
,用来保证通信,messagePump
是client开启订阅后用来分发Msg的,作为生产者,发布消息是通过Exec
中的SUB来实现。
func (p *protocolV2) IOLoop(conn net.Conn) error {
// ...
response, err = p.Exec(client, params)
// ...
}
进入Exec中,能看到一堆方法,其中SUB是用来开启订阅模式
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
PUB函数中,client会将Msg放入对应Topic的消息队列中
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
topicName := string(params[1])
bodyLen, err := readLen(client.Reader, client.lenSlice)
messageBody := make([]byte, bodyLen)
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(<-p.ctx.nsqd.idChan, messageBody)
err = topic.PutMessage(msg)
return okBytes, nil
}
Topic的PutMessage方法:
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
return nil
}
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
在put方法中,msg会加入memoryMsgChan
,如果被阻塞,将会写入Backend
中,Backend
是磁盘存储
自此,就完成了一条消息从Client的发出到NSQ的存储。
消息的分发
同样在Topic的实现中,messagePump
函数负责将Topic中的Msg以复制的方式分发到所有的Channel
中,Channel在这里就相当于一个二级Topic。
具体看messagePump的实现,首先加载Topic
所有的channel
,并初始化内存读取chan和backend读取的chan。
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
接下来是一个for循环,循环中首先通过memoryMsgChan
和backendChan
来读取消息,这里可以看到,通过select的方式来读取消息,NSQ的消息是无序的。
我们注意到<-channelUpdateChan
,接收到更新消息后的处理方式和上一步的操作是一致的,都是重新加载Topic的channel状态。这么做的好处是有变动的情况下会去动态加载,不用每次循环的时候都去执行一次加载操作,浪费资源。
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
}
读取到需要分发的消息后,就是将消息分发到所有的Channel中。
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
Channel的PutMesssage和Topic的PutMessage实现基本一致,这里不过多赘述。
至此,消息就完成了从Topic分发至Channel的过程,从Channel分发至Client的过程是在每个Client启动连接的时候就默认运行的,只要Client启动了SUB操作就会接收对应Channel的消息。回过头来看IOLoop
函数的开始部分
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
// 相当于标识,下面会阻塞该channel来保证goroutine的初始化的完成
messagePumpStartedChan := make(chan bool)
// 如果client订阅了topic,将会收到Msg
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
MessagePump的实现如下
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
if subChannel == nil || !client.IsReadyForMessages() {
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
// 这里负责执行Client 的各种事件
select {
//Client 需要发送一个SUB 请求 来订阅Channel, 并切一个Client只能订阅一个Channel
case subChannel = <-subEventChan: // 做了订阅
subEventChan = nil
case msg := <-memoryMsgChan:
// 这里推测NSQ支持按概率读取部分消息,比如读取30%的消息
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
// inflight 队列用来实现“至少投递一次消息”
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
// protocol 进行消息格式的打包, 再发送给Client
// 这里, Message 就发送给了 client
err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
整理整个流程,Client连接的是Channel,Topic在接收到消息后会分发到左右的Channel,如果多个Client连接同一个Channel,那么从实现上来看,每个消息在由Channel分发到Client的时候实现了负载均衡。每个消息在多个Client中,只会有一个接收到。这么回头看上一篇的消息传递图,就很明了了。