一、NSQ部署
我个人电脑是win10 64位,因此在这儿我就给出一个网友写好的Windows下NSQ部署文章NSQ如何在windows上安装 ,安装网友的文章说明我的测试结果图1所示,当启动nsqd连接nsqlookupd时,会有相应的提示,启动nsq_to_file进程时,会往nsqd写入消息都有相应提示,如图1中用红色矩形框选中的是对于的关键提示信息。
打开浏览器直接输入http://127.0.0.1:4171/
就可以查看NSQ运行情况
二、NSQ拓扑图
通过第一小节,我们简单的把NSQ部署起来,并看到了NSQ的运行情况,还记得我们启动各个进程的步骤吗,不记得没关系,看图2所示,该图是出自nsq源码分析之概述,个人觉着这幅图对NSQ总结的非常好,从图中我们可以了解到下面几个点
- 1.
nsqlookupd
进程同时开启tcp
和http
两个监听服务,TCP监听是用于NSQD进程连接
,http监听是用于提供给nsqadmin获取集群信息
- 2.
nsqadmin
进程只开启http
服务,其实就是一个web服务,提供给客户端查询集群信息 - 3.
nsqd
进程同时开启tcp
和http
服务,TPC监听和http监听都提供给生产者和消费者连接
,http服务还提供给nsqadmin获取该nsqd本地信息
- 4.
nsqd
连接到nsqlookupd
的tcp监听上,通过心跳告诉nsqlookupd
自己在线 - 5.
writer
是生产者,直接连接nsqd - 6.
reader
是消费者,直接连接nsqd
了解了nsq的整体结构后,我们就可以开始按模块分析nsq的源码
三、NSQ启动与退出
nsq优雅的启动与退出使用了SVC包,推荐阅读Nsq源码阅读(1) 启动和优雅退出,这篇文章讲解的非常详细。
四、全局唯一messageid
五、NSQ同步
NSQ中简单包装了sync.WaitGroup,包装后的待执行函数都会在轻量级的线程中执行,代码如下所示。主线程中启动了多个子线程后,只有等启动的多个子线程结束后主线程才能结束,方法Wrap中的Add和Done调用分别会维护一个引用计数,只有当该引用计数为0时,主线程才会结束等待
//如果结构体S,包含一个匿名字段T,那么这个结构体S 就有了T的方法。
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1) //sync.WaitGroup结构中方法
go func() {
cb()
w.Done() //sync.WaitGroup结构中方法
}()
}
六、Go接口
Go语言式的接口,就是不用显示声明类型T实现了接口I,只要类型T的公开方法完全满足接口I的要求,就可以把类型T的对象用在需要接口I的地方。比如nsqlookupd.go文件中Main函数最后启动http监听服务时代码
l.Lock()
l.httpListener = httpListener //把Listener存在NSQLookupd的struct里
l.Unlock()
//创建httpServer的实例,httpServer在nsqlookupd\http.go文件中定义
httpServer := newHTTPServer(ctx)
//调用http_api.Serve方法(在http_api\http_server.go中定义)开始在指定的httpListener上接收http连接。
l.waitGroup.Wrap(func() {
//因为httpServer结构重写了http.Handler接口类的ServeHTTP方法,因此可以当http.Handler使用
http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)
})
代码第9行调用中的第二个参数httpServer,是一个自定义的struct,而http_apit.Serve需要的参数类型为http.Handler,因为httpServer实现了http.Handler接口类中的接口,因此可以在这个地方使用,对每一个类型实现接口方法如下
七、NSQD心跳
NSQD作为消息接收、转发者,他也是nsqlookupd的客户端,当nsqd启动时,除过自己启动tcp和http服务等待生产者和消费者连接外,还需要作为client连接到nsqlookupd。在nsqd.go文件的Main函数最后,通过waitGroup启动了3个线程,如下代码所示,第2行代码启动了一个lookupLoop循环,该循环是一个死循环,其中有一项功能就是发送心跳值,告诉所有的nsqlookupd,自己还活着。
1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循环处理消息的分发
2 n.waitGroup.Wrap(func() { n.lookupLoop() }) //同步nsqd状态到nsqlookup比如:在线、Topic变化、Channel变化等
3 if n.getOpts().StatsdAddress != "" {
4 n.waitGroup.Wrap(func() { n.statsdLoop() })
5 }
心跳发送代码在lookup.go文件中,处理方式如下
case <-ticker: //发送心跳 告诉nsqlookup自己在线
// send a heartbeat and read a response (read detects closed conns)
for _, lookupPeer := range lookupPeers {
n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
cmd := nsq.Ping()
_, err := lookupPeer.Command(cmd)
if err != nil {
n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
}
}
nsqlookupd作为服务器端,启动时就开启了tcp监听,每接受一个nsqd连接,就使用go开启一个handle处理函数,最终和客户端(nsqd)交互功能代码在LookupProtocolV1文件中完成。
此时我们在看图2的NSQ拓扑图,说过了nsqd作为client链接nsqlookupd服务器端后,我们在顺道说下nsqd开启tcp监控作为服务器时有哪些是客户端,此时的client包括:生产者和消费者,即图中的writer和reader。
八、Decorator装饰器
先看下NSQ源码中对Decorator定义,其实就是一个函数的嵌套定义,源码位置在internal/http_api/api_response.go文件中,代码如下:
type Decorator func(APIHandler) APIHandler
type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
下边我们来看两个关于Decorate的使用
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}
func Log(l app.Logger) Decorator { //Logger是go对应的log4版本
return func(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
start := time.Now() //当前时间
response, err := f(w, req, ps) //执行http请求 err错误状态 response处理结果
elapsed := time.Since(start) //f(APIHandler)调用时长
status := 200
if e, ok := err.(Err); ok {
status = e.Code //重置错误码
}
l.Output(2, fmt.Sprintf("%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //输出http处理日志信息
return response, err //返回一个接口 和错误状态
}
}
}
首先先来看下代码第一行定义的Decorate函数,这个函数其实就是一个装饰函数,第一个参数为需要被装饰的视图函数,从第二参数开始,都是装饰函数,最后返回装饰好的视图函数。
第11行代码定义了一个Log函数,返回值为Decorator类型,也就是代码第12行return后边的表达式,该表达式也是一个函数定义,其返回值为APIHandler,第13行代码return后边的表达式为其返回值。