google grpc中定义了四种调用方式,分别是
- 一元RPC:rpc客户端发送一个请求,服务端返回一个响应。
- 服务器流RPC:rpc客户端发送一个请求,服务端会返回一个流式数据。
- 客户端流式RPC:rpc客户端发送一个流式数据,服务端返回一个对象。
- 双向流RPC:rpc客户端发送一个流式数据,服务端返回一个流式数据。(实现了类似于TCP的长连接方式)
在快速入门中我们写了一个一元调用方式(即客户端发送一个请求,服务端返回一个响应),本篇介绍一下剩余的三种方式。
服务器流RPC
客户端发送一个对象服务端返回一个Stream(流式消息)
定义一个.proto文件
syntax = "proto3";
package com.zhihao.miao.proto;
option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;
//定义rpc方法
service StudentService{
//客户端是一个数据对象,而返回的是一个流对象,GPC规范中规定入参和出参必须是Message类型的
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse){}
}
message StudentRequest{
int32 age = 1;
}
message StudentResponse{
string name = 1;
int32 age = 2;
string city = 3;
}
使用gradle插件编译:
gradle generateProto
将生成的代码StudentProto
,StudentRequest
,StudentRequestOrBuilder
,
StudentResponse
,StudentResponseOrBuilder
,StudentServiceGrpc
放置到src/main下指定的包下:
编写服务端接口实现
import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{
@Override
public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接收到客户端信息:"+request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("张三").setAge(20).setCity("北京").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("成都").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("苗志浩").setAge(50).setCity("苏州").build());
responseObserver.onCompleted();
}
}
编写服务器代码
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class GrpcServer {
private Server server;
//编写服务启动方法
private void start() throws IOException{
//增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
System.out.println("server start!");
//jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
//不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("关闭jvm");
GrpcServer.this.stop();
System.out.println("服务关闭");
}));
System.out.println("执行到这里");
}
//停止服务器
private void stop(){
if(null != this.server){
this.server.shutdown();
}
}
//服务器阻塞
private void blockUntilShutdown() throws InterruptedException {
if(null != this.server){
this.server.awaitTermination();
//this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}
public static void main(String[] args) throws IOException,InterruptedException{
GrpcServer server = new GrpcServer();
server.start();;
server.blockUntilShutdown();
}
}
编写客户端调用
import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Iterator;
public class GrpcClient {
public static void main(String[] args) throws Exception {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
//使用TLS来连接服务端
.usePlaintext(true).build();
//定义一个阻塞的stub
StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);
Iterator<StudentResponse> iterator = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
while (iterator.hasNext()) {
StudentResponse studentResponse = iterator.next();
System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity());
}
}
}
启动服务端和客户端代码:
服务端控制台打印:
server start!
执行到这里
接收到客户端信息:20
客户端打印:
张三, 20, 北京
李四, 30, 天津
王五, 40, 成都
苗志浩, 50, 苏州
Process finished with exit code 0
发现服务器端流式数据意味着服务器返回的对象是集合或者迭代器(把多个对象一个个返回给客户端)
客户端流式RPC
客户端发送一个Stream(流式消息)服务端返回一个对象。
定义一个.proto文件
package com.zhihao.miao.proto;
option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;
//定义rpc方法
service StudentService{
//客户端是一个流,而返回值是一个对象。
rpc GetStudentsWapperByAges(stream StudentRequest) returns (StudentResponseList) {}
}
message StudentRequest{
int32 age = 1;
}
message StudentResponse{
string name = 1;
int32 age = 2;
string city = 3;
}
//定义个StudentResponse的List
message StudentResponseList{
repeated StudentResponse studentResponse = 1;
}
使用gradle插件编译:
gradle generateProto
将生成的代码放置到src/main下指定的包下:
编写服务端接口实现
import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentResponseList;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{
@Override
public StreamObserver<StudentRequest> getStudentsWapperByAges(StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentRequest>() {
//表示服务端接收客户端的一个message,请求每到来了就调用一次
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext: "+value.getAge());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
//服务器端接收完客户端的所有messages
@Override
public void onCompleted() {
//客户端接收的是一个List集合的StudentResponse,所有构建了二个StudentResponse
StudentResponse studentResponse1 = StudentResponse.newBuilder().setName("张三").setAge(20).setCity("西安").build();
StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("苏州").build();
StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse1)
.addStudentResponse(studentResponse2).build();
responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}
}
接口实现编写一个StreamObserver的实现,并且重写这些方法的的各个阶段的回调,因为这个接口返回的是一个List集合,所以我在onCompleted方法中返回了一个StudentResponse的集合。
编写服务器端代码:
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class GrpcServer {
private Server server;
//编写服务启动方法
private void start() throws IOException{
//增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
System.out.println("server start!");
//jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
//不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("关闭jvm");
GrpcServer.this.stop();
System.out.println("服务关闭");
}));
System.out.println("执行到这里");
}
//停止服务器
private void stop(){
if(null != this.server){
this.server.shutdown();
}
}
//服务器阻塞
private void blockUntilShutdown() throws InterruptedException {
if(null != this.server){
this.server.awaitTermination();
//this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}
public static void main(String[] args) throws IOException,InterruptedException{
GrpcServer server = new GrpcServer();
server.start();;
server.blockUntilShutdown();
}
}
编写客户端调用
import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponseList;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
public class GrpcClient {
public static void main(String[] args) throws Exception {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
//使用TLS来连接服务端
.usePlaintext(true).build();
//构建请求的回调对象
StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
//服务器端给客户端返回接口的各阶段回调方法
@Override
public void onNext(StudentResponseList value) {
value.getStudentResponseList().forEach(studentResponse -> {
System.out.println("name="+studentResponse.getName()+",age="+studentResponse.getAge()+
",city="+studentResponse.getCity());
System.out.println("*********************");
});
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed!");
}
};
/**
* 构建客户端向服务器发送的数据
* 只要客户端是以流式向服务端发送请求,那么一定要通过异步的形式进行调用,StudentServiceBlockingStub是同步的,所以不行
*
*
*/
//获取一个异步的stub
StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);
//将定义的服务器回调作为参数传递进去
StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWapperByAges(studentResponseListStreamObserver);
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
studentRequestStreamObserver.onCompleted();
//因为是异步调用getStudentsWapperByAges,流程继续往下走,所以这时候需要休眠一段时间才能看到结果
Thread.sleep(2000);
}
}
执行服务器端客户端,服务器端控制台打印:
server start!
执行到这里
onNext: 20
onNext: 30
onNext: 40
onNext: 50
客户端控制台打印,
name=张三,age=20,city=西安
*********************
name=李四,age=30,city=苏州
*********************
completed!
Process finished with exit code 0
只要客户端是以流式向服务端发送请求,那么一定要通过异步的形式进行调用,StudentServiceBlockingStub
是同步的,所以不行,我们这边使用了 StudentServiceGrpc.StudentServiceStub
(异步Stub)。也很好理解,因为我们客户端是以流式数据发送给客户端,那么如果是使用阻塞的Stub表示等客户端流式数据发送完毕之后客户端才响应很明显会造成资源阻塞。
双向流RPC
类似WebSocket的长连接TCP编程。
定义一个.proto文件
syntax = "proto3";
package com.zhihao.miao.proto;
option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;
//定义rpc方法
service StudentService{
//双向流
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}
message StreamRequest{
string request_info = 1;
}
message StreamResponse{
string response_info = 1;
}
使用gradle插件编译:
gradle generateProto
将生成的代码放置到src/main下指定的包下
编写服务端接口实现
import com.zhihao.miao.proto.StreamRequest;
import com.zhihao.miao.proto.StreamResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{
@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
//表示服务端接收客户端的一个message,请求每到来了就调用一次
@Override
public void onNext(StreamRequest value) {
System.out.println(value.getRequestInfo());
//向客户端返回数据,向客户端返回一个UUID
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
//双向流服务端也要关闭流
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
编写服务器端代码:
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class GrpcServer {
private Server server;
//编写服务启动方法
private void start() throws IOException{
//增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
System.out.println("server start!");
//jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
//不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("关闭jvm");
GrpcServer.this.stop();
System.out.println("服务关闭");
}));
System.out.println("执行到这里");
}
//停止服务器
private void stop(){
if(null != this.server){
this.server.shutdown();
}
}
//服务器阻塞
private void blockUntilShutdown() throws InterruptedException {
if(null != this.server){
this.server.awaitTermination();
//this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}
public static void main(String[] args) throws IOException,InterruptedException{
GrpcServer server = new GrpcServer();
server.start();;
server.blockUntilShutdown();
}
}
编写客户端调用
import com.zhihao.miao.proto.StreamRequest;
import com.zhihao.miao.proto.StreamResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
public class GrpcClient {
public static void main(String[] args) throws Exception {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
//使用TLS来连接服务端
.usePlaintext(true).build();
//获取一个异步的stub
StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);
StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
//收到服务器的返回结果
@Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompled!");
}
});
for (int i = 0; i <10 ; i++) {
//客户端以流式的形式向服务器发送数据
requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
Thread.sleep(1000);
}
Thread.sleep(5000);
}
}
启动服务器端和客户端发,服务器控制台打印:
server start!
执行到这里
2017-10-21T10:09:32.111
2017-10-21T10:09:33.115
2017-10-21T10:09:34.121
2017-10-21T10:09:35.126
2017-10-21T10:09:36.130
2017-10-21T10:09:37.135
2017-10-21T10:09:38.140
2017-10-21T10:09:39.144
2017-10-21T10:09:40.147
2017-10-21T10:09:41.152
CANCELLED
客户端控制台打印:
84763283-f33d-44f6-8daf-dbfd31f180c1
ae5586ff-5384-4ba7-ae52-a3a291c5d447
1cdcf7fc-8154-4dc7-9e71-fe2e5d5394f3
edc1e62b-af98-455f-a67a-3e9ec8ca386c
f977cd98-1ca9-44db-a760-5169eafa7e05
d6a1fddb-9c19-4953-93bb-b6c555c5d268
753a119b-76c1-4417-8dde-af482a6bfd05
56807302-c5ca-43f9-b60d-49c552ea483e
10398836-7e8e-4244-aaab-4ec202495185
03e66e5b-9506-4dfe-976d-6a626146e5ff
Process finished with exit code 0
问题
我们上面的所有demo的开发中发现一些问题
- 每次生成自动化代码的时候都要去将生成的代码复制代码到指定的目录下,并不是很好,怎么去解决呢?
- 每次
gradle generateProto
之后把相关的代码复制到指定的目录,再去执行gradle build
之后会出现构建失败,因为src/main
目录下和自动生成的代码类重复。将build下的generated目录下代码删除之后,执行gradle build
之后还会报构建失败,我们猜测执行gradle build
的时候先去执行了gradle generateProto
,再去执行了构建代码所以也失败了。
我们在思考如果能将gradle generateProto
生成的代码直接生成到指定的src/main下面的目录下那么不就是直接解决了。
之前使用gradle generateProto生成的目录结构
我们配置的gradle插件如下的时候就可以了
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.2.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.4.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = "java"
}
}
}
generatedFilesBaseDir = "$projectDir/src"
}
generatedFilesBaseDir = "$projectDir/src"
的作用就是将之前自动生成代码的目录$buildDir/generated/source/proto
设置成当前项目下的src下而outputSubDir = "java"
就是将之前的grpc目录下的代码也自动改生成到src/main/java
下。