1.protobuf
RPC(一般通过SOCKET进行数据传输)
1.定义一个接口说明文件:描述了对象(结构体),对象成员, 接口方法等信息
2.通过RPC框架所提供的编译器, 将接口说明文件编译成具体的语言文件,如java文件, python文件
3.在客户端和服务端分别引入RPC编译器所生成的文件, 即可像调用本地方法一样, 调用远程方法
1.1 在 Windows 上安装 protobuf 编译器 (protoc)
https://github.com/protocolbuffers/protobuf/releases
3.示例用法
3.0 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the Netty + protobuf examples in this note. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>java-tools</artifactId>
        <groupId>com.zy</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency versions are centralised here and referenced below. -->
    <properties>
        <org.slf4j.version>1.7.25</org.slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.20</lombok.version>
        <protobuf.version>3.9.1</protobuf.version>
    </properties>

    <artifactId>tools-netty</artifactId>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.48.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <!-- Pin the version explicitly: lombok.version was declared above but never
                 used, so resolution depended on the parent POM managing it. -->
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <!-- slf4j-log4j12 already brings in the log4j dependency -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- protobuf-java and protobuf-java-util share one version property so they stay in step. -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
    </dependencies>
</project>
3.1 定义 proto 文件, 生成代码
syntax = "proto3";

option java_package = "com.zy.netty.netty03";
option java_outer_classname = "DataInfo";

// Shows how several different message types can travel over one
// protobuf + Netty pipeline.
// Approach one for multi-protocol transport: a single envelope message
// that carries a type tag plus a oneof holding the concrete payload.
message Message {
  // Tag naming which payload the envelope carries.
  enum DataType {
    SchoolType = 0;
    TeacherType = 1;
    StuType = 2;
  }

  DataType data_type = 1;

  // The concrete payload; one alternative per DataType value.
  oneof dataBody {
    School school = 2;
    Teacher teacher = 3;
    Stu stu = 4;
  }
}

// Student payload.
message Stu {
  string name = 1;
  int32 age = 2;
  string gender = 3;
}

// Teacher payload.
message Teacher {
  string name = 1;
  int32 age = 2;
  string gender = 3;
}

// School payload.
message School {
  string name = 1;
  float square = 2;
}
执行 protoc --java_out=src/main/java src/main/resources/proto/Data.proto 命令, 生成代码 com.zy.netty.netty03.DataInfo
3.2 server
package com.zy.netty.netty03;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
/**
 * Netty server that listens on 127.0.0.1:8099, decodes varint32-length-prefixed protobuf
 * {@link DataInfo.Message} frames and prints the payload selected by the message's data type.
 */
