google grpc 其他三种调用方式

google grpc中定义了四种调用方式,分别是

  • 一元RPC:rpc客户端发送一个请求,服务端返回一个响应。
  • 服务器流RPC:rpc客户端发送一个请求,服务端会返回一个流式数据。
  • 客户端流式RPC:rpc客户端发送一个流式数据,服务端返回一个对象。
  • 双向流RPC:rpc客户端发送一个流式数据,服务端返回一个流式数据。(实现了类似于TCP的长连接方式)

在快速入门中我们写了一个一元调用方式(即客户端发送一个请求,服务端返回一个响应),本篇介绍一下剩余的三种方式。

服务器流RPC

客户端发送一个对象服务端返回一个Stream(流式消息)

定义一个.proto文件

syntax = "proto3";

package com.zhihao.miao.proto;

option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;

//定义rpc方法
service StudentService{

    //客户端是一个数据对象,而返回的是一个流对象,GPC规范中规定入参和出参必须是Message类型的
    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse){}

}

message StudentRequest{
    int32 age = 1;
}

message StudentResponse{
    string name = 1;
    int32 age = 2;
    string city = 3;
}

使用gradle插件编译:

gradle generateProto 

将生成的代码StudentProto,StudentRequest,StudentRequestOrBuilder,
StudentResponse,StudentResponseOrBuilder,StudentServiceGrpc放置到src/main下指定的包下:

编写服务端接口实现

import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{

    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接收到客户端信息:"+request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("张三").setAge(20).setCity("北京").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("成都").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("苗志浩").setAge(50).setCity("苏州").build());

        responseObserver.onCompleted();
    }
}

编写服务器代码

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {

    private Server server;

    //编写服务启动方法
    private void start() throws IOException{
        //增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();

        System.out.println("server start!");

        //jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
        //不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("关闭jvm");
            GrpcServer.this.stop();
            System.out.println("服务关闭");
        }));

        System.out.println("执行到这里");
    }

    //停止服务器
    private void stop(){
        if(null != this.server){
            this.server.shutdown();
        }
    }

    //服务器阻塞
    private void blockUntilShutdown() throws InterruptedException {
        if(null != this.server){
            this.server.awaitTermination();
            //this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws IOException,InterruptedException{
        GrpcServer server = new GrpcServer();

        server.start();;
        server.blockUntilShutdown();
    }
}

编写客户端调用

import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.Iterator;

public class GrpcClient {

    public static void main(String[] args) throws Exception {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                //使用TLS来连接服务端
                .usePlaintext(true).build();

        //定义一个阻塞的stub
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);


        Iterator<StudentResponse> iterator = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
        while (iterator.hasNext()) {
            StudentResponse studentResponse = iterator.next();

            System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity());
        }

    }
}

启动服务端和客户端代码:
服务端控制台打印:

server start!
执行到这里
接收到客户端信息:20

客户端打印:

张三, 20, 北京
李四, 30, 天津
王五, 40, 成都
苗志浩, 50, 苏州

Process finished with exit code 0

发现服务器端流式数据意味着服务器返回的对象是集合或者迭代器(把多个对象一个个返回给客户端)

客户端流式RPC

客户端发送一个Stream(流式消息)服务端返回一个对象。

定义一个.proto文件

package com.zhihao.miao.proto;

option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;

//定义rpc方法
service StudentService{

    //客户端是一个流,而返回值是一个对象。
    rpc GetStudentsWapperByAges(stream StudentRequest) returns (StudentResponseList) {}

}


message StudentRequest{
    int32 age = 1;
}

message StudentResponse{
    string name = 1;
    int32 age = 2;
    string city = 3;
}

//定义个StudentResponse的List
message StudentResponseList{
    repeated StudentResponse studentResponse = 1;
}

使用gradle插件编译:

gradle generateProto 

将生成的代码放置到src/main下指定的包下:

编写服务端接口实现

