zk源码阅读18:zk client之网络I/O(二) 通信层ClientCnxnSocket

摘要

本节讲解ClientCnxnSocket以及ClientCnxnSocketNIO
涉及一些NIO的知识,希望自行了解。
相关源码分析很少,没有什么参照
主要讲解

  ClientCnxnSocket抽象类结构
    readConnectResult方法 读取server的connect的response
    readLength方法 读取buffer长度并给incomingBuffer分配对应大小空间

  ClientCnxnSocketNIO实现
    findSendablePacket函数
      根据sasl以及outgoingQueue情况获取发送的Packet      
        
    doIO函数
      读就绪时,读取response
      写就绪时,从findSendablePacket找到可发送的Packet

    doTransport函数
      如果是连接就绪,调用sendThread连接操作
      若读写就绪,调用doIO函数

    connect,createSock,registerAndConnect函数
      完成client到server的socket连接(仅为网络连接,并没有和server进行IO,更没有得到server的connect的response)

简介

ClientCnxnSocket定义了底层Socket通信的接口.默认是现实ClientCnxnSocketNIO.

类图如下

ClientCnxnSocket类图

源码

抽象类ClientCnxnSocket

主要讲解变量以及已经实现的方法

属性

    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);

    protected boolean initialized;//是否初始化

    /**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);//仅仅用来读取 incoming message的长度

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;
    protected long sentCount = 0;//send次数
    protected long recvCount = 0;//接收次数
    protected long lastHeard;//上次接收时间
    protected long lastSend;//上次发送时间
    protected long now;//当前时间
    protected ClientCnxn.SendThread sendThread;//客户端通信的发送线程

    /**
     * The sessionId is only available here for Log and Exception messages.
     * Otherwise the socket doesn't need to know it.
     */
    protected long sessionId;//仅仅用来辅助log和Exception记录用的

方法

    void introduce(ClientCnxn.SendThread sendThread, long sessionId) {//设置sendThread以及sessionId
        this.sendThread = sendThread;
        this.sessionId = sessionId;
    }

    void updateNow() {//更新now时间
        now = System.currentTimeMillis();
    }

    int getIdleRecv() {//获取接收的闲置时间
        return (int) (now - lastHeard);
    }

    int getIdleSend() {//获取发送的闲置时间
        return (int) (now - lastSend);
    }

    long getSentCount() {//发送次数
        return sentCount;
    }

    long getRecvCount() {//接收次数
        return recvCount;
    }

    void updateLastHeard() {//更新最后一次监听的时间
        this.lastHeard = now;
    }

    void updateLastSend() {//更新最后一次发送的时间
        this.lastSend = now;
    }

    void updateLastSendAndHeard() {//同时更新最后一次监听和发送的时间
        this.lastSend = now;
        this.lastHeard = now;
    }

    protected void readLength() throws IOException {//读取incoming message的length
        int len = incomingBuffer.getInt();
        if (len < 0 || len >= ClientCnxn.packetLen) {//默认长度[0,4M]之间
            throw new IOException("Packet len" + len + " is out of range!");
        }
        incomingBuffer = ByteBuffer.allocate(len);//分配对应长度的空间
    }

    void readConnectResult() throws IOException {//读取connect的response
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b) + ",");
            }
            buf.append("]");
            LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                    + buf.toString());
        }
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");//反序列化,看是否是只读的
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);//sendThread完成connect时一些参数验证以及zk state更新以及事件处理
    }

主要就是各种次数,时间的设置以及获取
其次就是注意readLength和readConnectResult方法即可

子类ClientCnxnSocketNIO

类图如下

ClientCnxnSocketNIO类图

属性

    private static final Logger LOG = LoggerFactory
            .getLogger(ClientCnxnSocketNIO.class);

    private final Selector selector = Selector.open();

    private SelectionKey sockKey;

主要就是NIO的东西

方法

按照一定的顺序来讲

client连接时

