前言
初学 Zookeeper 会发现客户端有两种回调方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是离不开这两种方式的,搞清楚它们之间的区别与实现显得尤为重要。本文将围绕下面几个方面展开
- Watcher 和 AsyncCallback 的区别
- Watcher 的回调实现
- AsyncCallback 的回调实现
- IO 与事件处理
Watcher 和 AsyncCallback 的区别
我们先通过一个例子来感受一下:
zooKeeper.getData(root, new Watcher() {
public void process(WatchedEvent event) {
}
}, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}, null);
可以看到,getData
方法可以同时设置两个回调:Watcher 和 AsyncCallback,同样是回调,它们的区别是什么呢?要解决这个问题,我们就得从这两个接口的功能入手。
-
Watcher
:Watcher
是用于监听节点,session 状态的,比如getData
对数据节点a
设置了watcher
,那么当a
的数据内容发生改变时,客户端会收到NodeDataChanged
通知,然后进行watcher
的回调。 -
AsyncCallback
:AsyncCallback
是在以异步方式使用 ZooKeeper API 时,用于处理返回结果的。例如:getData
同步调用的版本是:byte[] getData(String path, boolean watch,Stat stat)
,异步调用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx)
,可以看到,前者是直接返回获取的结果,后者是通过AsyncCallback
回调处理结果的。
Watcher
Watcher 主要是通过ClientWatchManager
进行管理的。下面是 Watcher 相关类图
添加 Watcher 的流程如下:
Watcher 的类型
ClientWatchManager
中有四种Watcher
-
defaultWatcher
:创建Zookeeper
连接时传入的Watcher
,用于监听 session 状态 -
dataWatches
:存放getData
传入的Watcher
-
existWatches
:存放exists
传入的Watcher
,如果节点已存在,则Watcher
会被添加到dataWatches
-
childWatches
:存放getChildren
传入的Watcher
从代码上可以发现,监听器是存在HashMap
中的,key
是节点名称path
,value
是Set<Watcher>
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
通知的状态类型与事件类型
在Watcher
接口中,已经定义了所有的状态类型和事件类型
-
KeeperState.Disconnected(0)
此时客户端处于断开连接状态,和ZK集群都没有建立连接。
-
EventType.None(-1)
触发条件:一般是在与服务器断开连接的时候,客户端会收到这个事件。
-
-
KeeperState. SyncConnected(3)
此时客户端处于连接状态
-
EventType.None(-1)
触发条件:客户端与服务器成功建立会话之后,会收到这个通知。
-
EventType. NodeCreated (1)
触发条件:所关注的节点被创建。
-
EventType. NodeDeleted (2)
触发条件:所关注的节点被删除。
-
EventType. NodeDataChanged (3)
触发条件:所关注的节点的内容有更新。注意,这个地方说的内容是指数据的版本
号dataVersion
。因此,即使使用相同的数据内容来更新,还是会收到这个事件通知的。无论如何,调用了更新接口,就一定会更新dataVersion
的。 -
EventType. NodeChildrenChanged (4)
触发条件:所关注的节点的子节点有变化。这里说的变化是指子节点的个数和组成,具体到子节点内容的变化是不会通知的。
-
-
KeeperState. AuthFailed(4)
认证失败
- EventType.None(-1)
-
KeeperState. Expired(-112)
session 超时
- EventType.None(-1)
materialize 方法
ClientWatchManager
只有一个方法,那就是materialize
,它根据事件类型type
和path
返回监听该节点的特定类型的Watcher
。
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
核心逻辑如下:
-
type == None
:返回所有Watcher
,也就是说所有的Watcher
都会被触发。如果disableAutoWatchReset == true
且当前state != SyncConnected
,那么还会清空Watcher
,意味着移除所有在节点上的Watcher
。 -
type == NodeDataChanged | NodeCreated
:返回监听path
节点的dataWatches & existWatches
-
type == NodeChildrenChanged
:返回监听path
节点的childWatches
-
type == NodeDeleted
:返回监听path
节点的dataWatches | childWatches
每次返回都会从HashMap
中移除节点对应的Watcher
,例如:addTo(dataWatches.remove(clientPath), result);
,这就是为什么Watcher
是一次性的原因(defaultWatcher
除外)。值得注意的是,由于使用的是HashSet
存储Watcher
,重复添加同一个实例的Watcher
也只会被触发一次。
AsyncCallback
Zookeeper 的exists
,getData
,getChildren
方法都有异步的版本,它们与同步方法的区别仅仅在于是否等待响应,底层发送都是通过sendThread
异步发送的。下面我们用一幅图来说明:
上面的图展示了同步/异步调用
getData
的流程,其他方法也是类似的。
IO 与事件处理
Zookeeper 客户端会启动两个常驻线程
-
SendThread
:负责 IO 操作,包括发送,接受响应,发送 ping 等。 -
EventThread
:负责处理事件,执行回调函数。
readResponse
readResponse
是SendThread
处理响应的核心函数,核心逻辑如下:
接受服务器的响应,并反序列化出
ReplyHeader
: 有一个单独的线程SendThread
,负责接收服务器端的响应。假设接受到的服务器传递过来的字节流是incomingBuffer
,那么就将这个incomingBuffer
反序列化为ReplyHeader
。-
判断响应类型:判断
ReplyHeader
是Watcher
响应还是AsyncCallback
响应:ReplyHeader.getXid()
存储了响应类型。- 如果是
Watcher
类型响应:从ReplyHeader
中创建WatchedEvent
,WatchedEvent
里面存储了节点的路径,然后去WatcherManager
中找到和这个节点相关联的所有Watcher
,将他们写入到EventThread
的waitingEvents
中。 - 如果是
AsyncCallback
类型响应:从ReplyHeader
中读取response
,这个response
描述了是Exists,setData,getData,getChildren,create.....
中的哪一个异步回调。从pendingQueue
中拿到Packet
,Packet
中的cb
存储了AsyncCallback
,也就是异步 API 的结果回调。最后将Packet
写入到EventThread
的waitingEvents
中。
- 如果是
processEvent
processEvent
是EventThread
处理事件核心函数,核心逻辑如下:
- 如果
event instanceof WatcherSetEventPair
,取出pair
中的Watchers
,逐个调用watcher.process(pair.event)
- 否则
event
为AsyncCallback
,根据p.response
判断为哪种响应类型,执行响应的回调processResult
。
可见,Watcher
和AsyncCallback
都是由EventThread
处理的,通过processEvent
进行区分处理。
总结
Zookeeper 客户端中Watcher
和AsyncCallback
都是异步回调的方式,但它们回调的时机是不一样的,前者是由服务器发送事件触发客户端回调,后者是在执行了请求后得到响应后客户端主动触发的。它们的共同点在于都需要在获取了服务器响应之后,由SendThread
写入EventThread
的waitingEvents
中,然后由EventThread
逐个从事件队列中获取并处理。
参考资料
ZooKeeper个人笔记客户端watcher和AsycCallback回调
【ZooKeeper Notes 13】ZooKeeper Watcher的事件通知类型