client-go中的SharedInformerFactory机制

Table of Contents

1.章节介绍

本章首先介绍SharedInformerFactory,了解其组成和作用。

然后以Podinformer为例,了解一个资源实例的Informer应该需要实现哪些函数。

本节并没有设计到具体图中的informer机制,只是从大的入口入手,看看SharedInformerFactory到底是什么

image.png


2. SharedInformerFactory

SharedInformerFactory封装了NewSharedIndexInformer方法。字如其名,SharedInformerFactory使用的是工厂模式来生成各类的Informer。无论是k8s控制器,还是自定义控制器, SharedInformerFactory都是非常重要的一环。所以首先分析SharedInformerFactory。这里以一个实例入手分析SharedInformerFactory。

2.1 SharedInformerFactory实例介绍

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成PodInformer
    podInformer := factory.Core().V1().Pods()
    // 获得一个cache.SharedIndexInformer 单例模式
    sharedInformer := podInformer.Informer()

    //注册add, update, del处理事件
    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一种方式
    // 可以这样启动  也可以按照下面的方式启动
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二种方式,这种方式是启动factory下面所有的informer
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}

2.2 sharedInformerFactory结构体

type sharedInformerFactory struct {
  // client客户端
    client           kubernetes.Interface            
    // sharedInformerFactory是没有namespaces限制的。不过可以设置namespaces限制该factory后面的informer都是指定namespaces的
    namespace        string          
  // TweakListOptionsFunc其实就是ListOptions,这个是针对所有Informer List生效的 (WithTweakListOptions可以看出来)
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    // 这个是list默认定期同步的时间间隔
    defaultResync    time.Duration
    // 每种informer还可以自定义
    customResync     map[reflect.Type]time.Duration
  
  // 属于该factory下面的所有的informer
    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    // 判断informer是否已经 Run起来了
    startedInformers map[reflect.Type]bool   
}


2.3 sharedInformerFactory成员函数

定义customResync
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption 

定义tweakListOptions
// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory.
func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption 

定义namespaces
// WithNamespace limits the SharedInformerFactory to the specified namespace.
func WithNamespace(namespace string) SharedInformerOption 

// start所有的informer
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

// WaitForCacheSync让所有的informers同步cache。一般informer.run函数中都有一个这样的语句。先等cache同步。这个的含义就是等list完了的数据,全部转换到cache中去。
    // Wait for all involved caches to be synced, before processing items from the queue is started
    if !cache.WaitForCacheSync(stopCh, ctrl.Informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }
    
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}


// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    // 如果存在同类的,直接返回,不会再new一个。这里的type就是 pod/deploy
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

  // 提供k8s内置资源的定义接口,从这里可以看出来
    Admissionregistration() admissionregistration.Interface 
    Apps() apps.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Networking() networking.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

// 例如core组下面的资源,f.Core().v1.pods() 就是这个
func (f *sharedInformerFactory) Core() core.Interface {
    return core.New(f, f.namespace, f.tweakListOptions)
}


2.4 总结

通过对sharedInformerFactory的成员和函数介绍,了解到:

(1)factory就是提供了一个构造informer的入口,里面包含了一堆Informer

(2)同一中资源类型共用一个Infomer。这样的话就可以节省不必要的资源。例如kcm中,rs可以需要监听pod资源,gc也需要监听Pod资源,通过factory机制就可以使用同一个

(3)但是监听同一种类型的资源,但是不同的listOption看起来也是不行,例如一个Informer监听running的pod,一个Informer监听error的Pod, 是需要多个factory。

3. podInformer

从上诉可以看出来,sharedInformerFactory只是一个入口。接下来以podInformer为例,看看一个具体的资源Informer需要实现哪些功能。

3.1 PodInformer结构体

// PodInformer provides access to a shared informer and lister for
// Pods.
// 只需要实现Informer,Lister函数
type PodInformer interface {
    Informer() cache.SharedIndexInformer     
    Lister() v1.PodLister
}

type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory   //  是哪一个factory生成的informer
    tweakListOptions internalinterfaces.TweakListOptionsFunc    //  有哪些filter
    namespace        string                                     //  命名空间
}

3.2 PodInformer成员函数

从函数定义可以看出来,informer其实就是 cache.SharedIndexInformer

New SharedIndexInformer的时候指定了ListWatch函数。

listFunc: client.CoreV1().Pods(namespace).List(options)

WatchFunc: client.CoreV1().Pods(namespace).Watch(options)

所以从结构体上推测:

(1) informer最终都是 cache.SharedIndexInformer。但是 cache.SharedIndexInformer需要先定义好list, watch函数

(2)cache.SharedIndexInformer里面的index就是存储+查询。根据定义好的list, watch更新index的数据

接下来继续看看cache.SharedIndexInformer是如何实现的。

// NewPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

// 默认只有namespaces这个indexer
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}


func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

// 返回Lister数据, 这里是从index里面获取,而不是从apiserver中获取
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}

cache中的index定义
k8s.io/client-go/tools/cache/index.go
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}

4.总结

(1)factory就是提供了一个构造informer的入口,里面包含了一堆Informer

(2)同一中资源类型共用一个Infomer。这样的话就可以节省不必要的资源。例如kcm中,rs可以需要监听pod资源,gc也需要监听Pod资源,通过factory机制就可以使用同一个

(3)但是监听同一种类型的资源,但是不同的listOption看起来也是不行,例如一个Informer监听running的pod,一个Informer监听error的Pod, 是需要多个factory。

(4)当前factory并没有利用到图中表示Informer机制。最终是cache.SharedIndexInformer 包含了所有的参数,实现了上诉图中的Informer机制。下一节开始介绍cache.SharedIndexInformer

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容