org.apache.zookeeper.ClientCnxn.SendThread#run
org.apache.zookeeper.ClientCnxn.SendThread#startConnect
org.apache.zookeeper.ClientCnxnSocketNIO#connect

    @Override
    void connect(InetSocketAddress addr) throws IOException {//参数是某一个zk server的地址
        SocketChannel sock = createSock();
        try {
           registerAndConnect(sock, addr);//注册SelectionKey到zk server
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
        initialized = false;//还没有初始化,connect ok了但是还读到server的response

        /*
         * Reset incomingBuffer
         */
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

里面调用了createSock和registerAndConnect方法,如下

    /**
     * create a socket channel.
     * @return the created socket channel
     * @throws IOException
     */
    SocketChannel createSock() throws IOException {//创建SocketChannel
        SocketChannel sock;
        sock = SocketChannel.open();
        sock.configureBlocking(false);//非阻塞
        sock.socket().setSoLinger(false, -1);
        sock.socket().setTcpNoDelay(true);
        return sock;
    }

    /**
     * register with the selection and connect
     * @param sock the {@link SocketChannel} 
     * @param addr the address of remote host
     * @throws IOException
     */
    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);//注册,监听connect事件
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {//如果立即建立了连接
            sendThread.primeConnection();//client把watches和authData等数据发过去,并更新SelectionKey为读写
        }
    }

这里注意一点
registerAndConnect中如果立即connect就调用sendThread.primeConnection();
如果没有立即connect上,那么就在下面介绍的doTransport中等待SocketChannel finishConnect再调用

client 和 server的网络交互

主要函数

    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);//找到就绪的keys个数
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {//如果就绪的是connect事件,这个出现在registerAndConnect函数没有立即连接成功
                if (sc.finishConnect()) {//如果次数完成了连接
                    updateLastSendAndHeard();//更新时间
                    sendThread.primeConnection();//client把watches和authData等数据发过去,并更新SelectionKey为读写
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//如果就绪的是读或者写事件
                doIO(pendingQueue, outgoingQueue, cnxn);//利用pendingQueue和outgoingQueue进行IO
            }
        }
        if (sendThread.getZkState().isConnected()) {//如果zk的state是已连接
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以发送的packet
                    enableWrite();//允许写
                }
            }
        }
        selected.clear();//清空
    }

参数pendingQueue 以及 outgoingQueue简单介绍如下
outgoingQueue 是请求发送队列,是client存储需要被发送到server端的Packet队列
pendingQueue是已经从client发送,但是要等待server响应的packet队列
后面章节再细讲

主要调用了doIO 以及 findSendablePacket方法
doIO方法如下

    /**
     * @return true if a packet was received
     * @throws InterruptedException
     * @throws IOException
     */
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {//若读就绪
            int rc = sock.read(incomingBuffer);//读出len
            if (rc < 0) {//如果<0,表示读到末尾了,这种情况出现在连接关闭的时候
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {//如果还有数据
                incomingBuffer.flip();//切换到读模式
                if (incomingBuffer == lenBuffer) {
                    recvCount++;//接收次数+1
                    readLength();//获取len并给incomingBuffer分配对应空间
                } else if (!initialized) {//如果client和server的连接还没有初始化
                    readConnectResult();//读取connect 回复
                    enableRead();//启用读
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以发送的packet
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();//允许写,因为有要发送的packet
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;//还原incomingBuffer
                    updateLastHeard();
                    initialized = true;//client和server连接初始化完成
                } else { //如果已连接,并且已经给incomingBuffer分配了对应len的空间
                    sendThread.readResponse(incomingBuffer);//读取response
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;//还原incomingBuffer
                    updateLastHeard();
                }
            }
        }
        if (sockKey.isWritable()) {//若写就绪
            synchronized(outgoingQueue) {
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以发送的Packet

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();//如果packet还没有生成byteBuffer,那就生成byteBuffer
                    }
                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);//从待发送队列中取出该packet
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);//加入待回复的队列
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();//如果没有要发的,就禁止写
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    // On initial connection, write the complete connect request
                    // packet, but then disable further writes until after
                    // receiving a successful connection response.  If the
                    // session is expired, then the server sends the expiration
                    // response and immediately closes its end of the socket.  If
                    // the client is simultaneously writing on its end, then the
                    // TCP stack may choose to abort with RST, in which case the
                    // client would never receive the session expired event.  See
                    // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }

流程接见如下

  主要分为读或者写两个case
  读:
    没有初始化就完成初始化
    读取len再给incomingBuffer分配对应空间
    读取对应的response
  写:
    找到可以发送的Packet
    如果Packet的byteBuffer没有创建,那么就创建
    byteBuffer写入socketChannel
    把Packet从outgoingQueue中取出来,放到pendingQueue中
    相关读写的处理

