zookeeper watcher机制源码解析

系统模型

zk的视图结构和标准的unix文件系统非常类似,但没有引入传统文件系统中目录和文件等概念,而是使用了其独有的”数据节点“概念,称之为ZNode,Znode是Zookeeper中数据的最小单元, 每个Znode上都可以保存数据,同时还可以挂载子节点,这样构成了一个层次化的命名空间,我们称之为树。


image.png

事务: 在zk中,事务是指能够改变Zookeeper服务器状态的操作,也叫事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新和客户端会话创建与失效等操作。对于每一个事务请求,zk都会为其分配一个全局唯一的事务ID,用ZXID来表示,通常64位,每一个ZXID对应一次更新操作。

**节点类型:在zk中,每个数据节点都是有生命周期的,其生命周期的长短取决于数据节点的节点类型:

  • 持久节点: 该数据节点被创建后,就会一直存在于ZooKeeper服务器上,直到有删除操作来清除这个节点。
  • 持久顺序节点: 在持久性的基础上增加了顺序性,zk会根据节点创建的先后顺序,自动为给定节点加上一个数字后缀,
  • 临时节点: 临时节点的生命周期是和客户端的会话绑定在一起,客户端会话失效,这个节点就会被删除
  • 临时顺序节点: 在临时节点的基础上有了顺序性这个特性。

Watcher机制

image.png

zk的watcher由客户端、客户端watchManager和zk服务器组成,zk客户端向zk服务器注册watcher的同时,会将watcher对象存储在客户端的watcherManager,zk服务器触发watcher事件后,会向客户端发送通知,客户端线程从watcher manager中取出对应的watcher对象,执行相应的回调逻辑。

image.png

上图中的小红旗是一个watcher,当小红旗被创建并注册到node1节点后,就会监听node1+ node_a + node_b或者node_a + node_b,这里两种情况是因为在创建watcher注册时有多种途径,并且watcher不能监听到孙子节点,此外,watcher设置后是一次性的,触发一次后就失效,如果想一直监听,需要在process回调函数里重新注册相同的watcher
在zk中接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,代表通知状态和事件类型,

image.png

Watcher接口拥有process函数,用于处理回调
内部类Event又包含内部类KeeperState以及EventType
KeeperState用于记录Event发生时的zk状态(通知状态)
EventType用于记录Event的类型

方法process

//回调函数实现该函数,表示根据event执行的行为
abstract public void process(WatchedEvent event);

内部类Event
包含KeeperState和EventType两个内部类,通过枚举类实现
方法很简单,就是int值与对应枚举类型的转换
两者的枚举类型以及两者之间的关系,触发条件可以参考《paxos到zk》中的图

image.png

WatchedEvent 和 WatcherEvent

WatchedEvent :代表zk上一个Watcher能够回应的变化,包含了变化事件的类型,zk状态以及变化影响的znode的path
WatcherEvent : 是WatchedEvent用于网络传输的封装类


image.png

三个成员变量很好的解释了WatchedEvent的意义,即事件的类型,zk状态以及变化影响的znode的path

WatcherEvent有一个getWrapper方法,

/**
     *  Convert WatchedEvent to type that can be sent over network
     */
    //转化成可供网络传输,序列化的WatcherEvent
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), 
                                keeperState.getIntValue(), 
                                path);
    }
}

WatcherEvent实现了Record接口,可以理解为WatchedEvent用于网络传输的封装类

ClientWatchManager接口和实现类ZKWatchManager

ClientWatchManager接口: 用户根据Event得到需要通知的watcher
ZKWatchManager为ClientWatchManager的实现,ClientWatchManager接口只有一个函数,

//ClientWatchManager负责根据Event得到需要通知哪些watcher
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

默认实现类是Zookeeper的内部类ZKWatchManager,

