grpc 超时和重连

最近项目要使用grpc,但是关于grpc的超时和重连这一块很多文章都是说的不够详细,无奈只能自己看代码.顺手记录一下。

超时

建立连接

主要就2函数Dail和DialContext。

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    return DialContext(context.Background(), target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error){...}

DialContext 太长了不帖了.看Dial实际上也是调用DialContext来实现的.如果你想在建立连接的时候使用超时控制.就使用DialContext传入一个Timeout的context,就像下面的例子

ctx1, cel := context.WithTimeout(context.Background(), time.Second*3)
defer cel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithBlock(), grpc.WithInsecure())

另外调用Dial建立连接默认只是返回一个ClientConn的指针,相当于new了一个ClientConn 把指针返回给你。并不是一定要建立真实的h2连接.至于真实的连接建立实际上是一个异步的过程。当然了如果你想等真实的链接完全建立再返回ClientConn可以通过WithBlock传入Options来实现,当然了这样的话链接如果建立不成功就会一直阻塞直到Contex超时。真正的建立链接的代码后面介绍重试的时候会再详细介绍。

调用超时

这个比较简单

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

如上代码传入一个timeout context就可以。

重连

假设我们想这样一个问题,刚才我们说Dial实际上是new了一个ClientConn.真实的连接建立在另外一个协程中,那这个协程是建立连接后就退出了呢,还是还在运行。另外如果我们退出服务端然后启动客户端会重新建立链接吗,如果是那又是如何重试的。

grpc调用的时候启动的协程

要回答第一个问题,很简单我们在client代码中启动pprof看看有哪些协程在跑。

   go func() {
        log.Println(http.ListenAndServe("localhost:6006", nil))
    }()
main.main.func1()
    /Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:41 +0x3e
created by main.main
    /Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:40 +0x47

goroutine 6 [select]:
google.golang.org/grpc.(*ccResolverWrapper).watcher(0xc4201941e0)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:110 +0x182
created by google.golang.org/grpc.(*ccResolverWrapper).start
    /Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:96 +0x3f

goroutine 7 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc42006e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:122 +0x14a
created by google.golang.org/grpc.newCCBalancerWrapper
    /Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:113 +0x14c

goroutine 8 [select]:
google.golang.org/grpc.(*addrConn).transportMonitor(0xc42019e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:1240 +0x235
google.golang.org/grpc.(*addrConn).connect.func1(0xc42019e280)
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:839 +0x216
created by google.golang.org/grpc.(*addrConn).connect
    /Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:829 +0xe1

我们看到有一个transportMonitor的协程一直阻塞在select中.代码都在clientconn.go 中。我们进去看看其实有4个主要的方法.

func (ac *addrConn) connect() error 
func (ac *addrConn) resetTransport() error
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error)
func (ac *addrConn) transportMonitor()

connect

    // Start a goroutine connecting to the server asynchronously.
    go func() {
        if err := ac.resetTransport(); err != nil {
            log.Printf("resetTransport %v ",err)
            grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
            if err != errConnClosing {
                // Keep this ac in cc.conns, to get the reason it's torn down.
                ac.tearDown(err)
            }
            return
        }
        ac.transportMonitor()
    }()
    return nil

上面的是connect的一部分。connect会调用resetTransport来建立链接。再启动transportMonitor来监控链接的情况。

resetTransport

