Netty之Channel

分析的netty版本4.1.17

一、Channel

Netty的抽象了一个顶层接口Channel相比原来NIO提供的Channel有更多的功能,当然也是相对复杂的。

1. Channel的功能

1.1 网络的IO操作

网络的IO操作包含read,write,flush,close,disconnect,connect,bind,config,localAddress,remoteAdress等IO的功能操作。

1.2 其他功能

  • eventLoop():这个比较重要,channel是需要注册到eventLoop多路复用器上的,通过这个方法可以获取当前channel所注册的eventLoop;当然eventLoop除了处理IO操作还可以执行定时任务和自定义的NIOTask。
  • metadata():在Netty中每一个channel都对应一个物理连接,这个元数据表示的就是每一个连接对应的TCP参数配置,通过这个方法可以获取相对应的配置信息。

  • parent():对于服务端Channel来讲,它的父channel是空,而客户端的channel,它的父channel就是创建它的ServerSocketChannel.

  • id():这个返回的是ChannelId对象,它是由mac地址,进程id,自增序列,系统时间数,随机数等构成的。

2.Channel结构和源码

2.1NioServerSocketChannel继承结构

YqoDb9.png

2.2 NioSocketChannel继承结构

YqoBDJ.png

简单的看看上面两个图的,做下对比:

两个相同之处很明显AbstractChannel---->AbstractNioChannel及DefaultAttributeMap

主要不同点是 NioSocketChannel继承的是AbstractNioByteChannel接口是SockerChannel;NioServerSocketChannle继承是的AbstractNioMessageChannle以及实现接口ServerSocketChannel

2.3 channel的生命周期

Netty 有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler 的各个方法。下面是Channel 生命周期中四个不同的状态:

状态描述

  • channelUnregistered() :Channel已创建,还未注册到一个EventLoop上

  • channelRegistered(): Channel已经注册到一个EventLoop上

  • channelActive() :Channel是活跃状态(连接到某个远端),可以收发数据

  • channelInactive(): Channel未连接到远端

一个Channel 正常的生命周期如下图所示。随着状态发生变化相应的事件产生。这些事件被转发到ChannelPipeline中的ChannelHandler 来触发相应的操作。

image

2.4 相关源码

1)AbstractChannel

列出了主要的成员变量,和主要网络IO操作的实现

   重点看了下网络读写操作,网络I/O操作时讲到它会触发ChannelPipeline中对应的事件方法。Netty 基于事件驱动,我们也可以理解为当Chnanel进行I/O操作时会产生对应的I/O事件,然后驱动事件在ChannelPipeline中传播,由对应的ChannelHandler对事件进行拦截和处理,不关心的事件可以直接忽略。采用事件驱动的方式可以非常轻松地通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更高,但是功能却基本等价。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");

    private final Channel parent;     //父channel
    private final ChannelId id;        //channel 的唯一id
    private final Unsafe unsafe;     //unsafe底层io操作应用
    private final DefaultChannelPipeline pipeline;  //执行channel链
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;  
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;   //channel所注册的eventLoop
    private volatile boolean registered;       //变量是否完成注册
    private boolean closeInitiated;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();    
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
  ...
    //主要的IO操作,发先都是通过pipeline事件传播实现
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }


   @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return pipeline.writeAndFlush(msg, promise);
    }

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

2) AbstractNioChannel

会使用到nio的相关类,Selector做相关操作位的使用

public abstract class AbstractNioChannel extends AbstractChannel {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractNioChannel.class, "doClose()");

    private final SelectableChannel ch; //Socketchannle和ServerSocketChannel的公共操作类,用来设置SelectableChannel相关参数和IO操作
    protected final int readInterestOp;    //代表JDK的SelectionKey的OP_READ
    volatile SelectionKey selectionKey;  //JDK的selectionKey
    boolean readPending;
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise; 
    private ScheduledFuture<?> connectTimeoutFuture;   //连接超时定时器future
    private SocketAddress requestedRemoteAddress;   //请求通信地址

核心API
A:doRegister() :定义一个布尔类型的局部变量selected来标识注册操作是否成功,调用nio的AbstractSelectableChannel的register方法,将当前的Channel注册到EventLoop的多路复用器selector上。

  //核心操作,注册操作

 //1) 如果当前注册返回的selectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。