private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();//针对内容的watch
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();//针对exist API相关的watch
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();//针对getChildren API相关的watch

        private volatile Watcher defaultWatcher;//client传递的,默认的watcher实现

        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }

        /* (non-Javadoc)
         * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
         *                                                        Event.EventType, java.lang.String)
         */
        @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
            case None://eventType是null
                // 则所有dataWatches,existWatches,childWatches都需要被通知
                result.add(defaultWatcher);//添加默认watcher
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;//获取clear标记

                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
            case NodeDataChanged:
            case NodeCreated:
                //如果节点内容变化或者创建
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);//从dataWatches中移除,并且添加到result中
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);//从existWatches中移除,并且添加到result中
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default://默认处理
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }
            //返回结果
            return result;
        }
    }

该方法在事件发生后,返回需要被通知的Watcher集合。是根据已经注册的watches(分为三类,data,children,exist),根据path找到对应的watches,得到一个result集合进行返回

** WatcherSetEventPair ** : 将Event以及对应需要触发的watches集合进行组合绑定,放到waitingEvents队列

  private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;//事件触发需要被通知的watches集合
        private final WatchedEvent event;//事件

        public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

watcher注册过程
创建zk客户端对象实例时注册:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean 
canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

通过这种方式注册的watcher将会作为整个zk会话期间默认的watcher,会一直被保存在客户端zk watchManager的defaultWatcher中。
其他注册watcher的API:

getChildren(String path, Watcher watcher)
getChildren(String path, boolean watch)
exists(String path, Watcher watcher)
getData(String path, boolean watch, Stat stat)
...

Boolean watch表示是否使用上下文默认的watcher,即创建zk时设置的watcher

客户端注册过程

我们以getData这个接口为例说明:
watcher在请求中,通过标志位发送给server

   public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {//如果有watcher,就注册
            wcb = new DataWatchRegistration(watcher, clientPath);//生成一个DataWatchRegistration,即Data的watch的注册
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();//生成请求头
        h.setType(ZooDefs.OpCode.getData);//设置请求类型为getData
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);//设置标志位,是否函数watch
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);//client端提交请求
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
}
    public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return this.getData(path, watch?this.watchManager.defaultWatcher:null, stat);
    }

里面调用了ClientCnxn#submitRequestsubmitRequest 源码如下

//提交请求
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();//生成回复头
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {//如果packet没有处理完,就一直等着
                packet.wait();
            }
        }
        return r;
    }

里面调用了ClientCnxn#queuePacket函数, queuePacket函数作为生产者,代码中调用

outgoingQueue.add(packet);

在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。
Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。
在 ClientCnxn 中 WatchRegistration 也会被封装到 Packet 中,调用 queuePacket放入outgoingQueue即发送队列中(生产packet)
然后SendThread 线程调用doTransport方法,从outgoingQueue中消费Packet,客户端发送, 参考实现类ClientCnxnSocketNIO#doTransport里面调用了ClientCnxnSocketNIO#doIO此时是发送请求,调用了ClientCnxn.Packet#createBB

 public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");//序列化请求头,包含xid和type
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");//序列化request(对于特定请求如GetDataRequest,包含了是否存在watcher的标志位)
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

client的watcher并没有进行网络传输,server并不知道client的watcher触发时process函数要怎么执行
但是对于特定请求类型比如GetDataRequest,序列化的时候会传递一个标志位watch,表示是否watch
server在处理的时候,只知道client是否watch某个path

上面getData两个接口都可以进行Watcher的注册,第二个接口通过一个Boolean参数来标识是否使用默认的Watcher来进行注册,然后还是调用第一个方法来完成注册逻辑,在第一个方法中,客户端使用this.cnxn.submitRequest(h, request, response, wcb) 方法向服务器发送这个注册请求,完成请求发送后,
发送请求的时候,watcher还并没有注册在client端,要等server的返回,
ClientCnxn.SendThread中,读取server的回复
调用了ClientCnxnSocketNIO#doTransport
调用了ClientCnxnSocketNIO#doIO
调用了ClientCnxn.SendThread#readResponse
调用了ClientCnxn#finishPacket
finishPacket方法,它会从Packet中取出对应的Watcher并注册到zkWatchManager中去,也即请求回复后,watcher才在client端注册,

