zk源码阅读32:Server与Client的网络I/O(一):ServerCnxn

摘要

在讲ZooKeeperServer之前,要讲ServerCnxnFactory,在ServerCnxnFactory之前又要讲ServerCnxn。
在前面源码阅读第19,20节讲了ClientCnxn,记录client端connection的数据结构,本节讲解ServerCnxn,这个类代表了一个客户端与一个server的连接。

  实现接口
    Stats,Watcher
  内部类
  属性
  函数
    抽象函数
    具体函数
      auth
      packet收发
      更新统计数据
      get,toString,dump
  思考,吐槽

结构如下

ServerCnxn父类子类

本身类图如下

接口

Stats

方法如下

方法 意义
Date getEstablished(); 获取建立连接的时间
long getOutstandingRequests(); 获取已经提交但是尚未回复的请求个数
long getPacketsReceived(); 获取接收到的packets个数
long getPacketsSent(); 获取已经发送packet个数
long getMinLatency(); 最低延迟
long getAvgLatency(); 最高延迟
long getAvgLatency(); 平均延迟
String getLastOperation(); 连接最后一次操作
long getLastCxid(); 连接最后的cxid
long getLastZxid(); 连接最后的zxid
long getLastResponseTime(); 上次回复的时间
long getLastLatency(); 上一次回复的延迟
void resetStats(); 还原各种计数器

Watcher

这个在之前已经讲过了,所有事件处理器都要实现该接口,可以定义一些回调行为

Watcher接口类图

内部类

两个定义异常的内部类

    // 请求关闭异常类
    protected static class CloseRequestException extends IOException {
        private static final long serialVersionUID = -7854505709816442681L;

        public CloseRequestException(String msg) {
            super(msg);
        }
    }

    // 流结束异常类
    protected static class EndOfStreamException extends IOException {
        private static final long serialVersionUID = -8255690282104294178L;

        public EndOfStreamException(String msg) {
            super(msg);
        }

        public String toString() {
            return "EndOfStreamException: " + getMessage();
        }
    }

