gRPC 是一个跨语言的 RPC 框架,通过 protobuf 定义接口和传输格式,具有高压缩率、高性能(基于 HTTP/2 的多路复用和头部压缩)等特性。这里简单介绍怎样实现一个传输文件的客户端和服务端模型,代码比较简单,没怎么封装,有兴趣的同学可以在此基础上自行改造。
1.环境准备
- 开发工具: idea
- maven: 3.6.1
- jdk8
2.protobuf插件安装
2.1 maven配置
<!-- Shared version numbers referenced by the dependency and plugin declarations of this POM. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Used for every io.grpc artifact and for the protoc-gen-grpc-java code generator. -->
<grpc.version>1.25.0</grpc.version>
<!-- Used for protobuf-java-util. -->
<protobuf.version>3.5.1</protobuf.version>
<!-- Version of the protoc compiler binary resolved by protobuf-maven-plugin. -->
<protoc.version>3.5.1-1</protoc.version>
<!-- NOTE(review): grpc-java documents a matching netty-tcnative version per release;
     confirm this value against grpc 1.25.0 before relying on TLS. -->
<netty.tcnative.version>2.0.7.Final</netty.tcnative.version>
</properties>
<!-- Runtime and test dependencies for the gRPC file-transfer example. -->
<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.0.0</version>
</dependency>
<!-- gRPC artifacts; all share ${grpc.version}. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-alts</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
</dependency>
<!-- Protobuf support libraries. -->
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- Test-only libraries. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): presumably needed for the javax.annotation.Generated annotation on the
     generated gRPC classes; confirm against the generated sources. -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<!-- Build setup: generate Java sources from .proto files and compile for Java 8. -->
<build>
<extensions>
<!-- Supplies the ${os.detected.classifier} property used in the artifact coordinates below. -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<!-- Platform-specific protoc compiler executable. -->
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<!-- Identifier given to the custom protoc plugin declared in pluginArtifact. -->
<pluginId>grpc-protocol-buffers</pluginId>
<!-- Platform-specific gRPC Java code generator, versioned together with the gRPC libraries. -->
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<!-- compile generates the base protobuf code; compile-custom generates the gRPC classes. -->
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3.proto文件编写
- 在src/main目录下建一个proto目录。
- 在proto目录下新建一个File.proto文件
// Wire contract for the file-transfer example: one unary RPC that uploads a whole file.
syntax = "proto3";
package file;
// Generate one Java file per message/service instead of nesting them in the outer class.
option java_multiple_files = true;
option java_package = "com.demo.grpc.file";
option java_outer_classname = "File";
// File upload service.
service FileService {
// Uploads a single file in one request message and returns a status code plus message.
rpc Upload (Request) returns (Response) {}
}
// Upload request carrying the complete file.
message Request{
// File content as raw bytes.
bytes file = 1;
// File name sent along with the content.
string name = 2;
}
// Upload result.
message Response{
// Result code.
int32 code = 1;
// Human-readable result message.
string msg = 2;
}
4.编译
- 执行插件的 protobuf:compile 目标,生成基础代码(消息类)
- 执行插件的 protobuf:compile-custom 目标,生成 grpc 类(服务端基类和客户端 stub)
5.编写服务端代码
import com.demo.grpc.file.FileServiceGrpc;
import com.demo.grpc.file.Response;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import io.grpc.ServerBuilder;
/**
 * Minimal gRPC server for the file-transfer example: it accepts a whole file in one unary
 * {@code Upload} call and writes it into a fixed upload directory.
 *
 * <p>Created 2020/1/7 22:03.
 *
 * @author pengjunming
 */
public class Server {
    private io.grpc.Server server;
    private static final int PORT = 8888;
    /** Directory that uploaded files are written into. */
    private static final String UPLOAD_DIR = "D:/tmp/";
    /** Maximum time, in seconds, that {@link #stop()} waits for in-flight calls to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    /**
     * Starts the server on {@link #PORT} and blocks until it terminates.
     *
     * @param args unused
     * @throws IOException if the server cannot bind to the port
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start(PORT);
        server.await();
    }

    /**
     * Builds and starts the gRPC server, then registers the JVM shutdown hook.
     *
     * @param port TCP port to listen on
     * @throws IOException if the server cannot be started
     */
    private void start(int port) throws IOException {
        server = ServerBuilder.forPort(port)
                .addService(new BasicCalImpl())
                .build()
                .start();
        // Register a hook so the server is shut down when the JVM exits.
        addHook();
    }

    /** Registers a JVM shutdown hook that stops the gRPC server. */
    private void addHook() {
        Runtime.getRuntime()
                .addShutdownHook(
                        new Thread(
                                () -> {
                                    System.out.println("监听到JVM停止,正在关闭GRPC服务....");
                                    this.stop();
                                    System.out.println("服务已经停止...");
                                }));
    }