@Slf4j
public class Server03 {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ServerBootstrap server = new ServerBootstrap();
        // Neither group sets nThreads here, so both fall back to the default:
        // io.netty.channel.MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
        // 1. No worker group, i.e. server.group(boss, boss): Reactor single-thread model
        // 2. Worker group present and boss nThreads == 1: Reactor multi-thread model
        // 3. Worker group present and boss nThreads > 1: Reactor master-slave model
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(new DefaultThreadFactory("bossGroupExecutor", true));
        // Give the worker threads their own name prefix so they can be told apart from the boss threads.
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("workerGroupExecutor", true));
        try {
            server.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // TCP_NODELAY applies to the accepted (child) connections, not to the listening
                    // channel, so it has to be configured through childOption.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline()
                                    // ProtobufVarint32FrameDecoder deals with TCP sticky/half packets
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    // ProtobufDecoder only decodes; it does not handle half packets.
                                    // It is bound to a single prototype type. Here that type is the
                                    // DataInfo.Message envelope, whose oneof carries the concrete payload,
                                    // which is how several payload types share one decoder.
                                    .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new ServerHandler03());
                        }
                    });
            ChannelFuture channelFuture = server.bind("127.0.0.1", 8099).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error.", e);
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Prints each received {@link DataInfo.Message} payload.
     *
     * <p>Notes on what {@code @Sharable} is for:
     * <ol>
     *   <li>Normally every client connection gets its own Channel and its own ChannelPipeline, and
     *       new ChannelHandler instances are created for each pipeline, e.g.
     *       {@code ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());}</li>
     *   <li>To use one global (singleton) ChannelHandler for all Channels, for example to collect
     *       global metrics, create the instance once before {@code server.group(...)} and add that
     *       same instance in {@code initChannel}:
     *       {@code ch.pipeline().addLast(sharableHandler);}</li>
     *   <li>The handler shared in step 2 must be annotated with {@code @Sharable}; otherwise an
     *       error is raised when the second Channel is set up, see
     *       {@code io.netty.channel.DefaultChannelPipeline#checkMultiplicity(io.netty.channel.ChannelHandler)}.</li>
     *   <li>If a new handler is created for every Channel (as this class is used above), the
     *       annotation makes no difference.</li>
     * </ol>
     */
    @ChannelHandler.Sharable
    private static class ServerHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

        /**
         * Dispatches on the message's data type and prints the matching payload.
         *
         * @param ctx the handler context (unused)
         * @param msg the decoded envelope message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
            // Approach one for multi-protocol transport over protobuf
            switch (msg.getDataType()) {
                case SchoolType:
                    System.out.println("server received msg: " + msg.getSchool());
                    break;
                case TeacherType:
                    System.out.println("server received msg: " + msg.getTeacher());
                    break;
                case StuType:
                    System.out.println("server received msg: " + msg.getStu());
                    break;
                default:
                    // Previously such messages were dropped silently; make them visible instead.
                    log.warn("server received msg with unsupported data type: {}", msg.getDataType());
            }
        }
    }
}
3.3 client
package com.zy.netty.netty03;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
/**
 * Netty client that connects to 127.0.0.1:8099 and, once the connection is active, sends one
 * randomly chosen {@link DataInfo.Message} (school, teacher or student payload).
 */
@Slf4j
public class Client03 {

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Bootstrap client = new Bootstrap();
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            client.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // Same codec chain as the server: varint32 length framing around
                            // protobuf-encoded DataInfo.Message instances.
                            ch.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(DataInfo.Message.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new ClientHandler03());
                        }
                    });
            ChannelFuture channelFuture = client.connect("127.0.0.1", 8099).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error.", e);
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            nioEventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Sends one message when the channel becomes active and prints whatever the server sends back.
     */
    private static class ClientHandler03 extends SimpleChannelInboundHandler<DataInfo.Message> {

        /**
         * Prints a message received from the server.
         *
         * @param ctx the handler context (unused)
         * @param msg the decoded envelope message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Message msg) throws Exception {
            System.out.println("client receive msg: " + msg);
        }

        /**
         * Writes a single randomly chosen message as soon as the connection is established.
         *
         * @param ctx the handler context used to write the message
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Approach one for multi-protocol transport over protobuf
            ctx.writeAndFlush(randomMessage());
        }

        /** Builds an envelope carrying a school, teacher or student payload, picked at random. */
        private static DataInfo.Message randomMessage() {
            int choice = new Random().nextInt(3);
            switch (choice) {
                case 0:
                    return DataInfo.Message.newBuilder()
                            .setDataType(DataInfo.Message.DataType.SchoolType)
                            .setSchool(DataInfo.School.newBuilder().setName("nanjingjinlingzhognxue").setSquare(800).build())
                            .build();
                case 1:
                    return DataInfo.Message.newBuilder()
                            .setDataType(DataInfo.Message.DataType.TeacherType)
                            .setTeacher(DataInfo.Teacher.newBuilder().setName("tom").setAge(30).build())
                            .build();
                default:
                    return DataInfo.Message.newBuilder()
                            .setDataType(DataInfo.Message.DataType.StuType)
                            .setStu(DataInfo.Stu.newBuilder().setName("jerry").setAge(10).build())
                            .build();
            }
        }
    }
}
参考资料
https://developers.google.com/protocol-buffers/docs/proto3
https://github.com/protocolbuffers/protobuf
https://www.jianshu.com/p/506667a6651f
https://blog.csdn.net/u011518120/article/details/54604615