Netty中的NioServerSocketChannel类实例化

看到此文章可能有读者会感到疑惑,为什么NioServerSocketChannel的实例化也要写一篇文章呢?在NettyNioServerSocketChannel可谓是用的比较频繁的一个类了,它在初始化时进行了很多操作,很多人是不了解或是不甚了解的,当然,笔者也是。所以今天我们就深入了解一下它到底做了什么?

NioServerSocketChannel类继承结构
ServerSocketChannelImpl类继承结构

上面两个图中的Channel不是同一个类,上面的图是io.netty.channel.Channel,Netty封装的Channel类,下面的图是java.nio.channels.Channel,jdk中java.nio包下的Channel类。

1. 贴NioServerSocketChannel源码:
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

注意:为了减少文章的篇幅,这里我只是贴出部分源码,且删除了暂时用不到的代码和注释
我们以无参构造方法为例,在该方法中,执行了newSocket(DEFAULT_SELECTOR_PROVIDER),这个方法主要是返回一个ServerSocketChannelImpl类的实例,然后将其转为ServerSocketChannel类型,这里主要是java Nio的应用,就不细讲了。
然后是调用构造方法NioServerSocketChannel(SelectorProvider provider),在该构造方法中首先是调用了父类AbstractNioMessageChannel的构造方法,然后是执行config = new NioServerSocketChannelConfig(this, javaChannel().socket());
我们先就进入他的父类看看吧

2. 贴AbstractNioMessageChannel源码
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

AbstractNioMessageChannel的这个构造器中是直接调用其父类AbstractNioChannel的构造方法,那我们就跟下去看看源码吧

3. 贴AbstractNioChannel源码
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

在这个构造方法中,我们终于看到了大段的代码了,不过第一步它又执行了父类的构造方法,这里我们先看看下面都做了什么,首先传下来的所以参数parentnullchServerSocketChannel的一个实例,readInterestOpSelectionKey.OP_ACCEPT
AbstractNioChannel构造方法中,(重点,后面要用到this.ch = ch;保存传过来的ServerSocketChannel的实例,然后是this.readInterestOp = readInterestOp;,保存传过来的感兴趣的操作,即SelectionKey.OP_ACCEPT
然后是调用ch.configureBlocking(false);channel设置为非阻塞的。这里我们就可以知道是用NioServerSocketChannel,在实例化是它就会创建对应的channel,并且设置为非阻塞模式。

好了,我们接下来看看super(parent)做了什么事情,调用了AbstractChannel的构造方法,那我看看它的源码吧

4. 贴AbstractChannel源码
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    @Override
    public final ChannelId id() {
        return id;
    }
    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }

传过来的parentnull,所以我们直接看id = newId(),这里设置了channel的id,所以只要NioServerSocketChannel实例化,就会设置好idunsafepipeline的值,这里要注意一点就是id的类型,并不是String或是基本数据类型,而是 ChannelId类型,newId()方法最终是创建了一个DefaultChannelId类型的实例,这里就不深入了,DefaultChannelId是接口ChannelId的一个默认实现,源码不长,想深入了解channelid生成规则的读者可以看看,DefaultChannelId的全类名是io.netty.channel.DefaultChannelId

到这里,就已经看完了super(null, channel, SelectionKey.OP_ACCEPT)所调用的代码所做的操作了,让我们回到这段代码config = new NioServerSocketChannelConfig(this, javaChannel().socket())看看做了什么?首先看看javaChannel()

    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

调用了父类AbstractNioChanneljavaChannel(),那我们看看的这个方法实现吧

    protected SelectableChannel javaChannel() {
        return ch;
    }

直接返回chch是什么呢?就是刚刚说重点的地方,该方法返回我们刚刚保存好的ServerSocketChannel实例,这实例是用的子类ServerSocketChannelImpl创建的,所以抽象方法socket()调用的是ServerSocketChannelImpl中的socket(),那我们看看ServerSocketChannelImpl的源码

5. ServerSocketChannelImpl的源码

private static NativeDispatcher nd;
    private final FileDescriptor fd;
    private int fdVal;
    private final Object stateLock = new Object();
    private int state = -1;
    ServerSocket socket;

    ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.fd = Net.serverSocket(true);
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
    }
     public ServerSocket socket() {
        synchronized(this.stateLock) {
            if (this.socket == null) {
                this.socket = ServerSocketAdaptor.create(this);
            }
            return this.socket;
        }
    }

在构造方法中,可以看到并没有初始化socket的值,所以socketnull,所以调用socket()返回的是ServerSocketAdaptor.create(this)返回的结果
为了验证分析的正确性,我们做一下Debug

Debug结果

