Kafka源码分析-序列3 -Producer -Java NIO

在上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层。同很多Java项目一样,Kafka client的网络层也是用的Java NIO,然后在上面做了一层封装。

下面首先看一下,在Sender和服务器之间的部分:


1.png

可以看到,Kafka client基于Java NIO封装了一个网络层,这个网络层最上层的接口是KakfaClient。其层次关系如下:


2.png

在本篇中,先详细对最底层的Java NIO进行讲述。

NIO的4大组件

Buffer与Channel

Channel: 在通常的Java网络编程中,我们知道有一对Socket/ServerSocket对象,每1个socket对象表示一个connection,ServerSocket用于服务器监听新的连接。
在NIO中,与之相对应的一对是SocketChannel/ServerSocketChannel。

下图展示了SocketChannel/ServerSocketChannel的类继承层次


3.png
public interface Channel extends Closeable {  
    public boolean isOpen();  
    public void close() throws IOException;  
}  
  
public interface ReadableByteChannel extends Channel {  
    public int read(ByteBuffer dst) throws IOException;  
}  
  
public interface WritableByteChannel extends Channel {  
    public int write(ByteBuffer src) throws IOException;  
}

从代码可以看出,一个Channel最基本的操作就是read/write,并且其传进去的必须是ByteBuffer类型,而不是普通的内存buffer。

Buffer:在NIO中,也有1套围绕Buffer的类继承层次,在此就不详述了。只需知道Buffer就是用来封装channel发送/接收的数据。

Selector

Selector的主要目的是网络事件的 loop 循环,通过调用selector.poll,不断轮询每个Channel上读写事件

SelectionKey

SelectionKey用来记录一个Channel上的事件集合,每个Channel对应一个SelectionKey。
SelectionKey也是Selector和Channel之间的关联,通过SelectionKey可以取到对应的Selector和Channel。

关于这4大组件的协作、配合,下面来详细讲述。

4种网络IO模型

epoll与IOCP

在《Unix环境高级编程》中介绍了以下4种IO模型(实际不止4种,但常用的就这4种):

阻塞IO: read/write的时候,阻塞调用

非阻塞IO: read/write,没有数据,立马返回,轮询

IO复用:read/write一次都只能监听一个socket,但对于服务器来讲,有成千上完个socket连接,如何用一个函数,可以监听所有的socket上面的读写事件呢?这就是IO复用模型,对应linux上面,就是select/poll/epoll3种技术。

异步IO:linux上没有,windows上对应的是IOCP。

Reactor模式 vs. Preactor模式

相信很多人都听说过网络IO的2种设计模式,关于这2种模式的具体阐述,可以自行google之。

在此处,只想对这2种模式做一个“最通俗的解释“:

Reactor模式:主动模式,所谓主动,是指应用程序不断去轮询,问操作系统,IO是否就绪。Linux下的select/poll/epooll就属于主动模式,需要应用程序中有个循环,一直去poll。
在这种模式下,实际的IO操作还是应用程序做的。

Proactor模式:被动模式,你把read/write全部交给操作系统,实际的IO操作由操作系统完成,完成之后,再callback你的应用程序。
Windows下的IOCP就属于这种模式,再比如C++ Boost中的Asio库,就是典型的Proactor模式。

epoll的编程模型--3个阶段

在Linux平台上,Java NIO就是基于epoll来实现的。所有基于epoll的框架,都有3个阶段:
注册事件(connect,accept,read,write), 轮询IO是否就绪,执行实际IO操作。

下面的代码展示了在linux下,用c语言epoll编程的基本框架:

//阶段1: 调用epoll_ctl(xx) 注册事件  
  
