[istio源码分析][pilot] pilot之DiscoveryServer

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源码分析][pilot] pilot之configController (mcp client)
2. [istio源码分析][pilot] pilot之ServiceController

前面两篇文章分析了configControllerServiceController, configController可以从galley获得资源(VirtualService等)信息, ServiceController可以从k8s中获得pod, Service等信息并转成istio的类型.
本文将分析DiscoveryServer的作用.

2. DiscoveryService

看一下DiscoveryService是如何初始化的.

// pilot/cmd/pilot-discovery/main.go
var (
    serverArgs = bootstrap.PilotArgs{
        CtrlZOptions:     ctrlz.DefaultOptions(),
        KeepaliveOptions: keepalive.DefaultOption(),
    }
    ...
)
...
discoveryServer, err := bootstrap.NewServer(serverArgs)
...

// pilot/pkg/bootstrap/server.go
func NewServer(args PilotArgs) (*Server, error) {
   ...
   if err := s.initDiscoveryService(&args); err != nil {
        return nil, fmt.Errorf("discovery service: %v", err)
   }
   ...
}

调用initDiscoveryService初始化.

2.1 initDiscoveryService

// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore,
        ServiceDiscovery: s.ServiceController,
        PushContext:      model.NewPushContext(),
    }
    // 构造一个DiscoveryService 主要是提供给一些api供用户查看
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )
    if err != nil {
        return fmt.Errorf("failed to create discovery service: %v", err)
    }
    s.mux = discovery.RestContainer.ServeMux
    // 创建DiscoveryServer并赋值给s.EnvoyXdsServer
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController, s.kubeRegistry, s.configController)
    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)
    if s.kubeRegistry != nil {
        s.kubeRegistry.Env = environment
        s.kubeRegistry.InitNetworkLookup(s.meshNetworks)
        s.kubeRegistry.XDSUpdater = s.EnvoyXdsServer
    }
    // 启动DiscoveryServer
    s.addStartFunc(func(stop <-chan struct{}) error {
        s.EnvoyXdsServer.Start(stop)
        return nil
    })
    ...
    return nil
}

1. 构造了一个model.Environment对象.
2. 构造一个DiscoveryService, 主要是提供给一些api供用户查看.
3. 创建DiscoveryServer并赋值给s.EnvoyXdsServer.
4. 启动DiscoveryServer.
5. 启动http grpc服务.

2.2 v2/discovery

// pilot/pkg/proxy/envoy/v2/discovery.go
func NewDiscoveryServer(
    env *model.Environment,
    generator core.ConfigGenerator,
    ctl model.Controller,
    kubeController *controller.Controller,
    configCache model.ConfigStoreCache) *DiscoveryServer {
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator,
        ConfigController:        configCache,
        KubeController:          kubeController,
        EndpointShardsByService: map[string]map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        concurrentPushLimit:     make(chan struct{}, features.PushThrottle),
        pushChannel:             make(chan *model.PushRequest, 10),
        pushQueue:               NewPushQueue(),
    }
    // Flush cached discovery responses whenever services, service
    // instances, or routing configuration changes.
    // 注册一个serviceHandler
    serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
    if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
        return nil
    }
    // 注册一个instanceHandler
    instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
    if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
        return nil
    }
    authn_model.JwtKeyResolver.PushFunc = out.ClearCache
    if configCache != nil {
        configHandler := func(model.Config, model.Event) { out.clearCache() }
        for _, descriptor := range model.IstioConfigTypes {
            // 每一个支持的类型都注册一个handler
            configCache.RegisterEventHandler(descriptor.Type, configHandler)
        }
    }
    ...
    return out
}

这里可以看到所有资源注册的handler都是out.clearCache方法, 所以很有必要看一下clearCache的操作.

func (s *DiscoveryServer) clearCache() {
    s.ConfigUpdate(&model.PushRequest{Full: true})
}
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
    inboundConfigUpdates.Increment()
    s.pushChannel <- req
}