好结论是没有问题的,那我们向下走,看看ServerSocketAdaptor.create(this)源码

    public static ServerSocket create(ServerSocketChannelImpl var0) {
        try {
            return new ServerSocketAdaptor(var0);
        } catch (IOException var2) {
            throw new Error(var2);
        }
    }

ServerSocketAdaptor源码

    private ServerSocketAdaptor(ServerSocketChannelImpl var1) throws IOException {
        this.ssc = var1;
    }

这里只是将ServerSocketChannelImpl实例进行保存,保存在ssc字段中,并没有做其他的操作,到这里我们就获得了一个ServerSocketAdaptor的实例,并且将其保存在ServerSocketChannelImpl实例的socket字段中,两个实例形成了相互引用。

从新回到new NioServerSocketChannelConfig(this, javaChannel().socket())这段代码,现在我们直接进入NioServerSocketChannelConfig

NioServerSocketChannelConfig类的继承结构

6. NioServerSocketChannelConfig的源码

private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
        private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
            super(channel, javaSocket);
        }

        @Override
        protected void autoReadCleared() {
            clearReadPending();
        }

        @Override
        public <T> boolean setOption(ChannelOption<T> option, T value) {
            if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
                return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
            }
            return super.setOption(option, value);
        }

        @Override
        public <T> T getOption(ChannelOption<T> option) {
            if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
                return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
            }
            return super.getOption(option);
        }

        @SuppressWarnings("unchecked")
        @Override
        public Map<ChannelOption<?>, Object> getOptions() {
            if (PlatformDependent.javaVersion() >= 7) {
                return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
            }
            return super.getOptions();
        }

        private ServerSocketChannel jdkChannel() {
            return ((NioServerSocketChannel) channel).javaChannel();
        }
    }

构造方法直接调用了父类DefaultServerSocketChannelConfig的构造方法,那我们就深入吧

    public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;
    }

DefaultServerSocketChannelConfig还是调用了父类DefaultChannelConfig的构造方法,并且保存传入的javaSocket参数

7. DefaultChannelConfig的源码

    protected final Channel channel;
    private volatile RecvByteBufAllocator rcvBufAllocator;

    public DefaultChannelConfig(Channel channel) {
        this(channel, new AdaptiveRecvByteBufAllocator());
    }

    protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
        setRecvByteBufAllocator(allocator, channel.metadata());
        this.channel = channel;
    }
    
    private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
        if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
            ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
        } else if (allocator == null) {
            throw new NullPointerException("allocator");
        }
        setRecvByteBufAllocator(allocator);
    }
    @Override
    public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        rcvBufAllocator = checkNotNull(allocator, "allocator");
        return this;
    }

在构造方法中创建实例AdaptiveRecvByteBufAllocator,这类其实是比较重要的,从类的名字上看,它的意思是适应接收缓存分配器(翻译可能不太准确),简单来说就是一个可以根据接收数据大小来判断开辟多大的空间存储数据。它是Netty提供的一个可以估计接收数据量,自动扩容和减容的一个类,接下来我们看看它的源码

package io.netty.channel;

import java.util.ArrayList;
import java.util.List;

import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * The {@link RecvByteBufAllocator} that automatically increases and
 * decreases the predicted buffer size on feed back.
 * It gradually increases the expected number of readable bytes if the previous
 * read fully filled the allocated buffer.  It gradually decreases the expected
 * number of readable bytes if the read operation was not able to fill a certain
 * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
 * returning the same prediction.
 */
/**
*  {@link RecvByteBufAllocator}自动增加和
*  减少反馈时预测的缓冲区大小。 
*  如果先前的read完全填充了分配的缓冲区,它会逐渐增加预期的可读字节数。如果读取操作不能连续两次填充分配的缓冲区的某个
*  量,则逐渐减少预期的*可读字节数。否则,它会保持
*  返回相同的预测。
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */
    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public void lastBytesRead(int bytes) {
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            if (bytes == attemptedBytesRead()) {
                record(bytes);
            }
            super.lastBytesRead(bytes);
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }

    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    /**
     * Creates a new predictor with the default parameters.  With the default
     * parameters, the expected buffer size starts from {@code 1024}, does not
     * go down below {@code 64}, and does not go up above {@code 65536}.
     */
    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    /**
     * Creates a new predictor with the specified parameters.
     *
     * @param minimum  the inclusive lower bound of the expected buffer size
     * @param initial  the initial buffer size when no feed back was received
     * @param maximum  the inclusive upper bound of the expected buffer size
     */
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }

    @SuppressWarnings("deprecation")
    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
}

好了,具体细节还需要读者自行研读源码了,Netty的源码还是值得一看的,但是也不要陷入源码,把握总体方向Reactor模式。谢谢您的阅读,如有不足,欢迎来访

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容