前言
Zookeeper作为一种应用协调系统,有着广泛的应用,其中一种就是作为服务注册中心,比如:Zookeeper+Dubbo+Spring实现服务注册与发现。
目前,遇到了这样一个问题,用Go语言编写的服务A,依赖于Zookeeper实现服务发现与注册,Master选举,资源分配等功能。这些功能很大程度上依赖于对Zookeeper节点监听的操作,监听要具有实时性和稳定性,并要针对go-zookeeper——封装了Zookeeper操作的API做优化,实现对Zookeeper的永久监听功能。
本文以Master选举为例,实现对Zookeeper的/master下属子节点的循环监听
go-zookeeper监听分析
Master节点只能有一个,在api中,我们发现ChildrenW可以满足对/master子节点监听的需求
// 1. 返回通道
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, nil, err
}
var ech <-chan Event
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeChild)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Children, &res.Stat, ech, err
}
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
// 2. 通道大小为1
ch := make(chan Event, 1)
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
return ch
}
在源码中我们可以看到
ChildrenW会返回一个通道,这个通道用来传递节点监听的结果,结果类型为Event
通道大小为1,且每调用一次ChildrenW就会创建一个通道,故监听只能生效一次
我们需要自定义函数,用来接收Event传递而来的结果,这个函数必然是异步的
如果我们想实现循环监听
就要不断的循环ChildrenW和他的Event响应函数
这两者还都是异步的,因为要保证监听的实时性,在接收到Event后要立即异步开启新的ChildrenW和其响应函数
如果简单的使用循环与协程可能会导致
代码丑陋
会开启大量协程
无法控制协程执行顺序,导致程序运行结果异常
循环监听
// 使用专属通道接收event
// 通道大小由服务并发量决定,越大处理越快
MasterChan = make(chan zk.Event, 1)
// zookeeper master节点监听函数,需要异步调用
func masterObserver(conn *zk.Conn, path string) {
// 开启event响应协程
go masterChanProcess(conn)
for {
_, _, event, err := conn.ChildrenW(path)
if err != nil {
log.Printf("Start to watch children path %s failed, err: %s", path, err.Error())
}
log.Printf("Start to watch children path %s successful!", path)
select {
case e := <-event:
// 将监听得到的event放入其专属通道内
MasterChan <- e
}
}
}
// event响应函数,需要异步调用
func masterChanProcess(conn *zk.Conn) {
for {
select {
// 专属通道响应
case event := <-MasterChan:
log.Printf("Master alteration has been detected, path is %s", event.Path)
/** event响应过程,your code */
}
}
}
func main() {
// zk初始化
go masterChanProcess(conn)
}
优点
仅开启两个协程,资源消耗小
通道保证高并发性能,以及event的响应顺序
代码简洁