1. 构造了一个model.PushRequest{Full: true}并且写入到s.pushChannel这个channel中.
2.[istio源码分析][pilot] pilot之configController (mcp client) 中可以知道如果类型不是ServiceEntry的时候会调用s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true}).
3.[istio源码分析][pilot] pilot之ServiceController 中可以知道调用AppendXXXHandler的时候在调用完c.XDSUpdater.XXXUpdate后会调用注册的方法, 也就是ClearCache.

那最终谁会从s.pushChannel中读取数据并做什么的操作呢?

start
// pilot/pkg/proxy/envoy/v2/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
    go s.handleUpdates(stopCh)
    go s.periodicRefresh(stopCh)
    go s.periodicRefreshMetrics(stopCh)
    go s.sendPushes(stopCh)
}

查看DiscoveryServerstart方法

handleUpdates
// pilot/pkg/proxy/envoy/v2/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
    debounce(s.pushChannel, stopCh, s.Push)
}
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
    // 做了一些控制性的事情 最终会落到Push方法
    ...
}
func (s *DiscoveryServer) Push(req *model.PushRequest) {
    if !req.Full {
        req.Push = s.globalPushContext()
        go s.AdsPushAll(versionInfo(), req)
        return
    }
    ...
    // 创建一个新的PushContext
    push := model.NewPushContext()
    // 从当前状态中获得信息(istioConfigStore,ServiceController) 
    // 也就是galley,k8s中获取的内容
    err := push.InitContext(s.Env)
    ...
    s.updateMutex.Lock()
    // 更新PushContext
    s.Env.PushContext = push
    s.updateMutex.Unlock()
    ...
    req.Push = push
    go s.AdsPushAll(versionLocal, req)
}

1. handleUpdates调用debounce, debounce做了一些控制性的事情, 从s.pushChannel中获得req, 然后调用Push方法处理.
2. 关于Push注意以下几点:

2.1 如果req.Full = false, 则使用当前的pushcontext调用AdsPushAll方法.
2. 如果req.Full = true, 则生成新的push context, 通过InitContext为新的push填充数据, 数据从s.Env中得到, 因为s.Env中包含了istioConfigStore,ServiceController, 也就是galley, k8s中的资源数据.
3. 调用AdsPushAll方法, 此时的req.push已经被填充数据了.

AdsPushAll
// pilot/pkg/proxy/envoy/v2/ads.go
func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {
    if !req.Full {
        s.edsIncremental(version, req.Push, req)
        return
    }
    ...
    s.startPush(req)
}
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
    adsClientsMutex.RLock()
    // Create a temp map to avoid locking the add/remove
    pending := []*XdsConnection{}
    // 所有的ads client端
    for _, v := range adsClients {
        pending = append(pending, v)
    }
    adsClientsMutex.RUnlock()
    currentlyPending := s.pushQueue.Pending()
    if currentlyPending != 0 {
        adsLog.Infof("Starting new push while %v were still pending", currentlyPending)
    }
    req.Start = time.Now()
    for _, p := range pending {
        // 为每一个ads client端都添加了这个req
        s.pushQueue.Enqueue(p, req)
    }
}

1. 最终到startPush为每个ads client端都添加了这个reqs.pushQueue中.

s.pushQueue中的内容什么时候会取出来运行呢?

sendPushes
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
    for {
        select {
        case <-stopCh:
            return
        default:
            ...
            client, info := queue.Dequeue()
            ...
            go func() {
                ...
                select {
                // 组装成一个XdsEvent 往XdsConnection类型中的pushChannel中
                case client.pushChannel <- &XdsEvent{
                    push:               info.Push,
                    edsUpdatedServices: edsUpdates,
                    done:               doneFunc,
                    start:              info.Start,
                    targetNamespaces:   info.TargetNamespaces,
                }:
                    return
                ...
            }()
        }
    }
}
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
    doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}

1.queue.Dequeue()取出来运行, 放到XdsConnection类型中的pushChannel. 其实就是往各个注册的client中发送数据.

2.3 总结

discovery_server.png

3. 参考

1. istio 1.3.6源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容