前言
问题
现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问。例如,我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用。然而,有时候一个通用的协议或者它的实现并没有覆盖一些场景。比如我们无法使用一个通用的HTTP服务器来处理大文件、电子邮件、近实时消息比如财务信息和多人游戏数据。这就需要一个高度优化的协议实现来满足特殊的场景。比如,你可以实现一个优化的Ajax的聊天应用、媒体流传输或者是大文件传输的HTTP服务器。你甚至可以自己设计和实现一个新的协议来为你的需求量身定制。另一个不可避免的情况是,您必须处理遗留的专有协议,以确保与旧系统的互操作性。在这种情况下,重点是我们如何能快速实现该协议,并且不牺牲最终应用程序的稳定性和性能。
解决方案
Netty项目是一个提供异步事件(event-driven)驱动的网络应用框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
换句话说,Netty是一个NIO框架,它能够快速方便地开发网络应用程序,如服务器和客户端协议。Netty大大简化了网络程序的开发,如TCP和UDP的Socket开发。
“快速且简单”并不意味着应用程序会有难维护和性能低的问题。Netty是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如FTP、SMTP、HTTP、许多二进制和基于文本的传统协议。因此,Netty在不降低开发效率、性能、稳定性、灵活性情况下,成功地找到了解决方案。
有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是Netty和它们的不同之处。答案就是Netty的哲学设计理念。Netty从第一天开始就为用户提供了用户体验最好的API以及实现设计。这不是什么实质的东西,但当你阅读本指南并使用Netty时,你会意识到,这种哲学将使你的生活变得更容易。
入门指南
这个章节会介绍Netty核心的结构,并通过一些简单的例子来帮助你快速入门。当你读完本章节你马上就可以用Netty写出一个客户端和服务端。
如果你在学习的时候喜欢“自顶向下(top-down)”的方法,那你可能需要要从第二章《架构概述》开始,然后再回到这里。
开始之前
运行本章所介绍的示例有两个最低要求:最新版本的Netty和JDK 1.6或以上。最新的Netty版本在项目下载页面可以找到。为了下载到正确的JDK版本,请到你喜欢的网站下载。
当您阅读时,您可能会对本章中介绍的类有更多的问题。关于这些类的详细的信息请请参考API说明文档。为了方便,所有文档中涉及到的类名字都会被关联到一个在线的API说明。当然如果有任何错误信息、语法错误或者你有任何好的建议来改进文档说明,那么请联系Netty社区。
写一个丢弃(DISCARD
)服务
世界上最简单的协议不是”Hello,World!”,而是DISCARD
。他是一种丢弃了所有接收到的数据,并不做有任何的响应的协议。
为了实现DISCARD
协议,你唯一需要做的就是忽略所有收到的数据。让我们从处理器的实现开始,处理器是由Netty生成用来处理I/O事件的。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
DisCardServerHandler
继承自ChannelInboundHandlerAdapter
,这个类实现了ChannelInboundHandler
接口,ChannelInboundHandler
提供了许多事件处理的接口方法,你可以覆盖这些方法。现在,仅仅只需要继承ChannelInboundHandlerAdapter
类而不是你自己去实现接口方法。这里我们覆盖了
channelRead()
事件处理方法。当从客户端接收到新数据时,这个方法就会传入接收到的消息并被调用。这个例子中,收到的消息的类型是ByteBuf
。-
为了实现
DISCARD
协议,处理器需要忽略所有接受到的消息。ByteBuf
是一个引用计数(reference-counted)对象,这个对象必须显示地调用release()
方法来释放。请记住,释放所有传递到处理器的引用计数对象,是处理器的职责。通常,channelRead()
方法的实现就像下面的这段代码:@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
exceptionCaught()
事件处理方法是当出现Throwable对象才会被调用,即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。然而这种方法的实现可能会有所不同,这取决于您想要处理的异常情况。比如,你可能想在关闭连接之前发送一个错误码的响应消息。
到目前为止一切都还比较顺利,我们已经实现了DISCARD
服务的一半功能,剩下的需要编写一个main()
方法来启动服务端的DiscardServerHandler
。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
-
NioEventLoopGroup
是用来处理I/O操作的多线程事件循环器,Netty提供了许多不同的EventLoopGroup
的实现用来处理不同传输协议。在这个例子中我们实现了一个服务端的应用,因此会有2个NioEventLoopGroup
会被使用。第一个通常被称作‘boss’,用来接收进来的连接。第二个通常被称作‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。使用多少线程,以及它们如何映射到创建的通道(Channel
)上,这取决于EventLoopGroup
的实现,甚至可以通过构造函数来配置。 -
ServerBootstrap
是一个用于设置服务器的助手类。你可以直接使用Channel
设置服务。然而,请注意这会是一个复杂的处理过程,在很多情况下你并不需要这样做。 - 这里,我们指定使用
NioServerSocketChannel
类来举例说明一个新的Channel
如何接收进来的连接。 - 这里指定的 handler 将始终由新接受的
Channel
进行调用。ChannelInitializer
是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel
。也许你想通过增加一些处理类比如DiscardServerHandler
来配置这个新的Channel
所对应的ChannelPipeline
来实现你的网络程序。随着应用程序变得复杂,您可能会在管道中添加更多的处理程序,并最终将这个匿名类提取到顶级类。 - 你可以设置这里指定的通道实现的配置参数。我们正在写一个TCP/IP的服务端,因此我们被允许设置socket的参数选项比如
tcpNoDelay
和keepAlive
。请参考ChannelOption
和详细的ChannelConfig
实现的接口文档以此可以对ChannelOptions
的有一个大概的认识。 - 你注意到
option()
和childOption()
吗?option()
是提供给NioServerSocketChannel
用来接收进来的连接。childOption()
是提供给由父管道ServerChannel
接收到的连接,在这个例子中就是NioServerSocketChannel
。 - 我们继续。剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的8080端口。当然现在你可以多次调用
bind()
方法(基于不同绑定地址)。
恭喜!你已经完成熟练地完成了第一个基于Netty的服务端程序。
观察接收到的数据
现在我们已经编写出我们第一个服务端,我们需要测试一下他是否真的可以运行。最简单的测试方法是用 telnet
命令。例如,你可以在命令行上输入 telnet localhost 8080
然后任意输入。
然而,我们能说这个服务端是正常运行了吗?事实上我们也不知道因为他是一个discard服务。你根本不可能得到任何的响应。为了证明他仍然是在工作的,让我们修改服务端的程序来打印出他到底接收到了什么。
我们已经知道 channelRead()
方法是在数据被接收的时候调用。让我们放一些代码到 DiscardServerHandler
类的 channelRead()
方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
- 这个低效的循环事实上可以简化为:
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
- 或者,你可以在这里调用
in.release()
。
如果你再次运行telnet命令,你将会看到服务端打印出了他所接收到的消息。
完整的discard server代码放在了 io.netty.example.discard
包下面。
ECHO服务
到目前为止,我们虽然接收到了数据,但没有做任何的响应。然而一个服务端通常会对一个请求作出响应。让我们学习如何通过实现 ECHO
协议来为客户端编写响应消息,其中任何接收到的数据都会被发送回去。
我们在前几节中实现的丢弃服务器的唯一区别是,它将接收到的数据发送回来,而不是将接收到的数据打印到控制台。因此,可以再次修改channelRead()方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
-
ChannelHandlerContext
对象提供了许多操作,使你能够触发各种各样的I/O事件和操作。这里我们调用了write(Object)
方法来逐字地把接受到的消息写入。请注意不同于DISCARD的例子我们并没有释放接受到的消息,这是因为当写入的时候Netty已经帮我们释放了。 -
ctx.write(Object)
方法不会使消息写入到通道上,他被缓冲在了内部,你需要调用ctx.flush()
方法来把缓冲区中数据强行输出。或者,你可以用更简洁的cxt.writeAndFlush(msg)
以达到同样的目的。
如果你再一次运行 telnet
命令,你会看到服务端会发回一个你已经发送的消息。
完整的echo服务的代码放在了io.netty.example.echo
包下面。
写一个时间服务器
在这个部分被实现的协议是 TIME
协议。和之前的例子不同的是在不接受任何请求时他会发送一个含32位的整数的消息,并且一旦消息发送就会立即关闭连接。在这个例子中,你会学习到如何构建和发送一个消息,然后在完成时主动关闭连接。
因为我们将会忽略任何接收到的数据,而只是在连接被创建发送一个消息,所以这次我们不能使用channelRead()方法了,代替他的是,我们需要覆盖channelActive()方法,下面的就是实现的内容:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
如所说,
channelActive()
方法将会在连接被建立并且准备进行通信时被调用。我们编写一个32位整数,表示这个方法中的当前时间。为了发送一个新的消息,我们需要分配一个包含这个消息的新的缓冲。因为我们需要写入一个32位的整数,因此我们需要一个至少有4个字节的
ByteBuf
。通过ChannelHandlerContext.alloc()
得到当前的ByteBufAllocator
,然后分配一个新的缓冲。-
和往常一样我们需要编写一个构建好的消息。
但是等一等,flip在哪?难道我们使用NIO发送消息时不是调用
java.nio.ByteBuffer.flip()
吗?ByteBuf
之所以没有这个方法因为有两个指针;一个对应读操作一个对应写操作。当你向ByteBuf里写入数据的时候写指针的索引就会增加,同时读指针的索引没有变化。读指针索引和写指针索引分别代表了消息的开始和结束。比较起来,NIO缓冲并没有提供一种简洁的方式来计算出消息内容的开始和结尾,除非你调用flip方法。当你忘记调用flip方法而引起没有数据或者错误数据被发送时,你会陷入困境。这样的一个错误不会发生在Netty上,因为我们对于不同的操作类型有不同的指针。你会发现这样的使用方法会让你过程变得更加的容易,因为你已经习惯一种没有使用flip的方式。
另外一个点需要注意的是
ChannelHandlerContext.write()
(和writeAndFlush())方法会返回一个ChannelFuture
对象,一个ChannelFuture
代表了一个还没有发生的I/O操作。这意味着任何一个请求操作都不会马上被执行,因为在Netty里所有的操作都是异步的。举个例子下面的代码中在消息被发送之前可能会先关闭连接。Channel ch = ...; ch.writeAndFlush(message); ch.close();
因此,你需要在write()
方法返回的 ChannelFuture
完成后调用close()
方法,然后当他的写操作已经完成他会通知他的监听者。请注意,close()方法也可能不会立马关闭,他也会返回一个 ChannelFuture
。
-
当一个写请求已经完成是如何通知到我们?这个只需要简单地在返回的ChannelFuture上增加一个
ChannelFutureListener
。这里我们构建了一个匿名的ChannelFutureListener
类用来在操作完成时关闭Channel。
或者,你可以使用简单的预定义监听器代码:f.addListener(ChannelFutureListener.CLOSE);
为了测试我们的time服务如我们期望的一样工作,你可以使用UNIX的rdate命令:
$ rdate -o <port> -p <host>
port是你在main()函数中指定的端口,host使用locahost就可以了。
写一个Time客户端
不像DISCARD
和ECHO
的服务端,对于TIME协议我们需要一个客户端,因为人们不能把一个32位的二进制数据翻译成一个日期或者日历。在这一部分,我们将会讨论如何确保服务端是正常工作的,并且学习怎样用Netty编写一个客户端。
在Netty中,编写服务端和客户端最大的并且唯一不同的使用了不同的 Bootstrap
和 Channel
的实现。请看一下下面的代码:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
-
Bootstrap
和ServerBootstrap
类似,不过他是对非服务端的channel而言,比如客户端或者无连接传输模式(connectionless)的channel。 - 如果你只指定了一个
EventLoopGroup
,那它就会同时作为一个‘boss’和‘workder’线程。尽管客户端不需要使用到‘boss’线程。 - 使用
NioSocketChannel
创建客户端的Channel
,而不是NioServerSocketChannel
。 - 注意,这里我们不使用
childOption()
,不像ServerBootstrap
那样,因为客户端的SocketChannel
没有父channel。 - 我们用
connect()
方法代替了bind()
方法。
正如你看到的,它和服务端的代码并没有太大的区别。 ChannelHandler
是如何实现的?他应该从服务端接受一个32位的整数消息,把他翻译成人们能读懂的格式,并打印翻译好的时间,然后关闭连接:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- 在TCP/IP中,NETTY会把读到的数据放到
ByteBuf
中。
这样看起来非常简单,并且和服务端的那个例子的代码也相差不多。然而,这个handler有时候会拒绝工作并抛出IndexOutOfBoundsException
异常。在下个部分我们会讨论为什么会发生这种情况。
处理基于流的传输
一个小的Socket Buffer问题
在基于流的传输里,比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。这意味这,即使您将两个消息发送为两个独立的包,操作系统不会将它们视为两个消息,但只是一堆字节。因此,无法保证你所读的内容与你的远程节点所写的完全相同。举个例子,让我们假设操作系统的TCP/TP协议栈已经接收了3个数据包:
由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。
因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意义并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:
第一个解决方案
现在让我们回到 TIME
客户端的例子上。这里我们遇到了同样的问题,一个32字节数据是非常小的数据量,他并不见得会被经常拆分到到不同的数据段内。然而,问题是他确实可能会被拆分到不同的数据段内,并且拆分的可能性会随着通信量的增加而增加。
最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了TimeClientHandler
的实现类修复了这个问题:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
ChannelHandler
有2个生命周期的监听方法:handlerAdded()
和handlerRemoved()
。你可以完成任意初始化任务只要他不会被阻塞很长的时间。 - 首先,所有接收的数据都应该被累积在
buf
变量里。 - 然后,处理器必须检查buf变量是否有足够的数据,在这个例子中是4个字节,然后处理实际的业务逻辑。否则,Netty会重复调用channelRead()当有更多数据到达直到4个字节的数据被积累。
第二个解决方案
尽管第一个解决方案已经解决了Time客户端的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的 ChannelInboundHandler
的实现将很快地变得难以维护。
正如你所知的,你可以增加多个 ChannelHandler
到 ChannelPipeline
,因此,你可以把一整个 ChannelHandler
拆分成多个模块以减少应用的复杂程度。比如,你可以把TimeClientHandler
拆分成2个处理器:
-
TimeDecoder
处理数据拆分的问题 -
TimeClientHandler
原始版本的实现
幸运地是,Netty提供了一个可扩展的类,帮你完成TimeDecoder的开发:
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
-
ByteToMessageDecoder
是ChannelInboundHandler
的一个实现类,他可以在处理数据拆分的问题上变得很简单。 - 每当有新数据接收的时候,
ByteToMessageDecoder
都会调用decode()
方法来处理内部的那个累积缓冲。 -
Decode()
方法可以决定当累积缓冲里没有足够数据时可以往out
对象里放任意数据。当有更多的数据被接收了ByteToMessageDecoder会再一次调用decode()方法。 - 如果在decode()方法里增加了一个对象到out对象里,这意味着解码器解码消息成功。
ByteToMessageDecoder
将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用decode(),ByteToMessageDecoder
会持续调用decode()
直到不放任何数据到out
里。
现在我们有另外一个处理器插入到 ChannelPipeline
里,我们应该在 TimeClient
里修改 ChannelInitializer
的实现:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一个大胆的人,你可能会尝试使用更简单的解码类 ReplayingDecoder
。不过你还是需要参考一下API文档来获取更多的信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty还提供了更多开箱即用的解码器使你可以更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现。请参考下面的包以获取更多更详细的例子:
- 对于二进制协议请看
io.netty.example.factorial
- 对于基于文本行(text line-based)协议请看
io.netty.example.telnet
用POJO代替ByteBuf
我们已经讨论了所有的例子,到目前为止一个消息的消息都是使用 ByteBuf
作为一个基本的数据结构。在这一部分,我们会改进 TIME
协议的客户端和服务端的例子,用POJO替代 ByteBuf
。
在你的 ChannelHandler
中使用POJO优势是比较明显的。通过从ByteBuf提取信息的方式,分离handler的代码,将会使你的handler变得更加可维护和可重用。在 TIME
客户端和服务端的例子中,我们读取的仅仅是一个32位的整形数据,直接使用ByteBuf不会是一个主要的问题。然后,你会发现当你需要实现一个真实的协议,分离代码变得非常的必要。
首先,让我们定义一个新的类型叫做 UnixTime
。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
现在我们可以修改下 TimeDecoder
类,返回一个 UnixTime
而不是 ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
下面是修改后的解码器,TimeClientHandler
不再有任何的 ByteBuf
代码了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更加简单和优雅了,是吧?相同的技术可以被运用到服务端。让我们更改一下一开始的 TimeServerHandler
。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺失的部分是编码器,解码器是 ChannelOutboundHandler
的实现,能把 UnixTime
对象重新转化为一个 ByteBuf
。这比编写解码器要简单得多,因为在编码消息时,不需要处理包的分段和组装。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
- 在这几行代码里还有几个重要的事情。
第一, 通过ChannelPromise
,当编码后的数据被写到了通道上Netty可以通过这个对象标记是成功还是失败。
第二, 我们不需要调用cxt.flush()
。因为处理器已经单独分离出了一个方法void flush(ChannelHandlerContext cxt)
,用于覆盖flush()
操作。
进一步简化操作,你可以使用 MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最后的任务就是在TimeServerHandler之前把TimeEncoder插入到 ChannelPipeline
,剩下的只是简单的练习。
关闭你的应用
关闭一个Netty应用往往只需要简单地通过 shutdownGracefully()
方法来关闭你构建的所有的 EventLoopGroup
。当 EventLoopGroup
被完全地终止,并且对应的所有 Channel
都已经被关闭时,Netty会返回一个Future对象通知你。
总结
在本章中,我们快速浏览了Netty,并演示了如何在Netty上编写一个完整的工作网络应用程序。在Netty接下去的章节中还会有更多更相信的信息。我们也鼓励你去重新复习下在 io.netty.example
包下的例子。