属性

    // This is just an arbitrary object to represent requests issued by
    // (aka owned by) this class
    // 代表由本类提出的请求
    final public static Object me = new Object();
    // 认证信息
    protected ArrayList<Id> authInfo = new ArrayList<Id>();

    /**
     * If the client is of old version, we don't send r-o mode info to it.
     * The reason is that if we would, old C client doesn't read it, which
     * results in TCP RST packet, i.e. "connection reset by peer".
     */
    // 是否为旧的C客户端
    boolean isOldClient = true;
    
    // Zookeeper的Sasl服务器
    protected ZooKeeperSaslServer zooKeeperSaslServer = null;
    
    
    /**
    * CMD命令
    **/
    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    // CMD命令
    protected final static int confCmd =
        ByteBuffer.wrap("conf".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int consCmd =
        ByteBuffer.wrap("cons".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int crstCmd =
        ByteBuffer.wrap("crst".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int dumpCmd =
        ByteBuffer.wrap("dump".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int enviCmd =
        ByteBuffer.wrap("envi".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int getTraceMaskCmd =
        ByteBuffer.wrap("gtmk".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int ruokCmd =
        ByteBuffer.wrap("ruok".getBytes()).getInt();
    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int setTraceMaskCmd =
        ByteBuffer.wrap("stmk".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int srvrCmd =
        ByteBuffer.wrap("srvr".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int srstCmd =
        ByteBuffer.wrap("srst".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int statCmd =
        ByteBuffer.wrap("stat".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int wchcCmd =
        ByteBuffer.wrap("wchc".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int wchpCmd =
        ByteBuffer.wrap("wchp".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int wchsCmd =
        ByteBuffer.wrap("wchs".getBytes()).getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
            .getInt();

    /*
     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
     * Zk Admin</a>. this link is for all the commands.
     */
    protected final static int isroCmd = ByteBuffer.wrap("isro".getBytes())
            .getInt();

    // 存储CMD的整形值与String的键值对
    protected final static HashMap<Integer, String> cmd2String =
        new HashMap<Integer, String>();

    // specify all of the commands that are available
    static {
        cmd2String.put(confCmd, "conf");
        cmd2String.put(consCmd, "cons");
        cmd2String.put(crstCmd, "crst");
        cmd2String.put(dumpCmd, "dump");
        cmd2String.put(enviCmd, "envi");
        cmd2String.put(getTraceMaskCmd, "gtmk");
        cmd2String.put(ruokCmd, "ruok");
        cmd2String.put(setTraceMaskCmd, "stmk");
        cmd2String.put(srstCmd, "srst");
        cmd2String.put(srvrCmd, "srvr");
        cmd2String.put(statCmd, "stat");
        cmd2String.put(wchcCmd, "wchc");
        cmd2String.put(wchpCmd, "wchp");
        cmd2String.put(wchsCmd, "wchs");
        cmd2String.put(mntrCmd, "mntr");
        cmd2String.put(isroCmd, "isro");
    }
    
    
    /**
    * 服务器的统计数据
    **/
    // 创建连接的时间
    protected final Date established = new Date();

    // 接受的packet数量
    protected final AtomicLong packetsReceived = new AtomicLong();
    // 发送的packet数量
    protected final AtomicLong packetsSent = new AtomicLong();
    // 最小延迟
    protected long minLatency;
    // 最大延迟
    protected long maxLatency;
    // 最后操作类型
    protected String lastOp;
    // 最后的cxid
    protected long lastCxid;
    // 最后的zxid
    protected long lastZxid;
    // 最后的响应时间
    protected long lastResponseTime;
    // 最后的延迟
    protected long lastLatency;
    // 数量
    protected long count;
    // 总的延迟
    protected long totalLatency;

主要就是一些统计对象,以及一些命令的支持,这里注意owner的意义,在下面思考中会提到

函数

抽象函数

    // 获取会话超时时间
    abstract int getSessionTimeout();

    // 关闭
    abstract void close();

    // 发送响应
    public abstract void sendResponse(ReplyHeader h, Record r, String tag)
        throws IOException;

    /* notify the client the session is closing and close/cleanup socket */
    // 关闭会话
    abstract void sendCloseSession();

    // 处理,Watcher接口中的方法
    public abstract void process(WatchedEvent event);

    // 获取会话id
    abstract long getSessionId();

    // 设置会话id
    abstract void setSessionId(long sessionId);

    // 设置缓冲
    abstract void sendBuffer(ByteBuffer closeConn);

    // 允许接收
    abstract void enableRecv();

    // 不允许接收
    abstract void disableRecv();

    // 设置会话超时时间
    abstract void setSessionTimeout(int sessionTimeout);
    
    // 获取服务器的统计数据
    protected abstract ServerStats serverStats();

具体函数

auth相关

    // 获取认证信息,返回不可修改的列表
    public List<Id> getAuthInfo() {
        return Collections.unmodifiableList(authInfo);
    }

    // 添加认证信息
    public void addAuthInfo(Id id) {
        if (authInfo.contains(id) == false) {
            authInfo.add(id);
        }
    }

    // 移除认证信息
    public boolean removeAuthInfo(Id id) {
        return authInfo.remove(id);
    }

接收发送packet相关

    // 接收的packet
    protected void packetReceived() {
        incrPacketsReceived();
        ServerStats serverStats = serverStats();
        if (serverStats != null) {
            serverStats().incrementPacketsReceived();
        }
    }

    // 发送的packet
    protected void packetSent() {
        incrPacketsSent();
        ServerStats serverStats = serverStats();
        if (serverStats != null) {
            serverStats().incrementPacketsSent();
        }
    }

    // 增加接收的packet数量
    protected long incrPacketsReceived() {
        return packetsReceived.incrementAndGet();
    }
    
    // 增加发送的packet数量
    protected long incrPacketsSent() {
        return packetsSent.incrementAndGet();
    }

注意里面有耦合
调用了serverStats().incrementPacketsSent();即ServerStats#incrementPacketsSent
会在思考以及吐槽部分进行展开

更新统计数据

// 更新响应的统计数据
    protected synchronized void updateStatsForResponse(long cxid, long zxid,
            String op, long start, long end)
    {
        // don't overwrite with "special" xids - we're interested
        // in the clients last real operation
        if (cxid >= 0) { 
            lastCxid = cxid;
        }
        lastZxid = zxid;
        lastOp = op;
        lastResponseTime = end;
        long elapsed = end - start;
        lastLatency = elapsed;
        if (elapsed < minLatency) {
            minLatency = elapsed;
        }
        if (elapsed > maxLatency) {
            maxLatency = elapsed;
        }
        count++;
        totalLatency += elapsed;
    }

get,toString,dump相关

这部分就不列举出来了

思考

cxid是什么

之前在zk源码阅读3中带过,但是当时并没有注意,看代码应该是client xid,这里没有深究

owner的意义是什么

这个就提前带一下好了,在server中有两个数据结构需要owner
一个是org.apache.zookeeper.server.Request,一个是org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl#owner
标记一个request或者一个session的来源,
比如之前讲SessionTrackerImpl时讲过的checkSession需要验证owner

    synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
        SessionImpl session = sessionsById.get(sessionId);
        if (session == null || session.isClosing()) {
            throw new KeeperException.SessionExpiredException();
        }
        if (session.owner == null) {
            session.owner = owner;
        } else if (session.owner != owner) {//如果owner不一致
            throw new KeeperException.SessionMovedException();
        }
    }

目前owner的值的类型有两种,ServerCnxn的静态对象me,以及LearnerHandler的每一个实例.

Stats和ServerStats区别是什么

类图对比如下

Stats和ServerStats

Stats用于记录一个client到server的数据,而ServerStats记录所有client到server的数据
相当于前者是个体,后者是集体的关系
因此可以看到,在ServerCnxn对相关数据进行修改的时候,代码上下文都会同样对ServerStats数据进行修改,如

    protected void packetSent() {
        incrPacketsSent();
        ServerStats serverStats = serverStats();
        if (serverStats != null) {
            serverStats().incrementPacketsSent();
        }
    }

ServerCnxn和之前讲的ClientCnxn什么关系

并没什么关系
ServerCnxn是记录一个Client到当前server的各种统计信息
而ClientCnxn是通过两个线程sendThread以及eventThread完成和server的交互

问题

吐槽

ServerCnxn中内部类的必要性

两个异常的类为什么要定义在ServerCnxn中,可以抽出去

ServerCnxn与ServerStats的内聚,耦合性

在上面思考中也提到了,ServerCnxn对相关数据进行修改的时候,代码上下文都会同样对ServerStats数据进行修改
相关上下文,规模的定义应该是一样的,都在ServerCnxn调用,像上面提到的packetSent函数。
但是对于latency来说,又变成了在ServerCnxn以外的类调用,
如org.apache.zookeeper.server.FinalRequestProcessor#processRequest存在下述代码

            case OpCode.ping: {
                zks.serverStats().updateLatency(request.createTime);

                lastOp = "PING";
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                        request.createTime, System.currentTimeMillis());

                cnxn.sendResponse(new ReplyHeader(-2,
                        zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
                return;
            }

里面zks.serverStats().updateLatency(request.createTime);直接放入ServerCnxn#updateStatsForResponse里面就好了

refer

http://www.cnblogs.com/leesf456/p/6477815.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容