for connectRetryNum := 0; ; connectRetryNum++ {
       ac.mu.Lock()
       if ac.backoffDeadline.IsZero() {
           // This means either a successful HTTP2 connection was established
           // or this is the first time this addrConn is trying to establish a
           // connection.
           backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
           // This will be the duration that dial gets to finish.
           dialDuration := minConnectTimeout
           if backoffFor > dialDuration {
               // Give dial more time as we keep failing to connect.
               dialDuration = backoffFor
           }
           start := time.Now()
           backoffDeadline = start.Add(backoffFor)
           connectDeadline = start.Add(dialDuration)
           ridx = 0 // Start connecting from the beginning.
       } else {
           // Continue trying to conect with the same deadlines.
           connectRetryNum = ac.connectRetryNum
           backoffDeadline = ac.backoffDeadline
           connectDeadline = ac.connectDeadline
           ac.backoffDeadline = time.Time{}
           ac.connectDeadline = time.Time{}
           ac.connectRetryNum = 0
       }
       if ac.state == connectivity.Shutdown {
           ac.mu.Unlock()
           return errConnClosing
       }
       ac.printf("connecting")
       if ac.state != connectivity.Connecting {
           ac.state = connectivity.Connecting
           ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
       }
       // copy ac.addrs in case of race
       addrsIter := make([]resolver.Address, len(ac.addrs))
       copy(addrsIter, ac.addrs)
       copts := ac.dopts.copts
       ac.mu.Unlock()
       connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
       if err != nil {
           return err
       }
       if connected {
           return nil
       }

   }

resetTransport 主要内容就是一个for 循环,可以看到在这个for循环中会尝试建立链接。如果建立成功就返回一个nil。如果不成功会不断重试下去。实际上不管是开头的Dial或者Dial完了关闭服务器后都是由这段代码来建立真实的链接。这也就是如果你使用withBlock 但是不使用超时的话会不断的重试下去。中途断掉也会不断重联。当然了重连的过程中是使用了backoff算法来重连。而且默认会在grpc的配置中有个默认最大重试间隔时间。默认是120.

var DefaultBackoffConfig = BackoffConfig{
    MaxDelay:  120 * time.Second,
    baseDelay: 1.0 * time.Second,
    factor:    1.6,
    jitter:    0.2,
}

transportMonitor

for {
        var timer *time.Timer
        var cdeadline <-chan time.Time
        ac.mu.Lock()
        t := ac.transport
        if !ac.connectDeadline.IsZero() {
            timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
            cdeadline = timer.C
        }
        ac.mu.Unlock()
        // Block until we receive a goaway or an error occurs.
        select {
        case <-t.GoAway():
        case <-t.Error():
        case <-cdeadline:
            ac.mu.Lock()
            // This implies that client received server preface.
            if ac.backoffDeadline.IsZero() {
                ac.mu.Unlock()
                continue
            }
            ac.mu.Unlock()
            timer = nil
            // No server preface received until deadline.
            // Kill the connection.
            grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
            t.Close()
        }

        if timer != nil {
            timer.Stop()
        }
        // If a GoAway happened, regardless of error, adjust our keepalive
        // parameters as appropriate.
        select {
        case <-t.GoAway():
            ac.adjustParams(t.GetGoAwayReason())
        default:
        }
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            ac.mu.Unlock()
            return
        }
        // Set connectivity state to TransientFailure before calling
        // resetTransport. Transition READY->CONNECTING is not valid.
        ac.state = connectivity.TransientFailure
        ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
        ac.cc.resolveNow(resolver.ResolveNowOption{})
        ac.curAddr = resolver.Address{}
        ac.mu.Unlock()
        if err := ac.resetTransport(); err != nil {
            ac.mu.Lock()
            ac.printf("transport exiting: %v", err)
            ac.mu.Unlock()
            grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
            if err != errConnClosing {
                // Keep this ac in cc.conns, to get the reason it's torn down.
                ac.tearDown(err)
            }
            return
        }

    }

monitor也是运行一个for 循环如果连接断开就调用resetTransport重试。

其实我们使用etcdclient的时候的经常要使用一个DialTimeout参数其实那个参数就是用来生成一个TimeOut的Context.用来控制建立链接的超时。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,064评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,606评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,011评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,550评论 1 269
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,465评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,919评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,428评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,075评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,208评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,185评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,191评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,914评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,482评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,585评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,825评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,194评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,703评论 2 339

推荐阅读更多精彩内容