gRPC-go服务发现&负载均衡

前言

以下示例基于 https://github.com/grpc/grpc-go v1.30.0,关于proto文件定义,代码生成参考gRPC 官方文档中文版

client

grpc使用的是客户端负载均衡模式,每次新建连接的时候会根据负载均衡算法选出服务端的IP然后建立连接。现在grpc默认支持两种算法pick_first(第一次地址) 和 round_robin(轮询)
pick_first:pick_first每次都是尝试连接第一个地址,如果连接失败就会尝试下一个,直到连接成功为止,之后的RPC请求都会使用这个连接
round_robin:round_robin会对每个地址建立连接,之后的RPC请求会依次通过这些连接发送到后端

客户端新建一个连接

conn, err := grpc.Dial(
        fmt.Sprintf("%s:///%s", "game", baseService),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        //grpc.WithUnaryInterceptor(unaryClientInterceptor),
        //grpc.WithBlock(),
               //grpc.WithCompressor  Deprecated
    )

客户端每次发起请求都需要通过grpc.dail创建一个ClientConn,然后通过ClientConn.XXXX发送请求。

建立连接的各项参数:
grpc.WithInsecure:禁用传输认证,没有这个选项必须设置一种认证方式
grpc.WithCompressor:在grpc.Dial参数中设置压缩的方式将要被废弃,推荐使用UseCompressor

        grpc.UseCompressor(gzip.Name)
        conn, err := grpc.Dial(
              //...
        )

PS:压缩方式客户端应该和服务端对应

grpc.WithBlock():grpc.Dial默认建立连接是异步的,加了这个参数后会等待所有连接建立成功后再返回

grpc.WithUnaryInterceptor:一元拦截器,适用于普通rpc连接,相应的还有流拦截器。拦截器只有第一个生效,所以一般设置一个。拦截器是对请求的一次封装,客户端和服务端都可以设置拦截器,请求的发送/执行都是在拦截器内操作的,所以在请求的前后都可以嵌入用户自定义的代码,类似hook

//客户端拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    var credsConfigured bool
    for _, o := range opts {
        _, ok := o.(grpc.PerRPCCredsCallOption)
        if ok {
            credsConfigured = true
            break
        }
    }
    if !credsConfigured {
        opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
            AccessToken: fallbackToken,
        })))
    }
    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    end := time.Now()
    logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err)
    return err
}
//服务端拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // authentication (token verification)
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, errMissingMetadata
    }
    if !valid(md["authorization"]) {
        return nil, errInvalidToken
    }
    m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }
    return m, err
}

grpc.WithDefaultServiceConfig: 旧的版本可以通过grpc.RoundRobin(),和grpc.WithBalancer()来设置负载均衡,这个版本grpc.RoundRobin()已经取消了,grpc.WithBalancer()和grpc. 也WithBalancerName()标记为废弃。现在改为读取外部配置,主要是方便服务启动后动态更新(设计初衷应该是主要用在服务端)

//service config example
{
  "loadBalancingConfig": [ { "round_robin": {} } ],
  "methodConfig": [
    {
      "name": [
        { "service": "foo", "method": "bar" },
        { "service": "baz" }
      ],
      "timeout": "1.0000000001s"
    }
  ]
}
   grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name))

可以这样设置BalancingPolicy

target: grpc.Dial:的第一个参数,这个参数的主要作用的通过它来找到对应的服务端地址,target传入是一个字符串,统一格式为scheme://authority/endpoint,然后通过以下方式解析为Target struct

type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
}

func parseTarget(target string) (ret resolver.Target) {
    var ok bool
    ret.Scheme, ret.Endpoint, ok = split2(target, "://")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    return ret
}

解析target的时候有以下几种情况:

  • 当前参数有没有直接设置resolverBuilder,如果设置了,直接设置Endpoint=target
  • 如果未直接设置resolverBuilder,则通过Scheme来找到resolverBuilder
  • 如果通过Scheme没有找到resolverBuilder,resolverBuilder为默认的dns builder,设置
    Endpoint=target

所以,真正获取IP地址是通过resolverBuilder这个接口

type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string
}

Build():为给定目标创建一个新的resolver,当调用grpc.Dial()时执行。
Scheme():返回此resolver方案的名称

