Netty源码笔记

Netty版本4.0.29.Final,以构造客户端连接服务端的角度来追踪源码

一 创建Netty事件循环组

NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();

NioEventLoopGroup的构造器中会调用父类MultithreadEventLoopGroup的构造器

  1. SelectorProvider.provider()返回运行JVM的操作系统提供的provider
  2. 如果没有指定线程数量,默认为两倍核数
    // NioEventLoopGroup
    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, null);
    }
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        super(nThreads, threadFactory, selectorProvider);
    }

    // MultithreadEventLoopGroup
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

在父类MultithreadEventExecutorGroup的构造器中

  1. 创建默认的线程工厂,后续用来创建线程
  2. 创建一个SingleThreadEventExecutor数组,数组大小为两倍核数,保存在属性children上
  3. 创建选择器保存在属性chooser上
  4. 在newChild方法中创建EventExecutor填充到数组上
  5. newChild是父类MultithreadEventExecutorGroup提供的一个抽象方法,由于当前是在创建NioEventLoopGroup对象,具体实现还是由NioEventLoopGroup完成。
  6. 可以看到NioEventLoopGroup中完成了NioEventLoop的创建
    // MultithreadEventExecutorGroup
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

    // NioEventLoopGroup
    @Override
    protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
    }

二 创建Netty事件循环

  1. 将一开始创建的provider保存在属性provider上
  2. openSelector中返回操作系统提供的selector(多路复用器)
  3. 如果未禁用优化(默认未禁用),使用反射将操作系统返回的selector中的属性selectedKeys和publicSelectedKeys改为可写,并用Netty的SelectedSelectionKeySet代替
  4. 最后再看父类SingleThreadEventExecutor构造器
    // NioEventLoop
    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }

        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }
  1. 事件循环组保存在属性parent上。对于客户端来说,这里的parent还是开始创建的NioEventLoopGroup(服务端可以有两个具有父子关系的事件循环组)
  2. 使用线程工厂创建线程,封装了匿名任务(后续会分析该任务),保存在属性thread上(未启动)
  3. 使用NioEventLoop的newTaskQueue创建一个无锁并发的单消费者多生产者队列,保存在属性taskQueue上
    // SingleThreadEventExecutor
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }

        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;

        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                // 暂时不关注任务内逻辑 省略
            }
        });

        taskQueue = newTaskQueue();
    }

    // NioEventLoop
    @Override
    protected Queue<Runnable> newTaskQueue() {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue();
    }

    // PlatformDependent
    public static <T> Queue<T> newMpscQueue() {
        return new MpscLinkedQueue<T>();
    }

上面已经完成了NioEventLoop的创建,并保存在NioEventLoopGroup的数组属性上。而关于NioEventLoop在具备线程池能力时,在何时启动已经创建的线程呢?在SingleThreadEventExecutor::execute中可以找到答案

  1. 当任务被提交到NioEventLoop以后,首先判断提交任务的线程与thread属性保存的线程是否是同一个
  2. 如果不是同一个,startThread中根据维护的STATE_UPDATER来判断当前NioEventLoop是否已经启动过,如果没有,使用CAS更新该NioEventLoop状态为已经启动
  3. 然后启动该线程,后续如果再执行该方法,由于线程已经启动,方法内什么都不会做
  4. 再回到execute中,addTask将任务生产到之前创建的队列中
    // SingleThreadEventExecutor
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    // AbstractEventExecutor
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    // SingleThreadEventExecutor
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
    // SingleThreadEventExecutor
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                schedule(new ScheduledFutureTask<Void>(
                        this, Executors.<Void>callable(new PurgeTask(), null),
                        ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                thread.start();
            }
        }
    }
    // SingleThreadEventExecutor
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (isShutdown()) {
            reject();
        }
        taskQueue.add(task);
    }

而关于NioEventLoop内线程启动后的逻辑,可以在创建该线程时看到有向线程提交一个任务。根据任务内SingleThreadEventExecutor.this.run()可以定位到创建线程提交任务的NioEventLoop::run

        // SingleThreadEventExecutor
        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });

NioEventLoop::run是Netty的核心所在。它是NioEventLoop的线程执行的唯一任务。方法内无限循环,阻塞等待IO事件或队列中的任务

  1. 判断NioEventLoop的队列中是否有任务
  2. 如果有,执行一次selectNow,这会导致多路复用器上准备就绪的事件被分配到selectedKeys上(jdk nio基础,操作系统介入)
  3. 如果没有,执行select(oldWakenUp);,阻塞oldWakenUp时间后,开始下一个循环(阻塞期间等待多路复用器上准备就绪的事件被分配到selectedKeys上)
  4. 根据设置的IO事件的执行比例(默认50%),计算出执行队列任务的最长执行时间(先记录IO事件执行的总耗时,然后根据比例,计算出执行队列任务的最长执行时间)
    // NioEventLoop
    @Override
    protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }
    }