import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponse;
import com.zhihao.miao.proto.StudentResponseList;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{

    @Override
    public StreamObserver<StudentRequest> getStudentsWapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            //表示服务端接收客户端的一个message,请求每到来了就调用一次
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext: "+value.getAge());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            //服务器端接收完客户端的所有messages
            @Override
            public void onCompleted() {
                //客户端接收的是一个List集合的StudentResponse,所有构建了二个StudentResponse
                StudentResponse studentResponse1 = StudentResponse.newBuilder().setName("张三").setAge(20).setCity("西安").build();

                StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("苏州").build();

                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse1)
                        .addStudentResponse(studentResponse2).build();

                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();
            }
        };
    }
}

接口实现编写一个StreamObserver的实现,并且重写这些方法的的各个阶段的回调,因为这个接口返回的是一个List集合,所以我在onCompleted方法中返回了一个StudentResponse的集合。

编写服务器端代码:

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {

    private Server server;

    //编写服务启动方法
    private void start() throws IOException{
        //增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();

        System.out.println("server start!");

        //jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
        //不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("关闭jvm");
            GrpcServer.this.stop();
            System.out.println("服务关闭");
        }));

        System.out.println("执行到这里");
    }

    //停止服务器
    private void stop(){
        if(null != this.server){
            this.server.shutdown();
        }
    }

    //服务器阻塞
    private void blockUntilShutdown() throws InterruptedException {
        if(null != this.server){
            this.server.awaitTermination();
            //this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws IOException,InterruptedException{
        GrpcServer server = new GrpcServer();

        server.start();;
        server.blockUntilShutdown();
    }
}

编写客户端调用

import com.zhihao.miao.proto.StudentRequest;
import com.zhihao.miao.proto.StudentResponseList;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class GrpcClient {

    public static void main(String[] args) throws Exception {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                //使用TLS来连接服务端
                .usePlaintext(true).build();

        //构建请求的回调对象
        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            //服务器端给客户端返回接口的各阶段回调方法
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println("name="+studentResponse.getName()+",age="+studentResponse.getAge()+
                            ",city="+studentResponse.getCity());
                    System.out.println("*********************");
                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed!");
            }
        };


        /**
         * 构建客户端向服务器发送的数据
         * 只要客户端是以流式向服务端发送请求,那么一定要通过异步的形式进行调用,StudentServiceBlockingStub是同步的,所以不行
         *
         *
         */
        //获取一个异步的stub
        StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);

        //将定义的服务器回调作为参数传递进去
        StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWapperByAges(studentResponseListStreamObserver);


        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());


        studentRequestStreamObserver.onCompleted();

        //因为是异步调用getStudentsWapperByAges,流程继续往下走,所以这时候需要休眠一段时间才能看到结果
        Thread.sleep(2000);

    }
}

执行服务器端客户端,服务器端控制台打印:

server start!
执行到这里
onNext: 20
onNext: 30
onNext: 40
onNext: 50

客户端控制台打印,

name=张三,age=20,city=西安
*********************
name=李四,age=30,city=苏州
*********************
completed!

Process finished with exit code 0

只要客户端是以流式向服务端发送请求,那么一定要通过异步的形式进行调用,StudentServiceBlockingStub是同步的,所以不行,我们这边使用了 StudentServiceGrpc.StudentServiceStub(异步Stub)。也很好理解,因为我们客户端是以流式数据发送给客户端,那么如果是使用阻塞的Stub表示等客户端流式数据发送完毕之后客户端才响应很明显会造成资源阻塞。

双向流RPC

类似WebSocket的长连接TCP编程。

定义一个.proto文件

syntax = "proto3";

package com.zhihao.miao.proto;

option java_package = "com.zhihao.miao.proto";
option java_outer_classname ="StudentProto";
//将属性生成多个文件,便于代码管理
option java_multiple_files = true;

//定义rpc方法
service StudentService{

    //双向流
    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}

}


message StreamRequest{
    string request_info = 1;
}

message StreamResponse{
    string response_info = 1;
}

使用gradle插件编译:

gradle generateProto
将生成的代码放置到src/main下指定的包下

编写服务端接口实现

