java io 模型一共分为4种
- 传统IO
- 伪异步IO
- NIO
- AIO
介绍
- netty是一个高性能,异步事件驱动的NIO框架,它提供了对TCP、UDP和file传输的支持,netty所有的IO操作都为异步非阻塞,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
能做什么
- 开发异步、非阻塞的TCP、UDP网络应用程序
- 开发异步文件传输应用程序
- 开发异步HTTP服务端和客户端应用程序
- 提供对多种编解码框架的集成,包括谷歌的Protobuf、Jboss marshalling、Java序列化、压缩编解码、XML解码、字符串编解码等,这些编解码框架可以被用户直接使用。
- 提供形式多样的编解码基础类库,可以非常方便的实现私有协议栈编解码框架的二次定制和开发。
- 基于职责链模式的Pipeline-Handler机制,用户可以非常方便的对网络事件进行拦截和定制。
- 所有的IO操作都是异步的,用户可以通过Future-Listener机制主动Get结果或者由IO线程操作完成之后主动Notify结果,用户的业务线程不需要同步等待。
- IP黑白名单控制 。
- 打印消息码流。
- 流量控制和整形。
- 性能统计。
- 基于链路空闲事件检测的心跳检测。
EventLoop,EventLoopGroup
- EventLoop目的是为Channel处理IO操作,一个EventLoop可以为多个Channel服务,EventLoopGroup会包含多个EventLoop。
BootStrap,ServerBootstrap
- 一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。
ChannelInitializer
- 当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。
Handler
- 为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelInboundHandler
- 一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个Handler里面的,ChannelInboundHandler就是用来处理我们的核心业务逻辑
Future
- 在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。总之,所有的操作都会返回一个ChannelFuture。
DEMO
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
/**
* @author hejian
*/
public class MyService {
private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
public static void main(String[] args) throws Exception {
new MyService().start();
}
private static int _BOOSEVENTNUM = 1;
private static int _PORT = 7878;
public void start() throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(_BOOSEVENTNUM);
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 构建
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new Myhandler());
//心跳机制
}
});
ChannelFuture future = bootstrap.bind(_PORT).sync();
System.out.println("start success is port " + _PORT);
future.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class Myhandler extends SimpleChannelInboundHandler<String> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("服务端收到:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
ctx.writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
super.channelActive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
System.out.println("5 秒没有接收到客户端的信息了,关闭不活跃链接通道");
//throw new Exception("idle exception");
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
- client
package com.java.netty;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
public class MyClient {
public static void main(String[] args) throws Exception{
new MyClient().start();
}
public void start() throws Exception {
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("ping", new IdleStateHandler(0, new Random().nextInt(10), 0, TimeUnit.SECONDS));
//解码器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new MyClientHandler());
}
});
Channel ch = bootstrap.connect("127.0.0.1", 7878).sync().channel();
//等待窗口数据写入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (null == line) {
continue;
}
ch.writeAndFlush(line + "\r\n");
}
} finally {
worker.shutdownGracefully();
}
}
}
/**
* 业务处理器
* @author hejian
*
*/
class MyClientHandler extends SimpleChannelInboundHandler<String> {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new Random().nextInt(10) + "客户端心跳信息",
CharsetUtil.UTF_8));
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(">>>>>>>> client msg " + msg);
//回写service 应答数据
ctx.writeAndFlush("client is ask");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// if(currentTime <= TRY_TIMES){
// System.out.println("currentTime:"+currentTime);
// currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
// }
}
}
}
}