执行IO事件
  1. 在有IO事件就绪时,处理for循环本次循环取出的这些事件
  2. 这里有看到unsafe类的介入,对于客户端而言,可能存在的IO事件有建立连接的应答,或者服务端主动发送消息给客户端
  3. 交由unsafe来处理的对应方法分别为unsafe::finishConnect和unsafe::read
  4. 对unsafe的分析将在后面完成,这里暂时不展开
    // NioEventLoop
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                    i++;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
执行队列任务

关于队列中的任务从哪里来?一是源码内部,启动时会直接提交任务到队列中;二是可以直接取出channel中的NioEventLoop向其提交任务;三是使用Channel写数据时,都是以任务的形式提交到队列中。与Channel绑定的NioEventLoop循环时会消费提交到队列中任务并执行,后续在分析unsafe时会一并提及。

  1. IO事件执行完成后,回到NioEventLoop::run中,在runAllTasks中开始执行队列中的任务
  2. pollTask从队列中取出一个任务,接着在for循环中执行该任务,并重复取出任务、执行任务的过程
  3. 退出for循环的条件有两个,一是队列中没有需要执行的任务,二是到了执行定时任务的时间
    // NioEventLoop
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

三 客户端Channel类型

客户端Bootstrap配置引导时,需要指定Channel类型,后续会使用反射创建该Channel类型实例。

  1. 假设以Channel类型为NioSocketChannel来跟踪源码
  2. 一系列构造器之间的调用,将JDK原生的SocketChannel作为属性保存在父类AbstractChannel的parent属性上
  3. 创建NioSocketChannelUnsafe实例保存在父类AbstractChannel属性unsafe上
  4. 创建管道DefaultChannelPipeline保存在父类AbstractChannel属性pipeline上
  5. 管道DefaultChannelPipeline中默认拥有head和tail两个ChannelHandler,构成只有头尾两节点的双向链表。
    // NioSocketChannel
    public NioSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    // AbstractNioByteChannel
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
    // AbstractNioChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    // AbstractChannel
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
    // NioSocketChannel
    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }
    // DefaultChannelPipeline
    public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

四 客户端建立连接

bootstrap.connect()

客户端完成NioEventLoopGroup和Bootstrap的创建及对Bootstrap进行相关的设置后,使用Bootstrap尝试与服务端建立连接。在同步与异步之间,推荐使用异步来监听连接事件的结果。

  1. 一些必须的参数校验
  2. 由于是与服务器建立连接,localAddress值为null
  3. doConnect中先调用initAndRegister异步创建通道及注册
  4. 无论是使用isDone还是使用监听器,都是在通道创建完成后,使用doConnect0进行连接
    // Bootstrap
    public ChannelFuture connect() {
        validate();
        SocketAddress remoteAddress = this.remoteAddress;
        if (remoteAddress == null) {
            throw new IllegalStateException("remoteAddress not set");
        }

        return doConnect(remoteAddress, localAddress());
    }

    private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }

AbstractBootstrap是Bootstrap的父类,initAndRegister中完成了通道的创建、初始化以及注册

  1. 使用Bootstrap中默认的BootstrapChannelFactory,根据设置的channel,使用反射创建channel实例
  2. 假设使用上述NioSocketChannel,当然也可以是别的channel类型
  3. init负责对创建的channel初始化
  4. 初始化完成后异步完成注册
    // AbstractBootstrap
    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
Channel初始化
  1. init方法还是交由子类Bootstrap实现
  2. 取出channel上的管道,将客户端自定义的handler添加进去,底层其实是对双向链表的变更,如DefaultChannelPipeline::addLast0所示,这时会形成head、自定义ChannelInitializer、tail三个节点
  3. 对channel的option和attr进行设置,以支持将自定义参数配置到channel上
    // Bootstrap
    @Override
    @SuppressWarnings("unchecked")
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(handler());

        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                try {
                    if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + channel, t);
                }
            }
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
    // DefaultChannelPipeline
    private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
        checkMultiplicity(newCtx);

        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;

        name2ctx.put(name, newCtx);

        callHandlerAdded(newCtx);
    }
Channel注册

group()取到开始配置的NioEventLoopGroup,register在父类MultithreadEventLoopGroup中

  1. next()内借助之前生成的chooser的next()方法,选出MultithreadEventExecutorGroup中child数组上的一个NioEventLoop
  2. 接着变成NioEventLoop::register方法,该方法来自父类SingleThreadEventLoop
  3. 在SingleThreadEventLoop中再转换为channel上unsafe::register,关于unsafe暂时先不展开介绍。
    // MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    // MultithreadEventExecutorGroup
    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    // GenericEventExecutorChooser
    private final class GenericEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    }
    // SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) {
        return register(channel, new DefaultChannelPromise(channel, this));
    }
    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }

        channel.unsafe().register(this, promise);
        return promise;
    }
Channel管道初始化