private void finishPacket(Packet p) {//Packet请求发送,收到回复,进行处理之后
        if (p.watchRegistration != null) {//如果有要注册的watchRegistration
            p.watchRegistration.register(p.replyHeader.getErr());//根据response code进行注册
        }

        if (p.cb == null) {//如果没有异步回调
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {//如果有异步回调
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
image.png

watchRegistration.register方法就是把 WatchRegistration 子类里面的 Watcher 实例放到 ZKWatchManager 的 dataWatches 中存储起来

   abstract class WatchRegistration {//client中管理watch注册的类
        private Watcher watcher;//注册的watcher
        private String clientPath;//监听的znode path
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }
        //根据response的resultCode来获取所有注册的path以及对应的watcher集合
        abstract protected Map<String, Set<Watcher>> getWatches(int rc);

        /**
         * Register the watcher with the set of watches on path.
         * @param rc the result code of the operation that attempted to
         * add the watch on the path.
         */
        public void register(int rc) {//根据response的resultCode来注册watcher到一个path
            if (shouldAddWatch(rc)) {//如果可以添加
                Map<String, Set<Watcher>> watches = getWatches(rc);//获取所有注册的path以及对应的watcher集合
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);//找到该path
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);//添加当前watcher
                }
            }
        }
        /**
         * Determine whether the watch should be added based on return code.
         * @param rc the result code of the operation that attempted to add the
         * watch on the node
         * @return true if the watch should be added, otw false
         */
        protected boolean shouldAddWatch(int rc) {//根据resultCode判断是否可以添加watch
            return rc == 0;
        }
    }

属性clientPath和watcher分别是监听关注的znode的path和对应处理的watcher
注册逻辑就是根据response的resultCode,判断是否可以添加watch,可以添加的话,就在Map<String, Set<Watcher>>添加记录.

简单来说,就是当使用ZooKeeper 构造方法或者使用 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher 的时候,首先将此消息传递给服务端,传递成功后,服务端会通知客户端,然后客户端将该路径和Watcher对应关系存储起来备用。

client注册watcher的小结
1.client发送getData,getChildren,exist请求时,传入自定义的watcher,或利用ZooKeeper构造函数的默认Watcher
2.将请求封装为Packet,在RequestHeader记录是否需要watcher,记录放入生产者队列ClientCnxn#outgoingQueue
3.ClientCnxn.SendThread消费outgoingQueue
  调用ClientCnxnSocketNIO#doTransport
  调用ClientCnxnSocketNIO#doIO
  调用ClientCnxn.Packet#createBB
  序列化的时候,将request记性序列化,里面包含一个是否带有watch的标志位(不包含watcher对象)
4.server进行相应的处理,之后进行回复
  可以参考FinalRequestProcessor#processRequest中对于getData的请求处理
  利用getDataRequest.getWatch()),看是否client需要watch,进而注册到DataTree的WatchManager中,下面会讲的
5.ClientCnxn.SendThread读取回复
  调用ClientCnxnSocketNIO#doTransport
  调用ClientCnxnSocketNIO#doIO
  调用ClientCnxn.SendThread#readResponse
  调用ClientCnxn#finishPacket
  利用response code,进行watcher的注册,记录在ZooKeeper.WatchRegistration对应的实现类中
服务端处理watcher
image.png

server前面的调用链这里不展开,从FinalRequestProcessor 类接收到客户端请求后,会调用 processRequest 方法进行处理,会进一步转向 ZooKeeperServer 的 processRequest 进行进一步处理,处理结由 ZKDatabase 类返回


image.png