// 2) 如果是第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true, 说明之前失效的selectionKey已经被删除掉。继续发起下一次注册操作,如果成功则退出,
//3) 如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionKey,按照JDK的API说明,这种意外不应该发生。如果发生这种问题,则说明可能NIO的相关类库存在不可恢复的BUG,直接抛出CancelledKeyException异常到上层进行统一处理。

  
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

}

channel注册时候通过操作位表示对某个事件感兴趣:

//SelectionKey
public static final int OP_READ = 1 << 0;   //读操作位
public static final int OP_WRITE = 1 << 2;  //写操作位
public static final int OP_CONNECT = 1 << 3;  //客户端连接操作位
public static final int OP_ACCEPT = 1 << 4;  //服务端接受连接操作位

//如果注册的操作位为0表示只是完成注册功能,说明对任何事件都不感兴趣

注册时可以指定附件,后续获取到事件通知时可以从SelectionKey中获取到附件,上面是将当前AbstractNioSocket实现子类自身当做附件,如果注册成功则可以通过返回的SelectionKey从多路复用器中获取channel对象。

B:doBeginRead() 读之前的准备

  @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;   //key无效的话直接返回
        }

        readPending = true;  //表示读pending中

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {  //表示当前没有读操作位
            selectionKey.interestOps(interestOps | readInterestOp);  //设置读操作位
        }

    
    }

  //SelectionKey中定义的是否可读操作
  public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

3) AbstractNioByteChannel

先看成员变量

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';

    private Runnable flushTask;  //flush工作的task任务,主要是继续写半包消息

}

接下来看核心API doWrite操作

配置中设置循环次数是避免半包中数据量过大,IO线程一直尝试写操作,此时IO线程无法处理其他IO操作或者定时任务,比如新的消息或者定时任务,如果网络IO慢或者对方读取慢等造成IO线程假死的状态

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1; //        写自选次数

        boolean setOpWrite = false;  //写操作位为0
        for (;;) {
            Object msg = in.current();
            if (msg == null) {  //从环形数组ChannelOutboundBuffer弹出一条消息,如果为null,表示消息已经发送完成,
                // Wrote all messages.
                clearOpWrite();  //清除写标志位,退出循环
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) { //如果可读字节为0,则丢弃该消息,循环处理其他消息
                    in.remove();
                    continue;
                }

                boolean done = false;    //消息是否全部发送完毕表示
                long flushedAmount = 0;  //发送的字节数量
                if (writeSpinCount == -1) {
                    //如果为-1的时候从配置中获取写循环次数
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);  //由子类实现写
                    if (localFlushedAmount == 0) {  //这里表示本次发送字节为0,发送TCP缓冲区满了,所以此时为了避免空循环一直发送,这里就将半包写表示设置为true并退出循环
                        setOpWrite = true;
                        break;
                    }
                    //发送成功就对发送的字节计数
                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) { //如果没有可读字节,表示已经发送完毕
                        done = true; //表示发送完成,并退出循环
                        break;
                    }
                }
                //通知promise当前写的进度
                in.progress(flushedAmount); 

                if (done) {  //如果发送完成,移除缓冲的数据
                    in.remove();
                } else {
                    如果没有完成会调用incompleteWrite方法
                    // Break the loop and so incompleteWrite(...) is called.
                    break;
                }
            } else if (msg instanceof FileRegion) {  //这个是文件传输和上面类似
                FileRegion region = (FileRegion) msg;
                boolean done = region.transferred() >= region.count();

                if (!done) {
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }

                    for (int i = writeSpinCount - 1; i >= 0; i--) {
                        long localFlushedAmount = doWriteFileRegion(region);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }

                        flushedAmount += localFlushedAmount;
                        if (region.transferred() >= region.count()) {
                            done = true;
                            break;
                        }
                    }

                    in.progress(flushedAmount);
                }

                if (done) {
                    in.remove();
                } else {
                    // Break the loop and so incompleteWrite(...) is called.
                    break;
                }
            } else {
                // Should not reach here.
                throw new Error();
            }
        }
        //如果没有完成写看看需要做的事情
        incompleteWrite(setOpWrite);
    }

//未完成写操作,看看操作
  protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {  //如果当前的写操作位true,那么当前多路复用器继续轮询处理
            setOpWrite();
        } else {  //否则重新新建一个task任务,让eventLoop后面点执行flush操作,这样其他任务才能够执行
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }

4) AbstractNioMessageChannel

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    boolean inputShutdown;  //只有一个成员变量,表示是否数据读取完毕

}

主要实现方法是doWrite

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            } catch (Exception e) {
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throw e;
                }
            }
        }
    }

通过代码分析我们发现,AbstractNioMessageChannel 和AbstractNioByteChannel的消息发送实现比较相似,不同之处在于:一个发送的是ByteBuf或者FileRegion,它们可以直接被发送;另一个发送的则是POJO对象。

5) NioServerSocketChannel

先看成员变量

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);  //有channel的元数据
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();  //打开ServerSocketChannel
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;  //用于配置serversocketchannel的tcp相关参数

再看看对应的接口实现操作:

    @Override
    public boolean isActive() {
        return javaChannel().socket().isBound(); //判断端口是否属于绑定状态S
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

    @Override
    protected SocketAddress localAddress0() {
        return SocketUtils.localSocketAddress(javaChannel().socket());
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());  
        } else {
            //绑定端口以及最大接受的客户端数量
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

看重点的API接口
doReadMessage():下面很明显通过Nio的接受客户端连接并新建一个NioSocketChannel并封装父类和nio的SocketChannel放到buf中,返回1表示服务端接受成功

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }

剩下其他的一些connect,remoteAddress0等是serverSocket不支持的所以调用直接抛异常。

6) NioSocketChannel

这个类相对比较重要,通信主要是它实现的。
先看成员变量

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

 private final SocketChannelConfig config;  //这个是socketchannel配置信息

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openSocketChannel();  //open一个soketChannel
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

   

A: connect操作

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);  //先看本地地址是否为null,不为空直接绑定
        }

        boolean success = false;
        try {
            //连接远程地址,
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) { //  连接没有应答,再次 注册连接连接标识位
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected; //返回连接失败
        } finally {   //如果服务端拒绝或者REST抛出连接异常,则直接关闭连接
            if (!success) {
                doClose();
            }
        }
    }

B:doWrite 看写标识

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {  //flushed字节为0直接清除可写标识位,标识没有可写的。
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;   //已经写出的字节数
            boolean done = false;   //是否写完
            boolean setOpWrite = false;  //写标识

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:  //标识没有可写
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                  //默认的写方法
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }

这个是AbstractNioByteChannel的写类似。

C:读写
具体的读写操作还是如下,还是比较简单的。

   @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

3.Unsafe

Unsafe就是channel的辅助接口,我们实际的IO操作最后还是交给Unsafe操作,Unsafe接口的定义就是放在Channel中的;具体如下:


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

  .....
    /**
     * Returns an <em>internal-use-only</em> object that provides unsafe operations.
     */
    Unsafe unsafe();

    interface Unsafe {

        /**
         * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
         * receiving data.
         */
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        /**
         * Return the {@link SocketAddress} to which is bound local or
         * {@code null} if none.
         */
        SocketAddress localAddress();

        /**
         * Return the {@link SocketAddress} to which is bound remote or
         * {@code null} if none is bound yet.
         */
        SocketAddress remoteAddress();

        /**
         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

        /**
         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once its done.
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);

        /**
         * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
         * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
         * pass {@code null} to it.
         *
         * The {@link ChannelPromise} will get notified once the connect operation was complete.
         */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

        /**
         * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void disconnect(ChannelPromise promise);

        /**
         * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void close(ChannelPromise promise);

        /**
         * Closes the {@link Channel} immediately without firing any events.  Probably only useful
         * when registration attempt failed.
         */
        void closeForcibly();

        /**
         * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
         * {@link ChannelPromise} once the operation was complete.
         */
        void deregister(ChannelPromise promise);

        /**
         * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
         * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
         */
        void beginRead();

        /**
         * Schedules a write operation.
         */
        void write(Object msg, ChannelPromise promise);

        /**
         * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
         */
        void flush();

        /**
         * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
         * It will never be notified of a success or error and so is only a placeholder for operations
         * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
         */
        ChannelPromise voidPromise();

        /**
         * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
         */
        ChannelOutboundBuffer outboundBuffer();
    }
}

总结下:
netty自定义了channel接口,通过组合的jdk的channel实现IO操作操作功能;当然channel需要注册到eventLoop的多路复用器上。一个channel对应一条实际的物理连接;这里主要详解了NioServersocketChannel和NioSocketChannel。下一章节我们看看EventLoop的实现细节

参考《Netty权威指南》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343