虽说暂时不引入unsafe类逻辑,但在unsafe::register中,通道注册完成后会调用管道的fireChannelRegistered方法,进而执行自定义的ChannelInitializer,最终形成完整的管道。
目前管道上链表有三个节点:head、自定义ChannelInitializer、tail(规定自头向尾的流向为In,自尾向头的为Out)
它们的类型都是AbstractChannelHandlerContext,每个Context上都持有对应的ChannelHandler

  1. 首先head执行父类方法fireChannelRegistered,该方法找到以In流向的下一个AbstractChannelHandlerContext
  2. 然后在invokeChannelRegistered中找到Context持有的Handler,执行Handler::channelRegistered
  3. 自定义的ChannelInitializer属于ChannelInboundHandler的子类,流向为In。其channelRegistered方法执行时,调用被重写的initChannel方法
  4. initChannel方法内一般定义的是业务自身往管道添加Handler的逻辑ChannelPipeline::addLast
  5. addLast之前自定义ChannelInitializer已经有分析过,目的是将ChannelHandler封装为Context添加到管道上,并根据实现的接口标记节点流向,最终形成完整的双向链表
  6. 最后回到ChannelInitializer::channelRegistered中,将自定义的ChannelInitializer移出,因为它只需要在channel初始化时发挥作用,不需要存在于实际的管道中
    // DefaultChannelPipeline
    @Override
    public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }
    // AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        final AbstractChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
        return this;
    }

    private void invokeChannelRegistered() {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

    // ChannelInitializer
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            pipeline.remove(this);
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
Channel连接

至此,Channel的初始化及注册已经完成。回到Bootstrap::doConnect中,在Channel注册这个异步任务完成后,开始真正的连接服务端

  1. 获取到Channel绑定的NioEventLoop,提交任务到线程池
  2. 任务被触发时,使用channel进行连接
  3. 使用channel上的管道pipeline进行连接,从tailf开始,以Out的流向,找到下一个Out节点,执行其invokeConnect
  4. 最终流到head节点,head内继续使用unsafe与服务端进行连接的建立
    // Bootstrap类
    private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, promise);
                    } else {
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    // AbstractChannel
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }
     // DefaultChannelPipeline
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }
    // AbstractChannelHandlerContext
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return connect(remoteAddress, null, promise);
    }

    @Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (!validatePromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }

        return promise;
    }

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    // HeadContext
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

五 客户端发送数据

channel.write("test");

channel连接建立完成后,可以使用channel写数据

  1. 使用channel写数据时,从channel开始,经过管道,找到tail开始执行写数据逻辑
  2. tail作为双向链表的尾部,使用自尾向头的Out流向,找到下一个Context,直到最终流到head节点
  3. head节点转化为unsafe::write,在后面分析
    // AbstractChannel
    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    // DefaultChannelPipeline
    @Override
    public ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    // AbstractChannelHandlerContext
    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
        write(msg, false, promise);

        return promise;
    }
    private void write(Object msg, boolean flush, ChannelPromise promise) {

        AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeWrite(msg, promise);
            if (flush) {
                next.invokeFlush();
            }
        } else {
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                // Check for null as it may be set to null if the channel is closed already
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size);
                }
            }
            Runnable task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, size, promise);
            }  else {
                task = WriteTask.newInstance(next, msg, size, promise);
            }
            safeExecute(executor, task, promise, msg);
        }
    }
    private void invokeWrite(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    // HeadContext
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

写数据不会直接发送数据到服务端,而是会缓存起来,直到调用flush才会完成数据的最终发送。flush逻辑与写数据一样,channel到pipeline,tail到head。最终交由unsafe类来完成

channel.flush();
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

六 总结

以客户端建立连接发送数据流程为例(服务端大部分逻辑相似),总结Netty的工作流程:

  1. 声明NioEventLoopGroup(事件循环组),内部会创建两倍核数大小的数组,并为数组上的每一个位置分配NioEventLoop(事件循环),NioEventLoop内包含一个未启动线程和一个任务队列
  2. 声明Bootstrap(引导),指定事件循环组、远程地址、option、attr、channel类型和自定义的ChannelInitializer
  3. 使用Bootstrap发起建立连接请求
  • 使用反射根据指定的channel类型创建channel实例
  • 对channel实例进行初始化,配置自定义ChannelInitializer到channel管道中,设置option和attr
  • 从NioEventLoopGroup维护的数组中选出NioEventLoop,借助NioEventLoop开始channel的注册逻辑
  • 这个过程会激活NioEventLoop内创建的线程。该线程进而启动一个无限for循环,开始接收多路复用器上就绪的IO事件,同时执行NioEventLoop内队列上的任务
  • 接着channel会与该NioEventLoop绑定,并将channel注册到多路复用器上
  • channel注册完成后,执行为channel指定的ChannelInitializer,最终在channel上形成完整的管道配置
  • 再设置channel为readPending的状态
  • 待channel准备就绪后,使用channel建立与服务端的连接,发起建立连接的请求,由NioEventLoop完成
  • 处理服务端返回的建立请求应答,IO事件由NioEventLoop完成
  1. Bootstrap在连接建立完成后返回channel实例,使用channel实例完成数据的发送。同时与channel绑定的NioEventLoop内的线程,监听并处理多路复用器上准备就绪的IO事件(接收服务端数据应答或服务端主动发起的数据)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342