nsqd is responsible for creating topics and channels and for publishing messages.
1. Creating a topic
Let's first look at how a topic is created. The HTTP endpoint for creating a topic is /topic/create. Run the following commands:
# start nsqlookupd
nsqlookupd
# start an nsqd and connect it to nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
# the logs show that nsqd establishes a TCP connection to nsqlookupd and pings it every 15s
# call the /topic/create endpoint to create a topic
curl -X POST http://127.0.0.1:4151/topic/create?topic=name
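# to verify, nsqd's /stats endpoint lists the current topics (JSON output)
curl http://127.0.0.1:4151/stats?format=json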
So what happens when we call this endpoint? We can locate the handler registered for this URL:
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
Inside this handler:
- First, it uses a decorator that nsq implements itself, which concisely takes care of logging and of normalizing the HTTP response headers and body format. This was covered in an earlier article, so we won't repeat it here.
- Next, doCreateTopic parses the request, extracts the topic name, and validates it.
- Finally, the topic is actually created. We'll walk through that part in detail below.
The function that does the real work is GetTopic. A few points worth noting:
- It first takes a read lock to check whether the topic already exists and returns it if so; otherwise it takes a write lock and inserts the new topic. These reads and writes are thread safe, and trying the read lock first keeps the common (already-exists) case cheap. (Note that it must check for existence again after acquiring the write lock, because another goroutine may have inserted the topic between releasing the read lock and acquiring the write lock; a standalone sketch of this pattern follows the listing below.)
- After the topic is created, its own lock is acquired before the global nsqd lock is released; this protects channel initialization.
- The topic is created by the NewTopic function. If the topic name carries the "#ephemeral" suffix, it is treated as an ephemeral topic and backed by newDummyBackendQueue(); otherwise a diskqueue is used.
- Creating a topic also persists the topic metadata, which we won't dwell on here.
- It queries nsqlookupd's HTTP /channels endpoint to learn which channels this topic has, and creates each of them.
// nsqd/nsqd.go
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()

    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }

    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.logf("TOPIC(%s): created", t.name)

    // release our global nsqd lock, and switch to a more granular topic lock while we init our
    // channels from lookupd. This blocks concurrent PutMessages to this topic.
    t.Lock()
    n.Unlock()

    // if using lookupd, make a blocking call to get the topics, and immediately create them.
    // this makes sure that any message received is buffered to the right channels
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
        if err != nil {
            n.logf("WARNING: failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
        }
        for _, channelName := range channelNames {
            if strings.HasSuffix(channelName, "#ephemeral") {
                // we don't want to pre-create ephemeral channels
                // because there isn't a client connected
                continue
            }
            t.getOrCreateChannel(channelName)
        }
    } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
        n.logf("ERROR: no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
    }
    t.Unlock()

    // NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
    // casing the code above so that we can control the locks such that it is impossible
    // for a message to be written to a (new) topic while we're looking up channels
    // from lookupd...
    //
    // update messagePump state
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }
    return t
}
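The read-lock-then-write-lock sequence at the top of GetTopic is the classic double-checked locking pattern over a sync.RWMutex. Here is a minimal standalone sketch of just that pattern (the registry type and getOrCreate are hypothetical names for illustration, not nsqd code):

package main

import (
    "fmt"
    "sync"
)

type registry struct {
    mu sync.RWMutex
    m  map[string]int
}

// getOrCreate mirrors GetTopic's locking: a cheap read-locked lookup
// for the common case, then a write-locked re-check before inserting,
// because another goroutine may have inserted the key between
// RUnlock and Lock.
func (r *registry) getOrCreate(key string) int {
    r.mu.RLock()
    v, ok := r.m[key]
    r.mu.RUnlock()
    if ok {
        return v
    }

    r.mu.Lock()
    defer r.mu.Unlock()
    if v, ok := r.m[key]; ok { // re-check under the write lock
        return v
    }
    v = len(r.m) + 1 // stand-in for NewTopic(...)
    r.m[key] = v
    return v
}

func main() {
    r := &registry{m: make(map[string]int)}
    fmt.Println(r.getOrCreate("test")) // 1, created under the write lock
    fmt.Println(r.getOrCreate("test")) // 1, found via the cheap read path
}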
2. Creating channels and publishing messages
A topic is a producer-side concept. To move messages from producers to consumers, channels are needed, so a channel can be thought of as belonging to the consumer side. Commonly used consumer tools include:
- nsq_to_file: consumes the specified topic/channel and writes the messages to a file.
- nsq_to_http: consumes the specified topic/channel and performs HTTP requests against specified endpoints.
- nsq_to_nsq: consumes the specified topic/channel and republishes the messages over TCP to a destination nsqd.
Here we take nsq_to_file as an example:
nsq_to_file --topic=test --output-dir=./ --lookupd-http-address=127.0.0.1:4161
# meanwhile, call nsqd's /pub endpoint to publish a message
curl -d "hello world" http://127.0.0.1:4151/pub?topic=test
The nsqd and nsqlookupd logs show that:
- nsq_to_file looks the topic up in nsqlookupd and obtains information about that topic's producers
- it then establishes a TCP connection to the producer, starting with an IDENTIFY TCP packet
Reading nsq_to_file's source reveals that after the consumer it creates connects to nsqd, it also sends a SUB TCP packet to subscribe to the "nsq_to_file" channel ("nsq_to_file" is the tool's default channel name; the channel does not exist yet when the subscription arrives, so nsqd creates a new one). A later article will take a closer look at nsq_to_file.
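nsq_to_file is built on the official go-nsq client, which performs the IDENTIFY and SUB handshake under the hood. A minimal consumer that subscribes the same way might look roughly like this (a sketch against go-nsq's public API, not nsq_to_file's actual code; the topic and channel names are the ones from the example above):

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()
    // "nsq_to_file" plays the role of the default channel name;
    // since it doesn't exist yet, nsqd creates it on SUB
    consumer, err := nsq.NewConsumer("test", "nsq_to_file", cfg)
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        log.Printf("got message: %s", m.Body)
        return nil // returning nil FINishes the message
    }))
    // discover the topic's producers via nsqlookupd, then
    // connect (IDENTIFY + SUB) to each of them
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    // block until the consumer is stopped
    <-consumer.StopChan
}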
Calling nsqd's /pub pushes the message onto the in-memory queue memoryMsgChan through a non-blocking select; if that send would block (the in-memory queue is full), the message is written to the backend queue instead so that it is not lost. (How safe that actually is depends on the backend implementation: diskqueue persists to disk, while the newDummyBackendQueue used by ephemeral topics simply discards.)
// nsqd/topic.go
func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case pause := <-t.pauseChan:
            if pause || len(chans) == 0 {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }

        for i, channel := range chans {
            chanMsg := msg
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}
Then, on the dispatch side, the goroutine responsible for distributing messages (the topic's messagePump above) iterates over all channels of the topic and hands the message to each in turn. Each channel handles an incoming message in exactly the same way the topic does, as the put method below shows. Note also that a message within a single channel is consumed only once, which differs from a topic, whose messages are copied to every one of its channels (see the short illustration after the listing below).
// nsqd/channel.go
func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
        if err != nil {
            c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
                c.name, err)
            return err
        }
    }
    return nil
}
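The "consumed only once" behavior falls out of Go channel semantics: every subscribed client's messagePump (shown next) receives from the same channel-owned memoryMsgChan, and a value sent on a Go channel is delivered to exactly one receiver. A minimal illustration with two competing consumers:

package main

import (
    "fmt"
    "sync"
)

func main() {
    msgs := make(chan int)
    var wg sync.WaitGroup

    // two competing consumers on the same Go channel,
    // like two clients subscribed to the same nsq channel
    for id := 1; id <= 2; id++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for m := range msgs {
                fmt.Printf("consumer %d got message %d\n", id, m)
            }
        }(id)
    }

    for i := 0; i < 4; i++ {
        msgs <- i // each message goes to exactly one consumer
    }
    close(msgs)
    wg.Wait()
}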
// nsqd/protocol_v2.go
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var buf bytes.Buffer
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel
    // NOTE: `flusherChan` is used to bound message latency for
    // the pathological case of a channel on a low volume topic
    // with >1 clients having >1 RDY counts
    var flusherChan <-chan time.Time
    var sampleRate int32

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // v2 opportunistically buffers data to clients to reduce write system calls
    // we force flush in two cases:
    //    1. when the client is not ready to receive messages
    //    2. we're buffered and the channel has nothing left to send us
    //       (ie. we would block in this loop anyway)
    //
    flushed := true

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // if this case wins, we're either starved
            // or we won the race between other channels...
            // in either case, force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        case <-client.ReadyStateChan:
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            // you can't IDENTIFY anymore
            identifyEventChan = nil

            outputBufferTicker.Stop()
            if identifyData.OutputBufferTimeout > 0 {
                outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
            }

            heartbeatTicker.Stop()
            heartbeatChan = nil
            if identifyData.HeartbeatInterval > 0 {
                heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
                heartbeatChan = heartbeatTicker.C
            }

            if identifyData.SampleRate > 0 {
                sampleRate = identifyData.SampleRate
            }

            msgTimeout = identifyData.MsgTimeout
        case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.ctx.nsqd.logf("PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}
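The flushed/flusherChan bookkeeping above is an output-buffering pattern: writes go into a buffered writer to save system calls, and a ticker forces a flush only when the loop has gone idle with data still buffered. A condensed standalone sketch of the same idea, assuming a plain bufio.Writer in place of the client connection (names are illustrative, not nsqd's):

package main

import (
    "bufio"
    "os"
    "time"
)

func main() {
    w := bufio.NewWriter(os.Stdout)
    msgs := make(chan string, 8)
    go func() {
        for _, m := range []string{"a", "b", "c"} {
            msgs <- m
        }
        close(msgs)
    }()

    ticker := time.NewTicker(250 * time.Millisecond)
    defer ticker.Stop()

    flushed := true
    var flusherChan <-chan time.Time
    for {
        if flushed {
            flusherChan = nil // just flushed: don't tick until we buffer data again
        } else {
            flusherChan = ticker.C
        }
        select {
        case m, ok := <-msgs:
            if !ok {
                w.Flush()
                return
            }
            w.WriteString(m + "\n") // buffered, no syscall yet
            flushed = false
        case <-flusherChan:
            w.Flush() // idle with buffered data: force flush
            flushed = true
        }
    }
}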