首先我们来看一下Nsq的组织结构:
- nsqd:接收,分发队列信息的守护进程,可以单独部署,也可以集群化运行
- nsqlookupd:管理nsqd节点,服务发现
- nsqadmin:nsq的可视化管理工具
NSQ的拓补图
NSQ中Topic和channel的关系
Topic
会将消息发送到每个订阅者(channel)
channel
的读消费类似负载均衡,会均匀的投递到各个消费端
三个模块中nsqd模块最为重要,我们从这个模块开始学习它的源码
入口函数
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)
nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
- 首先用
signal.Notify
阻塞系统的kill
和ctrl+c
信号,让进程可以处于deamon的状态运行 - 按优先级合并配置文件:命令行 > 配置文件 > 默认值
-
nsqd.LoadMetadata
读取dat文件,加载topic和channel信息,并同步运行和停止的状态 - 将进程的运行状态(topic和channel信息)持久化到dat文件中
- 执行
nsqd.Main
直到捕捉退出信号
nsqd.Main 的代码位于 nsqd/nsqd.go
NSQD主函数(TCP监听)
func (n *NSQD) Main() {
var httpListener net.Listener
var httpsListener net.Listener
ctx := &context{n}
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
})
...
}
NSQD首先启动了tcp监听模型,为了保证通用性,在 protocol
包中封装了TCPServer,需要传入 Listener
, TCPHandler
, Logger
对象。所有的TCP监听均可以用这个模式来创建监听,只要传入对应的 Listener
和TCPHandler
,那么Listener
在Accept到Connect的时候,将其交给对应TCPHandler.Handle(clientConn)
执行。
TCPHandler 的Interface实现
package protocol
type TCPHandler interface {
Handle(net.Conn)
}
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
// 启动Goroutine 去执行Handle函数
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
这里体现了Go在实现Interface的便捷之处,不需要显示的声明实现了某个Interface,只需要完全的实现Interface中定义的方法,那么就会默认该类型实现了接口。在这里不同的Handler,只要实现了Handle(net.Conn),就可以被当做TCPHandler对象传入。在代码中的体现是:
执行Handle
函数时是启动一个Goroutine来执行的,这里其实是per connect per goroutine
,由于Golang的特性,Goroutine在执行时的调度模式是epoll模式,可以很好的利用系统的多核资源。
main函数中TCPServer的实现
type tcpServer struct {
ctx *context
}
func (p *tcpServer) Handle(clientConn net.Conn) {
p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())
// 客户端应该初始化本身通过发送一个4字节序列表示协议的版本,
// 这样将允许我们优雅地升级兼容协议
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
return
}
protocolMagic := string(buf)
p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx} // V2版本的协议操作
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
源码中标记了需要在通讯时预留4个字节的版本号信息,用来兼容协议的升级。如果未来有协议升级,只需要在protocolMagic
中添加新的case分支就可以了。
NSQD主函数(HTTP/HTTPS监听)
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
n.Lock()
n.httpsListener = httpsListener
n.Unlock()
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
})
}
httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
n.Lock()
n.httpListener = httpListener
n.Unlock()
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
})
这里不论是http还是https的监听,httpsServer
和httpServer
作为Handler
对象,均在内部声明了路由规则,不同的请求定义了不同的操作,最后通过http_api.Serve()
绑定端口监听
NSQD默认自启的操作
n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循环消息分发
n.waitGroup.Wrap(func() { n.idPump() }) // 生产唯一消息id的一个队列
n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有变化,同步nsqlookup
if n.getOpts().StatsdAddress != "" {
// 定时将nsqd的状态以短连接的方式发送至一个状态监护进程.包括了nsqd的应用资源信息,以及nsqd上topic的信息
n.waitGroup.Wrap(func() { n.statsdLoop() })
}
启动监听后,除了通过监听启动的操作外,NSQD还有一些类似守护进程的操作会一直运行,包括:
- 循环消息分发
- 生产唯一消息ID
- nsqlookup的状态同步
- 状态监控