NIO
public class NIOServer {
/*标识数字*/
private int flag = 0;
/*缓冲区大小*/
private int BLOCK = 4096;
/*接受数据缓冲区*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
private Selector selector;
public NIOServer(int port) throws IOException {
// 打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服务器配置为非阻塞
serverSocketChannel.configureBlocking(false);
// 检索与此通道关联的服务器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 进行服务的绑定
serverSocket.bind(new InetSocketAddress(port));
// 通过open()方法找到Selector
selector = Selector.open();
// 注册到selector,等待连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start----8888:");
}
// 监听
private void listen() throws IOException {
while (true) {
// 阻塞 直到有事件到达
selector.select();
// 返回此选择器的已选择键集。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}
// 处理请求
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受请求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count=0;
// 测试此键的通道是否已准备好接受新的套接字连接。
if (selectionKey.isAcceptable()) {
// 返回为之创建此键的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的连接。
// 此方法返回的套接字通道(如果有)将处于阻塞模式。
client = server.accept();
// 配置为非阻塞
client.configureBlocking(false);
// 注册到selector,等待连接
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receivebuffer.clear();
//读取服务器发送来的数据到缓冲区中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("服务器端接受客户端数据--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
//将缓冲区清空以备下次写入
sendbuffer.clear();
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
sendText="message from server--" + flag++;
//向缓冲区中输入数据
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
//输出到通道
client.write(sendbuffer);
System.out.println("服务器端向客户端发送数据--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8888;
NIOServer server = new NIOServer(port);
server.listen();
}
}
public class NIOClient {
/*标识数字*/
private static int flag = 0;
/*缓冲区大小*/
private static int BLOCK = 4096;
/*接受数据缓冲区*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
/*服务器端地址*/
private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
"localhost", 8888);
public static void main(String[] args) throws IOException {
// 打开socket通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞方式
socketChannel.configureBlocking(false);
// 打开选择器
Selector selector = Selector.open();
// 注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接
socketChannel.connect(SERVER_ADDRESS);
// 分配缓冲区大小内存
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count=0;
while (true) {
//选择一组键,其相应的通道已为 I/O 操作准备就绪。
//此方法执行处于阻塞模式的选择操作。
selector.select();
//返回此选择器的已选择键集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判断此通道上是否正在进行连接操作。
// 完成套接字通道的连接过程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("完成连接!");
sendbuffer.clear();
sendbuffer.put("Hello,Server".getBytes());
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receivebuffer.clear();
//读取服务器发送来的数据到缓冲区中
count=client.read(receivebuffer);
if(count>0){
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("客户端接受服务器端数据--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
sendbuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "message from client--" + (flag++);
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
client.write(sendbuffer);
System.out.println("客户端向服务器端发送数据--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}
}
Buffer
NIO ByteBuffer
//重要属性
private int position;
private int limit;
private int capacity;
写模式时 position
指向当前待写的索引处,limit
和capacity
都指向数组索引最大值+1处
读模式 需要调用flip()方法
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
即position
置为0,limit
置为写模式下postion
,标记最大可读位置。capacity
不变表示此buffer内存数组最大容量。
Netty ByteBuf
Neety ByteBuf不同于NIO ByteBuffer,ByteBuffer读写模式使用同一套索引,转换需要调用flip。而ByteBuf同时维护了两个索引:读索引和写索引,readIndex
与writeIndex
。有三种种类的buffer,堆缓冲/直接缓冲/混合缓冲
heap buffers
即在JVM堆中分配的字节数组,最普遍的缓冲形式,分配释放也较为廉价,一般使用方式:
direct buffers
直接缓存由JVM通过本地调用在对外进行分配,分配释放比heap buffers更为昂贵,但是效率高省去了一次拷贝,因为堆内存在socket中进行传输之前,都需要进行一次heap buffer 到 direct buffer的拷贝,使用方式:
composite buffers
对各种ByteBuf的组合使用,相当于ByteBuf的容器,使用方式:
引用计数 refCount
ByteBuf采用引用计数,ByteBuf被分配后reCount=1,调用retain()方法refCount+1,调用release()方法refCount-1.当refCount==0时代表ByeBuf生命周期已过,会被释放或者归池。调用refCount==0的ByteBuf将会抛出IllegalReferenceCountException。需要注意的是有些过程是netty内部帮助release的,比如SimpleChannelInboundHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
默认构造器实现类autoRelease为true,channelRead0模板方法由用户进行实现,处理信息,处理完成后可以看到netty帮助释放了。
ByteBuf拷贝,copy()进行深拷贝即内部字节数组也会即你想嗯拷贝,duplicate()进行浅拷贝,只对readIndex和writeIndex进行拷贝,但共用同一个字节数组。同样readSlice()浅拷贝,readBytes()深拷贝。
Netty
Netty服务端典型代码:
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(3);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() +
" started and listening for connections on " + f.channel().localAddress());
f.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully().sync();
workGroup.shutdownGracefully().sync();
}
ServerSocketChannel初始化
ServerBootstrap中进行Channel等初始化工作实在bind()方法中进行的,一系列调用链后在initAndRegister()方法中进行ServerSocketChannel的初始化工作。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
这里重要的是channel实例化和channel初始化两部操作,channelFactory.newChannel()使用Channel的反射工厂生成一个Channel实例。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
使用默认provider构建了一个java nio的ServerSocketChannel实例,后再调用重载构造方法,设置感兴趣事件OP_ACCEPT用于监听客户端连接事件,构造器一直调用上层构造器,其中在父类AbstractCHannel中实例化Unsafe和Pipeline,Unsafe封装了Java底层的Socket操作,实际上是沟通 Netty 上层和 Java 底层的重要的桥梁。
实例化过程:
调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel
-
AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:
parent 属性置为 null
unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类
pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.
-
AbstractNioChannel 中的属性:
SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.
readInterestOp 被设置为 SelectionKey.OP_ACCEPT
SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
-
NioServerSocketChannel 中的属性:
- ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
接下来是Channel的初始化工作:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init()方法中首先设置配置Options和Attributes,重点在于最后向Pipeline中添加一个ServerBootstrapAcceptor,ServerBootStrapAcceptor继承自ChannelInboundHandlerAdapter,核心是其中的channelRead()方法
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
当有客户端连接时,连接后的SocketChannel会有Pipeline进行链式调用,于是就会调用该channelRead方法,首先将ServerBootStrap中配置的childHanlder添加到SocketChannel的Pipeline中,然后设置SocketChannel的Options和Attributes,最后将SocketChannel注册到workGroup,绑定到一个EventLoop上,完成整个ServerChannel初始化工作。
EventLoopGroup
EventLoopGroup相当于线程池,EventLoop相当于线程池中的线程用于处理相关的IO事件。Server拥有两个线程池,bossGroup用于接受客户端连接事件,workGroup用于处理连接后SocketChannel产生的IO事件。一旦SocketChannel就绪就会在workGroup中进行注册,平均注册到一个EventLoop,此后该Channel就会与该EventLoop绑定,一个Channel对应一个EventLoop,但一个EventLoop可以对应多个Channel。
ChannelHandler
ChannelHandler为Netty消息的处理器,分为InboundChannelHandler和OutbountChannelHandler,对于Channel的read消息由InbountChannelHandler处理,write消息由OutbountChannelHandler处理。常见的几种处理器有ChannelInitializer是InbountChannelHandler,一般用于向Pipline中添加其他Handler,只会调用一次,完成后就会从Pipline中移除该Handler。Decoder和Encoder解码和编码器,Decoder为InbountHandler,Encoder为OutbountHandler。
ChannelPipline
一个Channel对应了一个CahnnelPipeline,维护保存了ChannelHandler的链式结构,具体如图:
ChannelContext
ChannelContext是对ChannelHandler的封装,在ChannelHandler添加进Pipeline时生成,其实Pipeline中的链式结构保存的是ChannelContext的双向链表,ChannelHandler是不知道所在Pipeline中前后ChannelHandler的,但ChannelContext保存有前后ChannelContext的索引。ChannelContext调用下一个处理器相关方法是以fire为开头的,比如context.fireChannelRead()即调用下一个处理器的channelRead操作。
还有需要注意的是pipeline和channel调用write()都是从第一个outbount开始链式处理,但context调用write()是从当前context的下一个outbount开始链式处理。
//channel
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
//pipeline
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
//context
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
........
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}