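The previous article covered custom communication protocols; this chapter describes how to support several protocols at once.
We will build a Server that supports the Cat, Dog, and People protocols at the same time. There are two ways to implement it:
- The first approach uses a custom protocol: when a message is sent, its first few bytes (say 2) carry a custom marker (say AB). When the decoder parses the message, a leading AB denotes one protocol type and CD another. This approach does not use protobuf; it solves the problem directly with a Netty custom protocol.
- The second approach uses protobuf. It essentially prescribes how messages are defined: a Netty client and server share a single TCP connection, so each side must be able to determine what type of object the other side has sent.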
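The first approach can be pictured without Netty at all. The following is a minimal sketch in plain Java, assuming a two-byte ASCII marker as in the AB/CD example; the class name, method names, and the "protocol-1"/"protocol-2" labels are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a two-byte ASCII marker at the start of each frame names its protocol.
final class PrefixDispatchSketch {

    // Maps the marker to a protocol label. "AB" and "CD" echo the example in the text;
    // the returned labels are illustrative assumptions.
    static String protocolOf(byte[] frame) {
        if (frame.length < 2) {
            throw new IllegalArgumentException("frame shorter than the 2-byte marker");
        }
        String marker = new String(frame, 0, 2, StandardCharsets.US_ASCII);
        switch (marker) {
            case "AB": return "protocol-1";
            case "CD": return "protocol-2";
            default: throw new IllegalArgumentException("unknown marker: " + marker);
        }
    }

    // Everything after the marker is the protocol-specific payload.
    static String payloadOf(byte[] frame) {
        return new String(frame, 2, frame.length - 2, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = "ABhello".getBytes(StandardCharsets.UTF_8);
        System.out.println(protocolOf(frame) + " -> " + payloadOf(frame));
    }
}
```

A real decoder would then hand the payload to a parser chosen by the marker, which is what the flag-based Netty decoder later in this article does with an int flag.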
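Implementing multiple transport protocols in Netty with Protocol Buffers
As we know, using Protocol Buffers starts with defining a .proto file.
Define one outermost message (MyMessage) that covers every message type to be transferred: each concrete message type is carried as a field of the outermost message, and every transmission sends exactly one concrete type, identified by the enum value (DataType) in the outermost message.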
syntax ="proto2";
package com.zhihao.miao.netty.sixthexample;
option optimize_for = SPEED;
option java_package = "com.zhihao.miao.netty.seventhexample";
option java_outer_classname="MyDataInfo";
message MyMessage {
enum DataType{
PeopleType = 1;
DogType = 2;
CatType = 3;
}
required DataType data_type = 1;
//oneof: of several optional fields, at most one can be set at any given time, which saves memory
oneof dataBody {
People people = 2;
Dog dog = 3;
Cat cat = 4;
}
}
message People{
optional string name = 1;
optional int32 age = 2;
optional string address = 3;
}
message Dog{
optional string name = 1;
optional string age = 2;
}
message Cat{
optional string name = 1;
optional string city = 2;
}
Compile it with the protoc compiler to generate the code:
protoc --java_out=src/main/java src/protobuf/People.proto
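About oneof in the proto definition: of several optional fields, at most one can be set at any given time (see the official documentation). Compilation generates the MyDataInfo class; it is too long to include here.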
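To make the oneof rule concrete, here is a hand-written imitation in plain Java. It is not the code that protoc generates; the class `OneofSketch`, its `BodyCase` enum, and the use of plain strings as bodies are assumptions made for illustration only.

```java
// Hand-written imitation of oneof semantics; this is NOT the code protoc generates.
final class OneofSketch {
    enum BodyCase { NOT_SET, PEOPLE, DOG, CAT }

    private BodyCase bodyCase = BodyCase.NOT_SET;
    private Object body; // one slot shared by every alternative

    // Setting any alternative overwrites whichever one was set before.
    void setPeople(String people) { bodyCase = BodyCase.PEOPLE; body = people; }
    void setDog(String dog)       { bodyCase = BodyCase.DOG;    body = dog; }
    void setCat(String cat)       { bodyCase = BodyCase.CAT;    body = cat; }

    BodyCase getBodyCase() { return bodyCase; }
    Object getBody() { return body; }

    public static void main(String[] args) {
        OneofSketch msg = new OneofSketch();
        msg.setPeople("zhangsan");
        msg.setCat("tom"); // replaces the People value set one line earlier
        System.out.println(msg.getBodyCase() + " " + msg.getBody());
    }
}
```

As far as I know, the class generated for a oneof named dataBody exposes a comparable accessor (`getDataBodyCase()`), which could be used in place of the explicit data_type field; check the generated MyDataInfo class before relying on it.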
Server code:
package com.zhihao.miao.netty.seventhexample;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
Server-side channel initializer:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//decode with the default instance of the outermost message
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}
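This is the key to the implementation: the decoder is given the default instance of MyDataInfo.MyMessage. MyMessage is the outermost message; its DataType enum names the three kinds of payload, and the three message types we defined are exactly the alternatives of its oneof field.
The custom server-side Handler reads the dataType value of each incoming message and parses the message into the matching concrete type: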
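The other two pairs of handlers in the pipeline, `ProtobufVarint32FrameDecoder` and `ProtobufVarint32LengthFieldPrepender`, deal with framing: each serialized message is preceded by its length written as a base-128 varint, so the receiver knows where one message ends on the TCP stream. The sketch below imitates that idea with the JDK only; it is not Netty's implementation, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stdlib-only imitation of varint32 length-prefixed framing; not Netty's code.
final class Varint32FramingSketch {

    // Prepends the payload length as a base-128 varint (7 bits per byte, low bits first).
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80); // high bit set: more length bytes follow
            len >>>= 7;
        }
        out.write(len);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Returns the next payload, or null (leaving the buffer untouched) if a full frame has not arrived.
    static byte[] unframe(ByteBuffer in) {
        in.mark();
        int len = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) { in.reset(); return null; }
            byte b = in.get();
            len |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (len < 0) throw new IllegalArgumentException("negative length");
                if (in.remaining() < len) { in.reset(); return null; }
                byte[] payload = new byte[len];
                in.get(payload);
                return payload;
            }
        }
        throw new IllegalArgumentException("malformed varint32 length");
    }

    public static void main(String[] args) {
        byte[] framed = frame("hi".getBytes(StandardCharsets.UTF_8));
        byte[] back = unframe(ByteBuffer.wrap(framed));
        System.out.println(framed.length + " bytes on the wire, payload: "
                + new String(back, StandardCharsets.UTF_8));
    }
}
```

Returning null on a partial frame mirrors what a frame decoder has to do on a stream: wait for more bytes rather than hand an incomplete message to the protobuf decoder.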
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.PeopleType){
MyDataInfo.People people = msg.getPeople();
System.out.println(people.getName());
System.out.println(people.getAge());
System.out.println(people.getAddress());
}else if(dataType == MyDataInfo.MyMessage.DataType.DogType){
MyDataInfo.Dog dog = msg.getDog();
System.out.println(dog.getName());
System.out.println(dog.getAge());
}else if(dataType == MyDataInfo.MyMessage.DataType.CatType){
MyDataInfo.Cat cat = msg.getCat();
System.out.println(cat.getName());
System.out.println(cat.getCity());
}
}
}
Client code:
package com.zhihao.miao.netty.seventhexample;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TestClient {
public static void main(String[] args) throws Exception{
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
Client-side channel initializer:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//decode with the default instance of the outermost message
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}
The custom client-side handler, which sends data of a randomly chosen protocol:
package com.zhihao.miao.netty.seventhexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Random;
public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
}
//the client sends data to the server as soon as the connection is active
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int randomInt = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.PeopleType).
setPeople(MyDataInfo.People.newBuilder().setName("张三").
setAddress("上海").setAge(26).build()).build();
}else if(1 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.DogType).
setDog(MyDataInfo.Dog.newBuilder().setName("旺财")
.setAge("2").build()).build();
}else if(2 == randomInt){
myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(MyDataInfo.MyMessage.DataType.CatType).
setCat(MyDataInfo.Cat.newBuilder().setName("汤姆")
.setCity("上海").build()).build();
}
ctx.channel().writeAndFlush(myMessage);
}
}
Start the server, then run the client several times; the server console shows:
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x82a26e9f, L:/127.0.0.1:8888 - R:/127.0.0.1:51777]
七月 05, 2017 10:10:37 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
汤姆
上海
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x128da3e7, L:/127.0.0.1:8888 - R:/127.0.0.1:52049]
七月 05, 2017 10:11:38 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
张三
26
上海
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xa8220c73, L:/127.0.0.1:8888 - R:/127.0.0.1:52097]
七月 05, 2017 10:11:49 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
汤姆
上海
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x9ac52ec1, L:/127.0.0.1:8888 - R:/127.0.0.1:52125]
七月 05, 2017 10:11:55 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
张三
26
上海
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x797d03b6, L:/127.0.0.1:8888 - R:/127.0.0.1:52178]
七月 05, 2017 10:12:07 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd5f957bd, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
旺财
2
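Implementing multiple transport protocols with Netty
The official site has a similar demo. It took me a long time, and some reference to the official site, to write this demo, and it deepened my understanding of Netty:
The three protocol entity classes:
Person protocol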
package com.zhihao.miao.test.day10;
public class Person {
private String username;
private int age;
//getters and setters
}
Dog protocol
package com.zhihao.miao.test.day10;
public class Dog {
private String name;
private String age;
//getters and setters
}
Cat protocol
package com.zhihao.miao.test.day10;
public class Cat {
private String name;
private String color;
//getters and setters (getColor/setColor are used by the decoder, encoder, and handlers)
}
Server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class MultiServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // set socket options
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // use an NIO server channel
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerChannelInitializer());
            // bind the port and start listening for connections
            Channel ch = serverBootstrap.bind(8899).sync().channel();
            // block until the server channel is closed
            ch.closeFuture().sync();
        } finally {
            // release the event loop threads, as TestServer does
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Server-side ChannelInitializer:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//decoding handler
pipeline.addLast(new ServlerDecoder());
pipeline.addLast(new TestServerHandler());
}
}
Server-side decoder Handler: if the flag read from the message is 0, the data is decoded as the Person protocol; if it is 1, as the Dog protocol; if it is 2, as the Cat protocol:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class ServlerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // wait until the flag and the name length (two ints) have arrived
        if (in.readableBytes() < 8) {
            return;
        }
        in.markReaderIndex();
        int flag = in.readInt();
        int nameLength = in.readInt();
        if (flag < 0 || flag > 2 || nameLength < 0) {
            throw new IllegalArgumentException("unknown flag or bad length: " + flag + ", " + nameLength);
        }
        // Person carries a 4-byte age after the name
        int needed = (flag == 0) ? nameLength + 4 : nameLength;
        if (in.readableBytes() < needed) {
            // not enough data yet: rewind and wait for the next read
            in.resetReaderIndex();
            return;
        }
        byte[] nameBytes = new byte[nameLength];
        in.readBytes(nameBytes);
        String name = new String(nameBytes, StandardCharsets.UTF_8);
        if (flag == 0) {
            Person person = new Person();
            person.setUsername(name);
            person.setAge(in.readInt());
            out.add(person);
        } else if (flag == 1) {
            // the age string has no length prefix, so it is read as all remaining bytes
            Dog dog = new Dog();
            dog.setName(name);
            dog.setAge(readRemaining(in));
            out.add(dog);
        } else {
            // the color string has no length prefix either
            Cat cat = new Cat();
            cat.setName(name);
            cat.setColor(readRemaining(in));
            out.add(cat);
        }
    }
    private static String readRemaining(ByteBuf in) {
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
Custom server-side Handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TestServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof Person){
System.out.println(((Person) msg).getUsername());
System.out.println(((Person) msg).getAge());
}
if(msg instanceof Dog){
System.out.println(((Dog) msg).getName());
System.out.println(((Dog) msg).getAge());
}
if(msg instanceof Cat){
System.out.println(((Cat) msg).getName());
System.out.println(((Cat) msg).getColor());
}
}
}
Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MultiClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInitializer());
// Start the connection attempt.
Channel ch = b.connect("127.0.0.1", 8899).sync().channel();
ch.flush();
}
}
Client-side ChannelInitializer:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Random;
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
int randomInt = new Random().nextInt(3);
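/**
* Encoding step: if the random value is 0, the Person protocol is sent; if it is 1, the Dog protocol;
* if it is 2, the Cat protocol.
*
* The Person protocol writes the flag 0, then the Person object encoded as binary
* The Dog protocol writes the flag 1, then the Dog object encoded as binary
* The Cat protocol writes the flag 2, then the Cat object encoded as binary
*/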
if(0 == randomInt){
pipeline.addLast(new PersonEncoder());
Person person = new Person();
person.setUsername("zhihao");
person.setAge(27);
pipeline.addLast(new TestClientHandler(person));
}
if(1 == randomInt){
pipeline.addLast(new DogEncoder());
Dog dog = new Dog();
dog.setName("wangcai");
dog.setAge("2");
pipeline.addLast(new TestClientHandler(dog));
}
if(2 == randomInt){
pipeline.addLast(new CatEncoder());
Cat cat = new Cat();
cat.setName("maomi");
cat.setColor("yellow");
pipeline.addLast(new TestClientHandler(cat));
}
}
}
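The three custom encoders mirror the server side. When Person data is sent, the flag 0 is written before it; for Dog data the flag is 1, and for Cat data the flag is 2. The flag and the object's own data are then encoded to binary together and sent.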
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class PersonEncoder extends MessageToByteEncoder<Person> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
        byte[] usernameBytes = msg.getUsername().getBytes(StandardCharsets.UTF_8);
        out.writeInt(0); //protocol flag
        out.writeInt(usernameBytes.length); //length in bytes, not in characters
        out.writeBytes(usernameBytes);
        out.writeInt(msg.getAge());
    }
}
Dog protocol encoder:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class DogEncoder extends MessageToByteEncoder<Dog> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {
        byte[] nameBytes = msg.getName().getBytes(StandardCharsets.UTF_8);
        out.writeInt(1); //protocol flag
        out.writeInt(nameBytes.length); //length in bytes, not in characters
        out.writeBytes(nameBytes);
        //age has no length prefix; the decoder reads it as the rest of the message
        out.writeBytes(msg.getAge().getBytes(StandardCharsets.UTF_8));
    }
}
Cat protocol encoder:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class CatEncoder extends MessageToByteEncoder<Cat> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Cat msg, ByteBuf out) throws Exception {
        byte[] nameBytes = msg.getName().getBytes(StandardCharsets.UTF_8);
        out.writeInt(2); //protocol flag
        out.writeInt(nameBytes.length); //length in bytes, not in characters
        out.writeBytes(nameBytes);
        //color has no length prefix; the decoder reads it as the rest of the message
        out.writeBytes(msg.getColor().getBytes(StandardCharsets.UTF_8));
    }
}
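The wire layout the encoders produce can be checked without starting Netty. Below is a minimal round-trip sketch for the Person layout (flag, name length, name bytes, age) using `java.nio.ByteBuffer`; the class `PersonWireSketch` and its nested `Person` holder are hypothetical stand-ins, and UTF-8 is an assumption about the charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stdlib-only round trip of the Person layout: [int flag=0][int nameLength][name bytes][int age].
final class PersonWireSketch {
    static final int PERSON_FLAG = 0;

    // Minimal stand-in for the article's Person class.
    static final class Person {
        final String username;
        final int age;
        Person(String username, int age) { this.username = username; this.age = age; }
    }

    static ByteBuffer encode(Person p) {
        byte[] nameBytes = p.username.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + nameBytes.length + 4);
        buf.putInt(PERSON_FLAG);
        buf.putInt(nameBytes.length); // byte count, which differs from String.length() for non-ASCII text
        buf.put(nameBytes);
        buf.putInt(p.age);
        buf.flip();
        return buf;
    }

    // Returns null (without consuming anything) when the message is still incomplete.
    static Person decode(ByteBuffer in) {
        if (in.remaining() < 8) return null;
        in.mark();
        int flag = in.getInt();
        int nameLength = in.getInt();
        if (flag != PERSON_FLAG || nameLength < 0) {
            throw new IllegalArgumentException("not a Person message");
        }
        if (in.remaining() - 4 < nameLength) { in.reset(); return null; }
        byte[] nameBytes = new byte[nameLength];
        in.get(nameBytes);
        int age = in.getInt();
        return new Person(new String(nameBytes, StandardCharsets.UTF_8), age);
    }

    public static void main(String[] args) {
        Person back = decode(encode(new Person("zhihao", 27)));
        System.out.println(back.username + " " + back.age);
    }
}
```

Writing the length in bytes matters as soon as a name contains non-ASCII characters: a two-character Chinese name occupies six bytes in UTF-8, so a character count would make the decoder stop short.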
Custom client-side handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TestClientHandler extends ChannelInboundHandlerAdapter {
private Person person;
private Cat cat;
private Dog dog;
public TestClientHandler(Person person){
this.person = person;
}
public TestClientHandler(Dog dog){
this.dog = dog;
}
public TestClientHandler(Cat cat){
this.cat =cat;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if(person != null){
ctx.channel().writeAndFlush(person);
}
if(dog != null){
ctx.channel().writeAndFlush(dog);
}
if(cat != null){
ctx.channel().writeAndFlush(cat);
}
}
}
Start the server, then start the client several times; the server console prints the results for the different protocols:
maomi
yellow
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xf40f7b07, L:/127.0.0.1:8899 - R:/127.0.0.1:57879]
十月 15, 2017 4:33:43 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
wangcai
2
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x3384f158, L:/127.0.0.1:8899 - R:/127.0.0.1:57914]
十月 15, 2017 4:33:48 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x930eab24, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
zhihao
27