    /**
     * Stops the server: rejects new calls, waits up to {@link #SHUTDOWN_TIMEOUT_SECONDS} for
     * in-flight calls to complete, and forces termination if they do not (or if the waiting
     * thread is interrupted). Does nothing when the server was never started.
     */
    public void stop() {
        if (server == null) {
            return;
        }
        server.shutdown();
        try {
            boolean terminated = server.awaitTermination(
                    SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
            if (!terminated) {
                server.shutdownNow();
            }
        } catch (InterruptedException e) {
            server.shutdownNow();
            // Preserve the interrupt status for whoever is running this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks the calling thread until the server has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /** Implementation of the generated {@code FileService} base class. */
    static class BasicCalImpl extends FileServiceGrpc.FileServiceImplBase {

        /**
         * Writes the uploaded bytes to a file inside {@link #UPLOAD_DIR} and answers with
         * code {@code 1} on success or {@code -1} on failure. An existing file with the same
         * name is overwritten. Names that resolve outside the upload directory are rejected.
         *
         * @param request upload request carrying the file name and content
         * @param responseObserver observer that receives exactly one response
         */
        @Override
        public void upload(com.demo.grpc.file.Request request,
                           io.grpc.stub.StreamObserver<com.demo.grpc.file.Response> responseObserver) {
            byte[] bytes = request.getFile().toByteArray();
            System.out.println(String.format("收到文件%s长度%s", request.getName(), bytes.length));
            Response response;
            try {
                File target = resolveTarget(request.getName());
                // FileOutputStream truncates an existing file, so no explicit delete is needed.
                try (OutputStream os = new FileOutputStream(target)) {
                    os.write(bytes);
                }
                response = Response.newBuilder().setCode(1).setMsg("上传成功").build();
            } catch (IOException e) {
                response = Response.newBuilder().setCode(-1).setMsg(String.format("上传失败:%s", e.getMessage())).build();
                e.printStackTrace();
            }
            // Send the single response and complete the call.
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }

        /**
         * Resolves a client-supplied file name against the upload directory.
         *
         * @param name file name received from the client (untrusted)
         * @return canonical target file located inside the upload directory
         * @throws IOException if the name is empty or resolves to the upload directory itself
         *     or to a location outside of it (for example via {@code ..} segments)
         */
        private static File resolveTarget(String name) throws IOException {
            File base = new File(UPLOAD_DIR).getCanonicalFile();
            File target = new File(base, name).getCanonicalFile();
            boolean insideBase = target.toPath().startsWith(base.toPath());
            if (name.isEmpty() || target.equals(base) || !insideBase) {
                throw new IOException("非法文件名: " + name);
            }
            return target;
        }
    }
}
6.编写客户端代码
import com.google.protobuf.ByteString;
import com.demo.grpc.file.FileServiceGrpc;
import com.demo.grpc.file.Request;
import com.demo.grpc.file.Response;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
/**
 * Minimal gRPC client for the file-transfer example: it reads a local file into memory and
 * sends it to the server in one unary {@code Upload} call.
 *
 * <p>Created 2020/1/8 9:14.
 *
 * @author pengjunming
 */
public class Client {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8888;

    /**
     * Uploads {@code D:/a.txt} under the name {@code b.log} and then closes the channel.
     *
     * @param args unused
     * @throws IOException if the local file cannot be read
     * @throws InterruptedException if interrupted while waiting for the channel to close
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Client client = new Client(HOST, PORT);
        try {
            client.upload("b.log", "D:/a.txt");
        } finally {
            // Close the channel even when the upload fails.
            client.shutdown();
        }
    }

    private final ManagedChannel managedChannel;
    private final FileServiceGrpc.FileServiceBlockingStub blockingStub;

    /**
     * Creates a client that talks to the given address over a plaintext (non-TLS) channel.
     *
     * @param host server host name or IP address
     * @param port server port
     */
    public Client(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }

    /**
     * Uploads a file. The server's response message is printed to standard output; an RPC
     * failure is reported on standard error and not rethrown.
     *
     * @param name file name to store the upload under on the server
     * @param path path of the local file to upload
     * @throws IOException if the local file cannot be read completely or is too large
     */
    public void upload(String name, String path) throws IOException {
        Request request = Request.newBuilder()
                .setName(name)
                // file -> byte[] -> ByteString
                .setFile(ByteString.copyFrom(getContent(path)))
                .build();
        try {
            Response response = blockingStub.upload(request);
            System.out.println(response.getMsg());
        } catch (StatusRuntimeException ex) {
            // Report the failure instead of silently dropping it.
            System.err.println("上传失败:" + ex.getStatus());
        }
    }

    /**
     * Shuts the channel down and waits up to five seconds for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Creates a client from a pre-configured channel builder.
     *
     * @param channelBuilder builder used to create the channel
     */
    Client(ManagedChannelBuilder<?> channelBuilder) {
        managedChannel = channelBuilder.build();
        blockingStub = FileServiceGrpc.newBlockingStub(managedChannel);
    }

    /**
     * Reads a whole file into memory.
     *
     * @param filePath path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be opened, is larger than
     *     {@link Integer#MAX_VALUE} bytes, or ends before the expected number of bytes
     *     has been read
     */
    public static byte[] getContent(String filePath) throws IOException {
        File file = new File(filePath);
        long fileSize = file.length();
        if (fileSize > Integer.MAX_VALUE) {
            // A null return here used to surface as a NullPointerException in the caller.
            throw new IOException("File is too large to read into memory: " + file.getName());
        }
        byte[] buffer = new byte[(int) fileSize];
        int offset = 0;
        try (FileInputStream fi = new FileInputStream(file)) {
            int numRead;
            while (offset < buffer.length
                    && (numRead = fi.read(buffer, offset, buffer.length - offset)) >= 0) {
                offset += numRead;
            }
        }
        // Make sure every byte was read.
        if (offset != buffer.length) {
            throw new IOException("Could not completely read file "
                    + file.getName());
        }
        System.out.println("生成文件长度" + buffer.length);
        return buffer;
    }
}
7.运行测试
- 先运行服务端
- 在D盘创建a.txt文件,文件内容随意。(可以根据自己的程序修改路径和文件名)
- 运行客户端
- 检查D:/tmp/b.log 文件是否存在并且内容是否跟a.txt一致(b.log 是客户端调用 upload 时传入的保存文件名)