gRPC负载均衡(自定义负载均衡策略--etcd 实现)

背景

在工作学习中使用gRPC的地方比较多,通常我们都使用的是自带的负载均衡算法,但是在某些场景下我们需要对服务的版本进行控制
比如 [app V2 只能去链接 user V3],在这样的情况下就只能选自定义负载均衡策略

目标

实现基于版本(version)的grpc负载均衡器,了解过程后可自己实现更多的负载均衡功能

  • 注册中心
    • Etcd Lease 是一种检测客户端存活状况的机制。 群集授予具有生存时间的租约。 如果etcd 群集在给定的TTL 时间内未收到keepAlive,则租约到期。 为了将租约绑定到键值存储中,每个key 最多可以附加一个租约
  • 服务注册 (注册服务)
    • 定时把本地服务(APP)地址,版本等信息注册到服务器
  • 服务发现 (客户端发起服务解析请求(APP))
    • 查询注册中心(APP)下有那些服务
    • 并向所有的服务建立HTTP2长链接
    • 通过Etcd watch 监听服务(APP),通过变化更新链接
  • 负载均衡 (客户端发起请求(APP))
    • 负载均衡选择合适的服务(APP HTTP2长链接)
    • 发起调用

服务注册 (注册服务)

源码 register.go

func NewRegister(opt ...RegisterOptions) (*Register, error) {
    s := &Register{
        opts: newOptions(opt...),
    }
    var ctx, cancel = context.WithTimeout(context.Background(), time.Duration(s.opts.RegisterTtl)*time.Second)
    defer cancel()
    data, err := json.Marshal(s.opts)
    if err != nil {
        return nil, err
    }
    etcdCli, err := clientv3.New(s.opts.EtcdConf)
    if err != nil {
        return nil, err
    }
    s.etcdCli = etcdCli
    //申请租约
    resp, err := etcdCli.Grant(ctx, s.opts.RegisterTtl)
    if err != nil {
        return s, err
    }
    s.name = fmt.Sprintf("%s/%s", s.opts.Node.Path, s.opts.Node.Id)
    //注册节点
    _, err = etcdCli.Put(ctx, s.name, string(data), clientv3.WithLease(resp.ID))
    if err != nil {
        return s, err
    }
    //续约租约
    s.keepAliveChan, err = etcdCli.KeepAlive(context.Background(), resp.ID)
    if err != nil {
        return s, err
    }
    return s, nil
}

在etcd里面我们可以看到如下信息
APP v1版本服务在节点的key /hwholiday/srv/app/app-beb3cb56-eb61-11eb-858d-2cf05dc7c711

{
    "node": {
        "name": "app",
        "path": "/hwholiday/srv/app",
        "id": "app-beb3cb56-eb61-11eb-858d-2cf05dc7c711",
        "version": "v1",
        "address": "172.12.12.188:8089"
    }
}

APP v2版本服务在节点的key /hwholiday/srv/app/app-beb3cb56-eb61-11eb-858d-2cf05dc7c711

{
    "node": {
        "name": "app",
        "path": "/hwholiday/srv/app",
        "id": "app-19980562-eb63-11eb-99c0-2cf05dc7c711",
        "version": "v2",
        "address": "172.12.12.188:8088"
    },
}

服务发现 (客户端发起服务解析请求(APP))

源码 discovery.go
实现 grpc内的 resolver.Builder 接口(Builder 创建一个解析器,用于监视名称解析更新)

func NewDiscovery(opt ...ClientOptions) resolver.Builder {
    s := &Discovery{
        opts: newOptions(opt...),
    }
    etcdCli, err := clientv3.New(s.opts.EtcdConf)
    if err != nil {
        panic(err)
    }
    s.etcdCli = etcdCli
    return s
}

// Build 当调用`grpc.Dial()`时执行
func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    d.cc = cc
    res, err := d.etcdCli.Get(context.Background(), d.opts.SrvName, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }
    for _, v := range res.Kvs {
        if err = d.AddNode(v.Key, v.Value); err != nil {
            log.Println(err)
            continue
        }
    }
    go func(dd *Discovery) {
        dd.watcher()
    }(d)
    return d, err
}

//根据官方的建议我们把从注册中心拿到的服务信息储存到Attributes中
// Attributes contains arbitrary data about the resolver intended for
// consumption by the load balancing policy.
// 属性包含有关供负载平衡策略使用的解析器的任意数据。
//Attributes *attributes.Attributes
func (d *Discovery) AddNode(key, val []byte) error {
    var data = new(register.Options)
    err := json.Unmarshal(val, data)
    if err != nil {
        return err
    }
    addr := resolver.Address{Addr: data.Node.Address}
    addr = SetNodeInfo(addr, data)
    d.Node.Store(string(key), addr)
    return d.cc.UpdateState(resolver.State{Addresses: d.GetAddress()})
}

