Netty 开发环境搭建
- 安装JDK
- 下载Netty对应的jar包
- 搭建一个项目工程
Netty 服务端开发
package com.netty.study;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty server
* @author yuxi
*/
public class NettyServer {
public static void main(String[] args) {
bind(8089);
}
/**
* bind port
* @param port
*/
private static void bind(int port) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
}
- 创建两个NIO线程组
- 创建ServerBootstrap,它是NIO服务端启动辅助类
- 启动参数配置
- 绑定I/O事件的处理类ChildChannelHandler
package com.netty.study;
import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* @author yuxi
*/
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req,"utf-8");
System.out.println("this is body:"+body);
String currentTime = "ok".equals(body)?new Date(System.currentTimeMillis()).toString():
"not ok";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
Netty 客户端开发
package com.netty.study;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* netty client
* @author yuxi
*/
public class NettyClient {
public static void main(String[] args) {
connect("127.0.0.1",8089);
}
/**
*
* @param host
* @param port
*/
private static void connect(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
}finally {
group.shutdownGracefully();
}
}
}
- 创建客户端处理I/O读写的线程组
- 创建客户端启动辅助类
- 启动参数配置
- 调研connect,发起异步连接
package com.netty.study;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class TimeClientHandler extends ChannelHandlerAdapter {
private final ByteBuf firstMessage;
public TimeClientHandler() {
byte[] req = "ok".getBytes();
this.firstMessage = Unpooled.buffer(req.length);
this.firstMessage.writeBytes(req);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req,"utf-8");
System.out.println("now is:"+body);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
}