grpc

基本概念

基本组件

  • Stub (打桩, 定义接口层): 起到了客户端的作用,用来发起proto请求和接收proto响应
  • Channel: 在传输层上的抽象, 适合用于做切面(用于拦截, 装饰), 一般用于日志, 监控, 鉴权
  • Transport: 传输层, grpc 有三种实现
    • netty-based: 用于 client 和 server
    • okhttp-based: 一般用于安卓, 且只有 client端
    • in-process: 线程内通信, 用于client 和 server 在同一进程内情况

打桩有2类意思。(来自知乎)

  1. 容易扩展的块。可以是一个宏,一个函数调用,或者数个宏或函数调用的组合。这里的桩是起着辅助的作用,与一般的代码块的不同是这个桩具体做什么不是那么固定,而只是一个块放在这里。类似面向方面编程的横切点,只不过是显式插入的。
  2. 另一个就是做PRC时的一个代理的点,它不真正做事,而是通过PRC或者类似的机制由外部完成真正的工作。

grpc 提供了4中类型的 service 方法

// 简单的rpc调用, 调用完后等待服务端返回
rpc GetFeature(Point) returns (Feature) {}

// 服务端流 : 客户端发送请求后, 服务端返回 Stream, 客户端用它来读取消息序列, 直到没有消息读取完
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// 客户端流: 客户端通过流写入消息序列发送到服务端, 当客户端完成写入, 会等待服务端读取完成并返回
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// 双向流: 客户端和服务端都通过 Stream 发送消息序列, 两端的操作完成独立, 因此客户端和服务端可以读取或写入而不用管是否读取完成, 消息序列在stream中是有序的
// 例如: 服务端可以选择在返回前读取所有数据, 可以使用读取一条写入一条方式, 或者批量读取后返回
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

maven包依赖以及插件

依赖包 和 生成代码插件 可以在 https://github.com/grpc/grpc-java 看到, 将 proto 文件放入 src/main/proto 或 src/test/proto 目录下, 就可以编译

example 实例

构建 grpc 的步骤

  1. 定义 .proto 文件
  2. 生成服务端, 客户端 proto 代码
  3. 使用 grpc api 编写自己的客户端和服务端

生成代码: 这两货都要点, 1个是proto 生成, 一个是 grpc 的

image.png

生成后路径是

image.png

服务端

服务端创建步骤

  1. 使用自己的业务代码覆实现 grpc 中定义的服务
  2. 运行 grpc server 用于监听请求

可以看到生成的代码中包含抽象类 RouteGuideImplBase , 实现具体业务时就可以通过继承来实现, 具体实现

 private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
     ...
 }
  1. 简单调用实现模式
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  // 返回数据
  responseObserver.onNext(checkFeature(request));
  // 标识完成 rpc 连接
  responseObserver.onCompleted();
}
  1. 服务端 Stream 模式: 返回多个数据到客户端, 和 List<?> 发送区别在于 list 只能同一时间发送, Stream 可以在不同时间发送
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  ...

  for (Feature feature : features) {
    ...
    responseObserver.onNext(feature);
  }
  responseObserver.onCompleted();
}
  1. 客户端 Stream 模式: 用于收集一波数据, 汇总计算之类的
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    /** 处理每个业务数据逻辑 */
    @Override
    public void onNext(Point point) {
      ...
    }

    /** 出错处理 */
    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    /** 完成接收后通过 onNext 返回, 并用 onCompleted 完成连接*/
    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  }
}
  1. 双向 grpc 流模式
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

启动服务用于监听连接

// 创建 ServerBuilder
ServerBuilder.forPort(port)
// 注册服务
server = serverBuilder.addService(new RouteGuideService(features)).build();
// 启动服务,并注册钩子
server.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    // 等30s 处理完遗留问题后关闭
    try {
      if (server != null) {
        server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      e.printStackTrace(System.err);
    }
    System.err.println("*** server shut down");
  }
});
// 阻塞直到关闭
server.blockUntilShutdown();

客户端

调用服务端方法, 我们可以使用两种 stubs

  • 阻塞/同步 stub
  • 非阻塞/异步 stub

在创建 stub 之前需要先创建 grpc Channel, 通过指定 地址/端口 来创建

public RouteGuideClient(String host, int port) {
  this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}

public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  channel = channelBuilder.build();
  blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  asyncStub = RouteGuideGrpc.newStub(channel);
}

然后通过 Channel 创建 stub

//同步
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
//异步
asyncStub = RouteGuideGrpc.newStub(channel);

调用服务端方法

// 简单调用, StatusRuntimeException 包含 Status, 可用于错误消息处理
try {
  feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

// 服务端流, 和简单调用类似
Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

客户端流则稍微复杂一些: 这里客户端会发送一个 Point 的流到服务端, 服务端返回一个 RouteSummary(汇总消息), 这里需要用 异步Stub

// 服务端返回定义
 final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
    // 接收返回消息
    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }
    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }
    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

// 使用异步stub发送
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      // 发送信息
      requestObserver.onNext(point);
      // 稍后发送下一个
      Thread.sleep(rand.nextInt(1000) + 500);
      // 连接已关闭退出
      if (finishLatch.getCount() == 0) {
        return;
      }
    }
} catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
}
// Mark the end of requests
requestObserver.onCompleted();

双向流和客户端流类似, 只是说客户端流服务端返回值后就会执行 onCompleted 通知完成, 可以查看 routeChat方法

尽管每一方总是按照它们被写入的顺序获取另一方的消息,但客户机和服务器都可以以任何顺序读取和写入消息——流完全独立地操作

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,264评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,549评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,389评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,616评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,461评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,351评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,776评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,414评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,722评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,760评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,537评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,381评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,787评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,030评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,304评论 1 252
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,734评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,943评论 2 336

推荐阅读更多精彩内容