负载均衡 (客户端发起请求(APP))

源码 version_balancer.go

  • gRPC提供了PickerBuilder和Picker接口让我们实现自己的负载均衡策略
//PickerBuilder 创建 balancer.Picker。
type PickerBuilder interface {
    //Build 返回一个选择器,gRPC 将使用它来选择一个 SubConn。
    Build(info PickerBuildInfo) balancer.Picker
}
//gRPC 使用 Picker 来选择一个 SubConn 来发送 RPC。
//每次平衡器的内部状态发生变化时,它都会从它的快照中生成一个新的选择器。 
//gRPC 使用的选择器可以通过 ClientConn.UpdateState() 更新。
type Picker interface {
    //选择合适的子链接发送请求
    Pick(info PickInfo) (PickResult, error)
}
  • 从上面得知我们可以干事的地方在Build方法或者Pick方法(调用gRPC方法时先执行Build再执行Pick)
    • Build(info PickerBuildInfo) balancer.Picker
      info里面有服务的链接,和链接对应的刚刚通过AddNode方法存入的服务信息
      这里我们可以基于grpc-client层面来做负载,比如(加权随机负载)
    • Pick(info PickInfo) (PickResult, error)
      info里面有调用的方法名和 context.Context
      通过context.Context我们可以获得这个来获取发起请求的时候填入的参数,这样我们可以很灵活的针对每个方法进行不同的负载
      这里我们可以基于grpc-client-api层面来做负载
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
    if len(info.ReadySCs) == 0 {
        return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    }
    var scs = make(map[balancer.SubConn]*register.Options, len(info.ReadySCs))
    for conn, addr := range info.ReadySCs {
        nodeInfo := GetNodeInfo(addr.Address)
        if nodeInfo != nil {
            scs[conn] = nodeInfo
        }
    }
    if len(scs) == 0 {
        return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    }
    return &rrPicker{
        node: scs,
    }
}
func (p *rrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    p.mu.Lock()
    defer p.mu.Unlock()
    version := info.Ctx.Value("version")
    var subConns []balancer.SubConn
    for conn, node := range p.node {
        if version != "" {
            if node.Node.Version == version.(string) {
                subConns = append(subConns, conn)
            }
        }
    }
    if len(subConns) == 0 {
        return balancer.PickResult{}, errors.New("no match found conn")
    }
    index := rand.Intn(len(subConns))
    sc := subConns[index]
    return balancer.PickResult{SubConn: sc}, nil
}

客户的使用我们定义的 version 负载均衡策略

    r := discovery.NewDiscovery(
        discovery.SetName("hwholiday.srv.app"),
        discovery.SetEtcdConf(clientv3.Config{
            Endpoints:   []string{"172.12.12.165:2379"},
            DialTimeout: time.Second * 5,
        }))
    resolver.Register(r)
    // 连接服务器
    conn, err := grpc.Dial(
        "hwholiday.srv.app", //没有使用这个参数
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "version")),
        grpc.WithInsecure(),
    )
    if err != nil {
        log.Fatalf("net.Connect err: %v", err)
    }
    defer conn.Close()
    // 调用服务
        apiClient := api.NewApiClient(conn)

    ctx := context.WithValue(context.Background(), "version", "v1")
    _, err = apiClient.ApiTest(ctx, &api.Request{Input: "v1v1v1v1v1"})
    if err != nil {
        fmt.Println(err)
    }

运行效果

测试源码

  • 运行APP服务v1,调用grpc-client 使用 v1

    • APP打印
    • 启动成功 === > 0.0.0.0:8089
    • input:"v1v1v1v1v1"
    • grpc-client打印
    • === RUN TestClient
    • v1v1v1v1v1v1v1v1v1v1
  • 运行APP服务v1,调用grpc-client 使用 v2

    • APP打印
    • 启动成功 === > 0.0.0.0:8089
    • grpc-client打印
    • === RUN TestClient
    • rpc error: code = Unavailable desc = no match found conn

总结

详情介绍地址
源码地址: https://github.com/hwholiday/learning_tools/tree/master/etcd
通过学习我们可以实现基于version的负载策略,这里只是提供一种思路怎么去实现可能我的这个例子不太适合这个,但是提供了一种思路,欢迎一起讨论

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容