对于注册 Watcher 请求,FinalRequestProcessor 的 ProcessRequest 方法会判断当前请求是否需要注册 Watcher,如果为 true,就会将当前的 ServerCnxn 对象和数据节点路径传入 getData 方法中去。ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接.
实现了Watcher接口,这个Watcher的实现类记录了client和server的连接,回调的时候,可以直接发送response告诉client,有事件触发了,数据节点的节点路径和 ServerCnxn 最终会被存储在 WatchManager 的 watchTable 和 watch2Paths 中。

case OpCode.getData: {//getData请求
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);//反序列化出getDataRequest
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }//验证path对应的node是否存在
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo);//验证ACL权限
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);//如果有watch标志位,Watcher就传cnxn
                rsp = new GetDataResponse(b, stat);
                break;
            }

上面的FinalRequestProcessor#processRequest调用会进入
ZKDatabase#getData
DataTree#getData

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);//注册watcher到dataWatches
            }
            return n.data;//返回byte[]
        }
    }

WatchManager 负责 Watcher 事件的触发,它是一个统称,在服务端 DataTree 会托管两个 WatchManager,分别是 dataWatches 和 childWatches,分别对应数据变更 Watcher 和子节点变更 Watcher。
.WatchManger 两个队列

private final HashMap<String, HashSet<Watcher>> watchTable =
 new HashMap<String, HashSet<Watcher>>();
 
 private final HashMap<Watcher, HashSet<String>> watch2Paths =
 new HashMap<Watcher, HashSet<String>>();
  public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            // don't waste memory if there are few watches on a node
            // rehash when the 4th entry is added, doubling size thereafter
            // seems like a good compromise
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }

结合上面的时序图,就可以理解请求是如何经过ZKdatabase到DataTree最终记录在WatchManager,这里就完成了watcher在服务端的注册。

server触发watch

当发生 Create、Delete、NodeChange(数据变更)这样的事件后,DataTree 会调用相应方法去触发 WatchManager 的 triggerWatch 方法,该方法返回 ZNODE 的信息,自此进入到回调本地 process 的序列,这里以setdata为例:
processTxn 代码

public ProcessTxnResult processTxn(TxnHeader header, Record txn)
 {
 ProcessTxnResult rc = new ProcessTxnResult();
 
 try {
switch (header.getType()) {
case OpCode.setData:
 SetDataTxn setDataTxn = (SetDataTxn) txn;
 rc.path = setDataTxn.getPath();
 rc.stat = setData(setDataTxn.getPath(), setDataTxn
 .getData(), setDataTxn.getVersion(), header
 .getZxid(), header.getTime());
 break;

setData 代码

public Stat setData(String path, byte data[], int version, long zxid,
 long time) throws KeeperException.NoNodeException {
 Stat s = new Stat();
 DataNodeV1 n = nodes.get(path);
 if (n == null) {
 throw new KeeperException.NoNodeException();
 }
 synchronized (n) {
 n.data = data;
 n.stat.setMtime(time);
 n.stat.setMzxid(zxid);
 n.stat.setVersion(version);
 n.copyStat(s);
 }
 dataWatches.triggerWatch(path, EventType.NodeDataChanged);
 return s;
 }

triggerWatch 代码

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
 KeeperState.SyncConnected, path);
//将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象
 HashSet<Watcher> watchers;
 synchronized (this) {
//根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher。如果没有找到 Watcher 对象,
说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。如果找打了 Watcher 就将其提取出来,
同时会直接从 watchTable 和 watch2Paths 里删除 Watcher,即 Watcher 是一次性的,触发一次就失效了。
 watchers = watchTable.remove(path);
for (Watcher w : watchers) {
 HashSet<String> paths = watch2Paths.get(w);
 }
 }
 for (Watcher w : watchers) {
 if (supress != null && supress.contains(w)) {
 continue;
 }
//对于需要注册 Watcher 的请求,ZooKeeper 会把请求对应的 ServerCnxn 作为一个 Watcher 存储,
所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法
 w.process(e);
 }
 return watchers;
}

从上面的代码我们可以总结出,如果想要处理一个 Watcher,需要执行的步骤如下所示:

  1. 将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象。

  2. 根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher。如果没有找到 Watcher 对象,说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。如果找到了 Watcher 就将其提取出来,同时会直接从 watchTable 和 watch2Paths 里删除 Watcher,即 Watcher 是一次性的,触发一次就失效了。

  3. 对于需要注册 Watcher 的请求,ZooKeeper 会把请求对应的 ServerCnxn 作为一个 Watcher 存储,所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法,在请求头标记“-1”表示当前是一个通知,将 WatchedEvent 包装成 WatcherEvent 用于网络传输序列化,向客户端发送通知

  @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);//xid为-1表示为通知
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();//包装为WatcherEvent来提供网络传输

        sendResponse(h, e, "notification");//给client发送请求,通知WatchedEvent的发生
    }