主要注意,读的时候是分两次读的
第一次只读len,然后给incomingBuffer分配对应的空间
第二次再把剩下的内容读完

findSendablePacket方法如下

    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
                                      boolean clientTunneledAuthenticationInProgress) {//bool参数是表示 如果当前client和server在处理sasl的权限
        synchronized (outgoingQueue) {
            if (outgoingQueue.isEmpty()) {//如果没有要发送的
                return null;
            }
            if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
                || !clientTunneledAuthenticationInProgress) {//如果有要发送的 或者 没有在处理sasl的权限
                return outgoingQueue.getFirst();
            }

            // Since client's authentication with server is in progress,
            // send only the null-header packet queued by primeConnection().
            // This packet must be sent so that the SASL authentication process
            // can proceed, but all other packets should wait until
            // SASL authentication completes.
            ListIterator<Packet> iter = outgoingQueue.listIterator();
            while (iter.hasNext()) {
                Packet p = iter.next();
                if (p.requestHeader == null) {//如果在处理sasl的权限,那么只有requestHeader为null的Packet可以被发送
                    // We've found the priming-packet. Move it to the beginning of the queue.
                    iter.remove();
                    outgoingQueue.add(0, p);
                    return p;
                } else {
                    // Non-priming packet: defer it until later, leaving it in the queue
                    // until authentication completes.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("deferring non-priming packet: " + p +
                                "until SASL authentication completes.");
                    }
                }
            }
            // no sendable packet found.
            return null;
        }
    }

主要流程简介如下

如果没有要发送的就返回null
如果有要发送的或者client没有在处理sasl的权限,那么就拿队列第一个
如果在处理sasl,那么遍历队列,把没有requestHeader为null的放到队头,返回该packet

这个地方主要涉及到sasl验证,并不是很了解这个机制,没有深究

其他函数

    @Override
    boolean isConnected() {//这个只是说SelectionKey有没有初始化,来标示,并不是真正的Connected
        return sockKey != null;
    }

部分函数表格列举

函数 备注
void cleanup() socketChannel关闭,SelectionKey置空
void close() selector关闭
SocketAddress getRemoteSocketAddress() 获取远端地址
SocketAddress getLocalSocketAddress() 获取本地地址
synchronized void wakeupCnxn() 唤醒selector
void testableCloseSocket() 测试socket关闭
synchronized void enableWrite() 开启写
public synchronized void disableWrite() 禁止写
synchronized private void enableRead() 开启读
synchronized void enableReadWriteOnly() 仅允许读写
Selector getSelector() 获取selector
void sendPacket(Packet p) 发送packet

思考

何时调用sendThread.primeConnection();以及里面干了什么

如果瞬间连上,就直接调用
否则的话就等到sc.finishConnect()再调用

这个函数完成了一些watches和authData的传递以及允许更改SelectionKey,允许clientCnxnSocket可读写,

org.apache.zookeeper.ClientCnxnSocket#initialized意义

参数指的是zk client收到的zk server的正确response之后,才算初始化成功
不是说NIO中的connect上了就算成功

两者的区别在于NIO的SelectionKey
前者已经从connect变化到了write和read
后者仅限于connect

org.apache.zookeeper.ClientCnxnSocketNIO#doIO处理读就绪的时候,为什么分两次

第一次只读len,然后给incomingBuffer分配对应的空间
第二次再把剩下的内容读完
唯一能够想到的优点就是节省空间了

请求发送与接收 流程图

请求发送与接收

吐槽以及问题

1.方法没有注释,甚至是错的注释
如错误的方法注释 org.apache.zookeeper.ClientCnxnSocketNIO#doIO

2.ClientCnxnSocketNIO中connect以及state相关的函数太多了,有点绕

3.SelectionKey中,读写一会开一会关的目的是什么,代码看起来很麻烦
为什么不一直允许读写,单个开关弄来弄去让人疑惑,除非close
是有场景需要禁止读后者禁止写么,还是这样会提升性能?

4.org.apache.zookeeper.ClientCnxnSocketNIO#isConnected用SelectionKey是否初始化判断是否Connected
不太合理,有可能刚初始化但是还没有connect呢???

备注

sendThread在下面两节中讲到,是client完成和server通信的线程

sessionId也会在后面讲会话的时候进行讲解

pendingQueue和outingQueue之后再讲解

refer

《paxos到zk》
http://www.cnblogs.com/leesf456/p/6098255.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容