type Resolver interface {
    ResolveNow(ResolveNowOptions)
    Close()
}

ResolveNow():被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。
Close方法:关闭resolver

下面我们看一个示例


func init() {
    resolver.Register(&exampleResolverBuilder{})  
/*
//注册的时候将Scheme => builder保存到m
func Register(b Builder) {
    m[b.Scheme()] = b
}
*/
}

const (
    exampleScheme      = "example"
    exampleServiceName = "lb.example.grpc.io"
)

var addrs = []string{"localhost:50051", "localhost:50052"}

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    r := &exampleResolver{
        target: target,
        cc:     cc,
        addrsStore: map[string][]string{
            exampleServiceName: addrs,
        },
    }
    r.start()
    return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) start() {
    addrStrs := r.addrsStore[r.target.Endpoint]
    addrs := make([]resolver.Address, len(addrStrs))
    for i, s := range addrStrs {
        addrs[i] = resolver.Address{Addr: s}
    }
    r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

func main() {
//...
roundrobinConn, err := grpc.Dial(
        // Target{Scheme:exampleScheme,Endpoint:exampleServiceName}
        fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
//...
}

grpc.Dial() 会调用Scheme=>builder 的Build() 方法,之后调用r.start()

        r.cc.UpdateState(resolver.State{Addresses: addrs})

UpdateState()将addr更新到cc,也就是外部的连接中,供其他接口使用。

server

server相对来说启动比较简单,一般都会加拦截器来获取matedata或者去recover() panic,又或者打印一些日志

        grpc.UseCompressor(gzip.Name)
        s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor))
//...

matedata: matedata是一个map[string][]string的结构,用来在客户端和服务器之间传输数据。其中的一个作用是可以传递分布式调用环境中的链路id,方便跟踪调试。另外也可以传一些业务相关的数据

客户端拦截器中设置metedata

        md := metadata.Pairs("XXX_id",xxxID, "YYY_id", yyyID)
        mdOld, _ := metadata.FromIncomingContext(ctx)
        md = metadata.Join(mdOld, md)
        ctx = metadata.NewOutgoingContext(ctx, md)
          //...
       invoker(ctx, method, req, reply, cc, opts...)

服务端拦截器获取metadata

    var xxxID,yyyID
    md, _ := metadata.FromIncomingContext(ctx)
    if arr := md["XXX_id"]; len(arr) > 0 {
        xxxID = arr[0]
    }
    if arr := md["YYY_id"]; len(arr) > 0 {
        yyyID = arr[0]
    }
        m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }

在server启动之后,需要将这个服务注册到etcd 。
用etcd3在编译的时候出现了和groc-go版本不兼容的问题

首先当前用的etcd 版本是 3.4.9,支持的grpc-go最高版本是v1.26.0,于是需要将grpc-go降级
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
降级之后之前生成的proto.pb.go 又出现了错误,于是将protobuf降级
replace github.com/golang/protobuf => github.com/golang/protobuf v1.2.0
以上的问题网上其他人也遇到过,下面的这个不清楚是我本地环境有问题还是其他原因
报错原因是 google.golang.org/genproto这个包下面生成的proto.pb.go里面指定了protobuf1.4的版本变量,解决办法还是降级,版本号是在$GOPATH/pkg/mod/... 下面找到的
replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8

关于etcd的内容之后再整理吧。

小结

结合etcd 的watch功能,很容易检测某一个路径节点的变化,如果,server端注册两个服务到etcd
key = /project/service/user/1 val = 127.0.0.1:9999
key = /project/service/user/2 val = 127.0.0.1:9998

在客户端,如果我们自定义了一个名叫example的resolverBuilder,
同时开启一个watch协程 ,监测/project/service下面的节点,动态维护Build()中addrsStore,这个时候我们设置addrsStore[user] = {127.0.0.1:9999,127.0.0.1:9998}。

然后在客户端grpc.Dai中令target = example:///user
那么在r.start()中就可以获取到 {127.0.0.1:9999,127.0.0.1:9998}(具体可以看上面示例中r.start()方法)

server注册的key,Build()中addrsStore中的key,以及target 后面的endPoint 的不同选择可以实现不通粒度的服务划分。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343