import com.zhihao.miao.proto.StreamRequest;
import com.zhihao.miao.proto.StreamResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase{

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            //表示服务端接收客户端的一个message,请求每到来了就调用一次
            @Override
            public void onNext(StreamRequest value) {
                System.out.println(value.getRequestInfo());

                //向客户端返回数据,向客户端返回一个UUID
                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            //双向流服务端也要关闭流
            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

编写服务器端代码:

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {

    private Server server;

    //编写服务启动方法
    private void start() throws IOException{
        //增加实际业务代码的实列,如果使用spring的话可以直接将StudentServiceImpl对象注入进来
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();

        System.out.println("server start!");

        //jvm回调钩子的作用,Runtime.getRuntime()可以获得java一些运行时间的一些信息。
        //不管程序是正常关闭还是异常终端,在jvm关闭的时候做一些清理工作
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("关闭jvm");
            GrpcServer.this.stop();
            System.out.println("服务关闭");
        }));

        System.out.println("执行到这里");
    }

    //停止服务器
    private void stop(){
        if(null != this.server){
            this.server.shutdown();
        }
    }

    //服务器阻塞
    private void blockUntilShutdown() throws InterruptedException {
        if(null != this.server){
            this.server.awaitTermination();
            //this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws IOException,InterruptedException{
        GrpcServer server = new GrpcServer();

        server.start();;
        server.blockUntilShutdown();
    }
}

编写客户端调用

import com.zhihao.miao.proto.StreamRequest;
import com.zhihao.miao.proto.StreamResponse;
import com.zhihao.miao.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;

public class GrpcClient {

    public static void main(String[] args) throws Exception {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                //使用TLS来连接服务端
                .usePlaintext(true).build();

        //获取一个异步的stub
        StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);

        StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
            //收到服务器的返回结果
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompled!");
            }
        });


        for (int i = 0; i <10 ; i++) {
            //客户端以流式的形式向服务器发送数据
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());

            Thread.sleep(1000);
        }
        Thread.sleep(5000);

    }
}

启动服务器端和客户端发,服务器控制台打印:

server start!
执行到这里
2017-10-21T10:09:32.111
2017-10-21T10:09:33.115
2017-10-21T10:09:34.121
2017-10-21T10:09:35.126
2017-10-21T10:09:36.130
2017-10-21T10:09:37.135
2017-10-21T10:09:38.140
2017-10-21T10:09:39.144
2017-10-21T10:09:40.147
2017-10-21T10:09:41.152
CANCELLED

客户端控制台打印:

84763283-f33d-44f6-8daf-dbfd31f180c1
ae5586ff-5384-4ba7-ae52-a3a291c5d447
1cdcf7fc-8154-4dc7-9e71-fe2e5d5394f3
edc1e62b-af98-455f-a67a-3e9ec8ca386c
f977cd98-1ca9-44db-a760-5169eafa7e05
d6a1fddb-9c19-4953-93bb-b6c555c5d268
753a119b-76c1-4417-8dde-af482a6bfd05
56807302-c5ca-43f9-b60d-49c552ea483e
10398836-7e8e-4244-aaab-4ec202495185
03e66e5b-9506-4dfe-976d-6a626146e5ff

Process finished with exit code 0

问题

我们上面的所有demo的开发中发现一些问题

  • 每次生成自动化代码的时候都要去将生成的代码复制代码到指定的目录下,并不是很好,怎么去解决呢?
  • 每次gradle generateProto之后把相关的代码复制到指定的目录,再去执行gradle build之后会出现构建失败,因为src/main目录下和自动生成的代码类重复。将build下的generated目录下代码删除之后,执行gradle build之后还会报构建失败,我们猜测执行gradle build的时候先去执行了gradle generateProto,再去执行了构建代码所以也失败了。

我们在思考如果能将gradle generateProto生成的代码直接生成到指定的src/main下面的目录下那么不就是直接解决了。

之前使用gradle generateProto生成的目录结构


目录结构

我们配置的gradle插件如下的时候就可以了

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.2.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.4.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                outputSubDir = "java"
            }
        }
    }

    generatedFilesBaseDir = "$projectDir/src"
}

generatedFilesBaseDir = "$projectDir/src"的作用就是将之前自动生成代码的目录$buildDir/generated/source/proto设置成当前项目下的src下而outputSubDir = "java"就是将之前的grpc目录下的代码也自动改生成到src/main/java下。

参考资料

Protobuf Plugin for Gradle

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容