for( ; ; )  
    {  
        nfds = epoll_wait(epfd,events,20,500);     //阶段2:轮询所有的socket  
  
        for(i=0;i<nfds;++i)  //处理轮询结果  
        {  
            if(events[i].data.fd==listenfd) //accept事件就绪  
            {  
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //阶段3:执行实际的IO操作,accept  
                ev.data.fd=connfd;  
                ev.events=EPOLLIN|EPOLLET;  
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //回到阶段1:重新注册  
            }  
            else if( events[i].events&EPOLLIN )  //读就绪  
            {  
                n = read(sockfd, line, MAXLINE)) < 0    //阶段3:执行实际的io操作  
                ev.data.ptr = md;       
                ev.events=EPOLLOUT|EPOLLET;  
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到阶段1:重新注册事件  
            }  
            else if(events[i].events&EPOLLOUT) //写就绪  
            {  
                struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;      
                sockfd = md->fd;  
                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //阶段3: 执行实际的io操作  
                ev.data.fd=sockfd;  
                ev.events=EPOLLIN|EPOLLET;  
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到阶段1,重新注册事件  
            }  
            else  
            {  
                //其他的处理  
            }  
        }  
    }

同样, NIO中的Selector同样有以下3个阶段,下面把Selector和epoll的使用做个对比:


4.png

可以看到,2者只是写法不同,同样的, 都有这3个阶段。

下面的表格展示了connect, accept, read, write 这4种事件,分别在这3个阶段对应的函数:


5.png

下面看一下Kafka client中Selector的核心实现:

@Override  
public void poll(long timeout) throws IOException {  
    。。。  
    clear(); //清空各种状态  
    if (hasStagedReceives())  
        timeout = 0;  
    long startSelect = time.nanoseconds();  
    int readyKeys = select(timeout);  //轮询  
    long endSelect = time.nanoseconds();  
    currentTimeNanos = endSelect;  
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());  
  
    if (readyKeys > 0) {  
        Set<SelectionKey> keys = this.nioSelector.selectedKeys();  
        Iterator<SelectionKey> iter = keys.iterator();  
        while (iter.hasNext()) {  
            SelectionKey key = iter.next();  
            iter.remove();  
            KafkaChannel channel = channel(key);  
  
            // register all per-connection metrics at once  
            sensors.maybeRegisterConnectionMetrics(channel.id());  
            lruConnections.put(channel.id(), currentTimeNanos);  
  
            try {  
                if (key.isConnectable()) {  //有连接事件  
                    channel.finishConnect();  
                    this.connected.add(channel.id());  
                    this.sensors.connectionCreated.record();  
                }  
  
                if (channel.isConnected() && !channel.ready())   
                    channel.prepare(); //这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现  
  
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //读就绪  
                    NetworkReceive networkReceive;  
                    while ((networkReceive = channel.read()) != null)   
                        addToStagedReceives(channel, networkReceive); //实际的读动作  
                }  
  
                if (channel.ready() && key.isWritable()) {  //写就绪  
                    Send send = channel.write(); //实际的写动作  
                    if (send != null) {  
                        this.completedSends.add(send);  
                        this.sensors.recordBytesSent(channel.id(), send.size());  
                    }  
                }  
  
                /* cancel any defunct sockets */  
                if (!key.isValid()) {  
                    close(channel);  
                    this.disconnected.add(channel.id());  
                }  
            } catch (Exception e) {  
                String desc = channel.socketDescription();  
                if (e instanceof IOException)  
                    log.debug("Connection with {} disconnected", desc, e);  
                else  
                    log.warn("Unexpected error from {}; closing connection", desc, e);  
                close(channel);  
                this.disconnected.add(channel.id());  
            }  
        }  
    }  
  
    addToCompletedReceives();  
  
    long endIo = time.nanoseconds();  
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());  
    maybeCloseOldestConnection();  
}

epoll和selector在注册上的差别

从代码可以看出, Selector和epoll在代码结构上基本一样,但在事件的注册上面有区别:

epoll: 每次read/write之后,都要调用epoll_ctl重新注册

Selector: 注册一次,一直有效,一直会有事件产生,因此需要取消注册。下面来详细分析一下:

connect事件的注册

//Selector  
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {  
        if (this.channels.containsKey(id))  
            throw new IllegalStateException("There is already a connection for id " + id);  
  
        SocketChannel socketChannel = SocketChannel.open();  
        。。。  
        try {  
            socketChannel.connect(address);  
        } catch (UnresolvedAddressException e) {  
            socketChannel.close();  
            throw new IOException("Can't resolve address: " + address, e);  
        } catch (IOException e) {  
            socketChannel.close();  
            throw e;  
        }  
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);  //构造channel的时候,注册connect事件  
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);  
        key.attach(channel);  
        this.channels.put(id, channel);  
    }

connect事件的取消

//在上面的poll函数中,connect事件就绪,也就是指connect连接完成,连接简历  
 if (key.isConnectable()) {  //有连接事件  
       channel.finishConnect();   
                        ...  
     }  
  
 //PlainTransportLayer  
 public void finishConnect() throws IOException {  
        socketChannel.finishConnect();  //调用channel的finishConnect()  
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); //取消connect事件,新加read事件组册  
    }

read事件的注册

从上面也可以看出,read事件的注册和connect事件的取消,是同时进行的

read事件的取消

因为read是要一直监听远程,是否有新数据到来,所以不会取消,一直监听

write事件的注册

//Selector  
    public void send(Send send) {  
        KafkaChannel channel = channelOrFail(send.destination());  
        try {  
            channel.setSend(send);  
        } catch (CancelledKeyException e) {  
            this.failedSends.add(send.destination());  
            close(channel);  
        }  
    }  
  
//KafkaChannel  
    public void setSend(Send send) {  
        if (this.send != null)  
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");  
        this.send = send;  
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  //每调用一次Send,注册一次Write事件  
    }

Write事件的取消

//上面的poll函数里面  
                    if (channel.ready() && key.isWritable()) { //write事件就绪  
                        Send send = channel.write(); //在这个write里面,取消了write事件  
                        if (send != null) {  
                            this.completedSends.add(send);  
                            this.sensors.recordBytesSent(channel.id(), send.size());  
                        }  
                    }  
  
  
    private boolean send(Send send) throws IOException {  
        send.writeTo(transportLayer);  
        if (send.completed())  
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);  //取消write事件  
  
        return send.completed();  
    }

总结一下:
(1)“事件就绪“这个概念,对于不同事件类型,还是有点歧义的

read事件就绪:这个最好理解,就是远程有新数据到来,需要去read
write事件就绪:这个指什么呢? 其实指本地的socket缓冲区有没有满。没有满的话,应该就是一直就绪的,可写
connect事件就绪: 指connect连接完成
accept事件就绪:有新的连接进来,调用accept处理

(2)不同类型事件,处理方式是不一样的:

connect事件:注册1次,成功之后,就取消了。有且仅有1次
read事件:注册之后不取消,一直监听
write事件: 每调用一次send,注册1次,send成功,取消注册

欢迎加入QQ群:104286694

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java I...
    JackChen1024阅读 7,533评论 1 143
  • 简介 Java NIO 是由 Java 1.4 引进的异步 IO.Java NIO 由以下几个核心部分组成: Ch...
    永顺阅读 1,786评论 0 15
  • 作者: 一字马胡 转载标志 【2017-11-24】 更新日志 一、Java OIO Java OIO (Jav...
    一字马胡阅读 1,337评论 0 12
  • 这两天了解了一下关于NIO方面的知识,网上关于这一块的介绍只是介绍了一下基本用法,没有系统的解释NIO与阻塞、非阻...
    Ruheng阅读 7,119评论 5 48
  • 古凤凰台,远近闻名,始建东晋,高耸入云,绿荫环抱,湖光映衬,有凤来栖,方得此名。物华天宝,凤糯港虾,素有美称。人杰...
    凤港渔人阅读 944评论 0 0