zkClient API

来源: https://www.cnblogs.com/f1194361820/p/5575206.html

1. zkClient 介绍

zkClient 是基于 原生 zookeeper包下开发的客户端,解决原生API出现的超时重连(session失效重连)、重复注册watcher、异常处理.
该zkClient 已经用在Dubbo, Kafka, Helix中。

2. zkClient 的类图

zkClient

3. zkClient 组件说明

zkClient:   利用该类的构造函数对其进行连接zk。  使用过zkClient 连接,会发现其底层使用了zookeeper原生的api 对zk进行连接的。
IZKConnection的实现类ZkConnection, 就是设置一些基本的属性, 比如
_zk  : ZooKeeper 对象
_zookeeperLock: Lock对象
_servers:  连接字符ip:port
_sessionTimeOut  : 

前面有一篇文章中,已经说了,使用ZooKeeper客户端来注册watcher有几种方法:

1、创建ZooKeeper对象时指定默认的Watcher
2、exists()
3、getData()
4、getchildren

其中getdata,exists注册的是某个节点的事件处理器(watcher),getchildren注册的是子节点的事件处理器(watcher)。
而在ZKClient中,根据事件类型,分为了节点事件(数据事件)、子节点事件。
对应的事件处理器则是IZKDataListener和IZKChildListener。另外加入了Session相关的事件和事件处理器。

4. 启动ZKClient流程

在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立。具体过程是这样的:


zkClient

1、 启动时,指定好connection string,连接超时时间,序列化工具等。
2、 创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行。
3、 连接到zookeeper服务器,同时将ZKClient自身作为默认的Watcher。

5. 代码流程

构造zkClient 对象, 其中会运行connect方法,
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout) {
        this._childListener = new ConcurrentHashMap();
        this._dataListener = new ConcurrentHashMap();
        this._stateListener = new CopyOnWriteArraySet();
        this._zkEventLock = new ZkLock();
        if (zkConnection == null) {
            throw new NullPointerException("Zookeeper connection is null!");
        } else {
            this._connection = zkConnection;
            this._zkSerializer = zkSerializer;
            this._operationRetryTimeoutInMillis = operationRetryTimeout;
            this._isZkSaslEnabled = this.isZkSaslEnabled();
            this.connect((long)connectionTimeout, this);
        }
    }

// 启动连接
public void connect(long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
        boolean started = false;
        this.acquireEventLock();

        try {
            this.setShutdownTrigger(false);
            this._eventThread = new ZkEventThread(this._connection.getServers());  
            this._eventThread.start();     // ZkEventThread 是用来处理事件的线程
            this._connection.connect(watcher);  // 启动连接
            LOG.debug("Awaiting connection to Zookeeper server"); 
            boolean waitSuccessful = this.waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
// 等待连接,连接成功之后会返回状态
            if (!waitSuccessful) {
                throw new ZkTimeoutException("Unable to connect to zookeeper server '" + this._connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
            }

            started = true;
        } finally {
            this.getEventLock().unlock();
            if (!started) {
                this.close();
            }

        }

    }

ZkEventThread.start

public void run() {
        LOG.info("Starting ZkClient event thread.");

        int eventId;
        try {
            for(; !this.isInterrupted(); LOG.debug("Delivering event #" + eventId + " done")) {
                ZkEventThread.ZkEvent zkEvent = (ZkEventThread.ZkEvent)this._events.take();  // 获取相应的事件, 这些事件是一些ZkEventThread.Event的实现类, 比如 childChangeEvent, newSessionEvent, DataChangeEvent事件
                eventId = _eventId.incrementAndGet();
                LOG.debug("Delivering event #" + eventId + " " + zkEvent);
                try {
                    zkEvent.run();
                } catch (InterruptedException var4) {
                    this.interrupt();
                } catch (ZkInterruptedException var5) {
                    this.interrupt();
                } catch (Throwable var6) {
                    LOG.error("Error handling event " + zkEvent, var6);
                }
            }
        } catch (InterruptedException var7) {
            LOG.info("Terminate ZkClient event thread.");
        }

    }


6. 为节点注册Watcher

ZooKeeper的三个方法:getData、getChildren、exists,ZKClient都提供了相应的代理方法。就拿exists来看:

1.png

可以看到,是否注册watcher,由hasListeners(path)来决定的。


2.png

hasListeners就是看有没有与该数据节点绑定的listener。

所以呢,默认情况下都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient)。怎么才能让hasListeners判定值为true呢,也就是怎么才能为path绑定Listener呢?

ZKClient提供了订阅功能:


3.png

一个新建的会话,只需要在取得响应的数据节点后,调用subscribteXxx就可以订阅上相应的事件了。

zkClient 是使用listener来为节点注册watcher的, 这个watcher一般是zkClient本身。zkClient 是利用subscribteXxx来为path 订阅listener事件的。

7. ZooKeeper的变更操作

Zookeeper中提供的变更操作有:节点的创建、删除,节点数据的修改。

  • 创建操作,数据节点分为四种,ZKClient分别为他们提供了相应的代理:


    1
  • 删除节点的操作:


    2
  • 修改节点数据的操作:


    3

writeDataReturnStat():写数据并返回数据的状态。
updateDataSerialized():修改已序列化的数据。执行过程是:先读取数据,然后使用DataUpdater对数据修改,最后调用writeData将修改后的数据发送给服务端。

8.客户端处理变更

前面已经知道,ZKClient是默认的Watcher,并且在为各个数据节点注册的Watcher都是这个默认的Watcher。
那么该是如何将各种事件通知给相应的Listener呢?

处理过程大致可以概括为下面的步骤:

1、判断变更类型:变更类型分为State变更、ChildNode变更(创建子节点、删除子节点、修改子节点数据)、
NodeData变更(创建指定node,删除节点,节点数据变更)。

2、取出与path关联的Listeners,并为每一个Listener创建一个ZKEvent,将ZkEvent交给ZkEventThread处理。

3、ZkEventThread线程,拿到ZkEvent后,只需要调用ZkEvent的run方法进行处理。

从这里也可以知道,具体的怎么如何调用Listener,还要依赖于ZkEvent的run()实现了。

9.序列化处理

ZooKeeper中,会涉及到序列化、反序列化的操作有两种:getData、setData。
在ZKClient中,分别用readData、writeData来替代了。

对于readData:先调用zookeeper的getData,然后进行使用ZKSerializer进行反序列化工作。

对于writeData:先使用ZKSerializer将对象序列化后,再调用zookeeper的setData。

10.ZkClient如何解决使用ZooKeeper客户端遇到的问题的呢?

  • Watcher自动重注册:这个要是依赖于hasListeners()的判断,来决定是否再次注册。
    有就将watcher注册到path 路劲下
  • Session失效重连:如果发现会话过期,就先关闭已有连接,再重新建立连接。
  • 异常处理:对比ZooKeeper和ZKClient,就可以发现ZooKeeper的所有操作都是抛异常的,
    而ZKClient的所有操作,都不会抛异常的。在发生异常时,它或做日志,或返回空,或做相应的Listener调用。

相比于ZooKeeper官方客户端,使用ZKClient时,只需要关注实际的Listener实现即可。
所以这个客户端,还是推荐大家使用的。

log: 遇到的问题:

1. Unable to connect to zookeeper server xxx:port with timeout of 10000

解决:

本地虚拟机,以standlone模式启动, 连接时,出现这个问题,说明连接超时,将connect time 设置大点就可以了

PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”或者“点赞”一下,就此谢过!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容