也就是说,server触发watcher,回调process函数其实就是告诉需要watch的client,WatcherEvent 发生了

服务端处理watcher小结

注册时watcher是ServerCnxn类型,保存了和client的会话,如果client发送请求的时候,request的标志位watch为true,server才会将这个会话注册到WatchManager(否则server知道client对这个path不感兴趣,下次这个path变化了也不通知你)
触发watcher时,就利用watchManager,找到path对应的watcher即ServerCnxn,告诉连接的client方,发生了WatcherEvent,client自己再处理

client回调watcher

客户端收到消息后,会调用 ClientCnxn 的 SendThread.readResponse 方法来进行统一处理,如清单所示。如果响应头 replyHdr 中标识的 Xid 为 02,表示是 ping,如果为-4,表示是验证包,如果是-1,表示这是一个通知类型的响应,然后进行反序列化、处理 chrootPath、还原 WatchedEvent、回调 Watcher 等步骤,其中回调 Watcher 步骤将 WacthedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调。

客户端回调watcher

服务端会通过使用 ServerCnxn 对应的 TCP 连接来向客户端发送一个 WatcherEvent 事件。ClientCnxn.SendThread读取回复
调用ClientCnxnSocketNIO#doTransport
调用ClientCnxnSocketNIO#doIO
调用ClientCnxn.SendThread#readResponse
里面处理事件通知的代码段

            if (replyHdr.getXid() == -1) {//-1代表通知类型 即WatcherEvent

                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");//反序列化WatcherEvent

                // convert from a server path to a client path
                if (chrootPath != null) {//把serverPath转化成clientPath
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);//WatcherEvent还原成WatchedEvent
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );//加入队列
                return;
            }

对于一个来自服务端的响应,都是经过一堆的 NIO 处理类到达客户端,然后由 SendThread.readResponse(ByteBuffer incomingBuffer) 方法来统一进行处理的。如果响应头 replyHdr 中标识了 xid 为 -1,表面这是一个通知类型的响应,对其的处理大体上分为如下步骤。

  • 反序列化 packet2.deserialize(bbia, “response”);将字节流转换成 WatcherEvent 对象。
  • 还原WatchedEvent, WatchedEvent we1 = new WatchedEvent(packet2);
  • 回调 Watcher : ClientCnxn.this.eventThread.queueEvent(we1); 最后将 WatchedEvent 对象交给 eventThread 线程,在下一个轮询周期中进行回调。

下面来看一下eventThread.queueEvent(we1)里面的逻辑:

     public void queueEvent(WatchedEvent event) {//将WatchedEvent加入队列
           if (event.getType() == EventType.None
                   && sessionState == event.getState()) {
               return;
           }
           sessionState = event.getState();

           // materialize the watchers based on the event
           WatcherSetEventPair pair = new WatcherSetEventPair(
                   watcher.materialize(event.getState(), event.getType(),
                           event.getPath()),
                           event);//用WatcherSetEventPair封装watchers和watchedEvent
           // queue the pair (watch set & event) for later processing
           waitingEvents.add(pair);//加入队列
       }

对于这个方法,首先使用该 event 来生成一个 WatcherSetEventPair 类型的pari,这个pari只是把 event 加了一个壳,然后附加上了 这个节点上所有的 Watcher :

    private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;
        private final WatchedEvent event;

那么是如何获取到注册该节点的所有watcher呢?看一下上面的 ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()) 这个方法,以 NodeCreated 事件为例:

        public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
            HashSet result = new HashSet();
            Map msg;
            switch(null.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[type.ordinal()]) {
            ...
            case NodeDataChanged:
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
                ...

客户端在识别出事件类型 EventType 后,会从相应的 Watcher 存储(即 dataWatches、existWatches 或 childWatches 中的一个或多个,本例中就是从 dataWatches 和 existWatches 两个存储中获取,因为,节点创建事件不会在 childWatches 中存储)中去除对应的 Watcher。需要注意的是,这里使用的是 remove 接口,因此也表明了客户端的 Watcher 机制同样是一次性的,即一旦被触发后,该 Watcher 就失效了。

取到所有的 Watcher 后,放到 pari 的 Set 里面,然后再把这个 pari 放到 waitingEvents 里面,而 waitingEvents 是啥玩意儿呢?

private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();

        public void run() {
            try {
                this.isRunning = true;

                while(true) {
                    Object e = this.waitingEvents.take();//循环取pari
                    if(e == ClientCnxn.this.eventOfDeath) {
                        this.wasKilled = true;
                    } else {
                        this.processEvent(e);//进行处理
                    }

                    if(this.wasKilled) {
                        LinkedBlockingQueue var2 = this.waitingEvents;
                        synchronized(this.waitingEvents) {
                            if(this.waitingEvents.isEmpty()) {
                                this.isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException var5) {
                ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
            }

        }

waitingEvents 是一个待处理 Watcher 的队列,waitingEvents的消费在ClientCnxn.EventThread#run中,EventThread 的 run() 方法会不断从队列中取数据,交由 processEvent 方法处理:

        private void processEvent(Object event) {
            try {
                if(event instanceof ClientCnxn.WatcherSetEventPair) {
                    ClientCnxn.WatcherSetEventPair t = (ClientCnxn.WatcherSetEventPair)event;
                    Iterator rc = t.watchers.iterator();

                    while(rc.hasNext()) {
                        Watcher clientPath = (Watcher)rc.next();

                        try {
                            clientPath.process(t.event);
                        } catch (Throwable var11) {
                            ClientCnxn.LOG.error("Error while calling watcher ", var11);
                        }
                    }
                } else {

OK,针对于本次事件,取出所有的 Watcher 类型的对象,遍历运行process方法,进行串行同步处理。此处 processEvent 方法中的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。客户端只能收到服务器发过来的相关事件通知,并不能获取到对应数据节点的原始数据和变更后的数据,如果需要知道变更前或者变更后的数据,需要调用相关接口获取新的数据。

思考
  1. client注册的watcher和server注册的watcher有什么区别
    作用和类型有区别
    client注册的watcher类型没有限制,作用就是说client监控到xx事件后干的事情,比如重新获取数据
    server注册的watcher都是ServerCnxn类型,作用就是告诉对应client 发生了xx WatchedEvent就行
    由于watcher并没有直接在网络进行传输,所以两者并不一样

  2. server怎么知道一个WatchedEvent触发,要通知哪些client
    server的watch是ServerCnxn,保持了和Client的对话,直接回调process就行了
    都是ServerCnxn(实现了Watcher)的功劳

watcher 特性总结
  • 轻量
    WatcherEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的Watcher 只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据——这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。这样轻量级的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。

  • 一次性
    无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,在 Watcher 的使用上,需要反复注册。这样的设计有效地减轻了服务端的压力,如果注册一个 Watcher 之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。

  • 客户端串行执行
    客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345