kubebuild背后原理分析

1. Controller-runtime结构介绍

kubebuilder底层使用的就是Controller-runtime,Controller-runtime为 Controller 的开发提供了各种功能模块,每个模块中包括了一个 或多个实现,通过这些模块,开发者可以灵活地构建自己的 Controller,主要包括以下内容。

(1) Client:用于读写 Kubernetes 资源对象的客户端。

(2) Cache:本地缓存,用于保存需要监听的 Kubernetes 资源。缓存提供了只读客户端, 用于从缓存中读取对象。缓存还可以注册处理方法(EventHandler),以响应更新的事件。

(3) Manager:用于控制多个 Controller,提供 Controller 共用的依赖项,如 Client、 Cache、Schemes 等。通过调用 Manager.Start 方法,可以启动 Controller。

(4) Controller:控制器,响应事件(Kubernetes 资源对象的创建、更新、删除)并 确保对象规范(Spec 字段)中指定的状态与系统状态匹配,如果不匹配,则控制器需要根 据事件的对象,通过协调器(Reconciler)进行同步。在实现上,Controller 是用于处理 reconcile.Requests 的工作队列,reconcile.Requests 包含了需要匹配状态的资源对象。

① Controller 需要提供 Reconciler 来处理从工作队列中获取的请求。

② Controller 需要配置相应的资源监听,根据监听到的 Event 生成 reconcile.Requests 并加入队列。

(5) Reconciler:为 Controller 提供同步的功能,Controller 可以随时通过资源对象的 Name 和 Namespace 来调用 Reconciler,调用时,Reconciler 将确保系统状态与资源对象 所表示的状态相匹配。例如,当某个 ReplicaSet 的副本数为 5,但系统中只有 3 个 Pod 时, 同步 ReplicaSet 资源的 Reconciler 需要新建两个 Pod,并将它们的 OwnerReference 字段 指向对应的 ReplicaSet。

① Reconciler 包含了 Controller 所有的业务逻辑。

② Reconciler 通常只处理单个对象类型,例如只处理 ReplicaSets 的 Reconciler,不 处理其他的对象类型。如果需要处理多种对象类型,需要实现多个 Controller。如果你 希望通过其他类型来触发 Reconciler,例如,通过 Pod 对象的事件来触发 ReplicaSet 的 Recon- ciler,则可以提供一个映射,通过该映射将触发 Reconciler 的类型映射到需要匹 配的类型。

③ 提供给 Reconciler 的参数是需要匹配的资源对象的 Name 和 Namespace。

④ Reconciler 不关心触发它的事件的内容和类型。例如,对于同步 ReplicaSet 资源的 Reconciler 来说,触发它的是 ReplicaSet 的创建还是更新并不重要,Reconciler 总是会比 较系统中相应的 Pod 数量和 ReplicaSet 中指定的副本数量。

(6) WebHook:准 入 WebHook(Admission WebHook) 是 扩 展 Kubernetes API 的 一种机制,WebHook 可以根据事件类型进行配置,比如资源对象的创建、删除、更改等 事件,当配置的事件发生时,Kubernetes 的 APIServer 会向 WebHook 发送准入请求 (AdmissionRequests),WebHook 可以对请求中的资源对象进行更改或准入验证,然后将 处理结果响应给 APIServer。

(7) Source:resource.Source 是 Controller.Watch 的参数,提供事件,事件通常是来 自 Kubernetes 的 APIServer(如 Pod 创建、更新和删除)。例如,source.Kind 使用指定 对象(通过 GroupVersionKind 指定)的 Kubernetes API Watch 接口来提供此对象的创建、 更新、删除事件。

① Source 通过 Watch API 提供 Kubernetes 指定对象的事件流。

② 建议开发者使用 Controller-runtime 中已有的 Source 实现,而不是自己实现此接口。

(8) EventHandler:handler.EventHandler 是 Controller.Watch 的 参 数, 用 于 将 事 件对应的 reconcile.Requests 加入队列。例如,从 Source 中接收到一个 Pod 的创建事 件,eventhandler.EnqueueHandler 会 根 据 Pod 的 Name 与 Namespace 生 成 reconcile. Requests 后,加入队列。

① EventHandlers 处理事件的方式是将一个或多个 reconcile.Requests 加入队列。

② 在 EventHandler 的处理中,事件所属的对象的类型(比如 Pod 的创建事件属于 Pod 对象),可能与 reconcile.Requests 所加入的对象类型相同。

③ 事件所属的对象的类型也可能与 reconcile.Requests 所加入的对象类型不同。例如 将 Pod 的事件映射为所属的 ReplicaSet 的 reconcile.Requests。

④ EventHandler 可能会将一个事件映射为多个 reconcile.Requests 并加入队列,多个 reconcile.Requests 可能属于一个对象类型,也可能涉及多个对象类型。例如,由于集群扩 展导致的 Node 事件。

⑤ 在大多数情况下,建议开发者使用 Controller-runtime 中已有的 EventHandler 来 实现,而不是自己实现此接口。

(9) Predicate:predicate.Predicate 是 Controller.Watch 的参数,是用于过滤事件的 过滤器,过滤器可以复用或者组合。

① Predicate 接口以事件作为输入,以布尔值作为输出,当返回 True 时,表示需要将 事件加入队列。

② Predicate 是可选的。

③ 建议开发者使用 Controller-runtime 中已有的 Predicate 实现,但可以使用其他 Predicate 进行过滤。


image.png

Controller-runtime 核心流程如下:

  • Source 通过 Kubernetes APIServer 监听指定资源对象
  • EventHandler 根据资源对象变化事件,将 reconcile.Request 加入队列
  • 从队列中获取 reconcile.Request,并调用 Reconciler 进行同步
image.png

2. Controller-runtime 底层原理

2.1 manager相关结构体介绍

Manager的方法


type Manager interface {
    cluster.Cluster                   //cluster.Cluster  提供了一系列方法,以获取与集群相关的对象。
    Add(Runnable) error               //添加controller
    Elected() <-chan struct{}         // 选举相关, 返回一个 Channel 结构,用于判断选举状态。当未配
置选举或当选 Leader 时,Channel 将被关闭。
    AddMetricsExtraHandler(path string, handler http.Handler) error     // metrics相关
    AddHealthzCheck(name string, check healthz.Checker) error           // 健康检查相关
    AddReadyzCheck(name string, check healthz.Checker) error            // 是否就绪
    Start(ctx context.Context) error                                    // 启动所有的controller
    GetWebhookServer() *webhook.Server                                  
    GetLogger() logr.Logger
    GetControllerOptions() v1alpha1.ControllerConfigurationSpec             
}

Manager启动时Options介绍。这里介绍几个关键的。

(1) Scheme 结构。一般先通过 k8s.io/apimachinery/pkg/runtime 中的 NewScheme() 方法获取 Kubernetes 的 Scheme,然后再将 CRD 注册到 Scheme

(2) MapperProvider 是一个函数对象,其定义为 func(c *rest.Config) (meta.RESTMapper,error),用于定义 Manager 如何获取 RESTMapper。默认通过 k8s.io/client-go 中的 DiscoveryClient 请求获取 Kube-APIServer。

(3) Logger 用于定义 Manager 的日志输出对象,默认使用 pkg/internal/log 包下的 全局参数 RuntimeLog。

(4)SyncPeriod 参数用于指定 Informer 重新同步并处理资源的时间间隔,默认为 10 小时。此参数也决定了 Controller 重新同步的时间间隔,每个 Controller 的时间间隔以此 参数为基准有 10% 的抖动,以避免多个 Controller 同时进行重新同步。

(5) Namespace 参数用于限制 Manager.Cache 只监听指定 Namespace 的资源,默认 情况下无限制。

(6) EventBroadcaster 参数用于提供 Manager,以获取 EventRecorder,当前已不 推荐使用,因为当Manager或Controller的生命周期短于EventBroadcaster的生命周期时, 可能会导致 goroutine 泄露。

// Options are the arguments for creating a new Manager.
type Options struct {
    // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
    // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
    // idea to pass your own scheme in.  See the documentation in pkg/scheme for more information.
    Scheme *runtime.Scheme

    // MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
    MapperProvider func(c *rest.Config) (meta.RESTMapper, error)

    SyncPeriod *time.Duration
    。。。
}


2.2 controller相关结构体介绍

接口

type Controller interface {
  // 匿名接口,定义了 Reconcile(context.Context,Request) (Result,error)
    reconcile.Reconciler

  // Watch() 方法会从 source.Source 中 获 取 Event, 并 根 据 参 数 Eventhandler 来 决 定 如 何 入 队, 根 据 参 数 Predicates 进行 Event 过滤,Preficates 可能有多个,只有所有的 Preficates 都返回True 时,才会将 Event 发送给 Eventhandler 处理。
    Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error

  // Controller 的启动方法,实现了 Controller 接 口的对象,也实现了 Runnable,因此,该方法可以被 Manager 管理。
    Start(ctx context.Context) error

    // 获取 Controller 内的 Logger,用于日志输出。
    GetLogger() logr.Logger
}

结构体实现

Controller 的实现在 pkg/internal/controller/controller.go 下,为结构体 Controller, Controller 结构体中包括的主要成员如下。 (1) Name string:必须设置,用于标识 Controller,会在 Controller 的日志输 出中进行关联。

(2) MaxConcurrentReconciles int:定义允许 reconcile.Reconciler 同时运行的最多个 数,默认为 1。

(3) Do reconcile.Reconciler:定义了 Reconcile() 方法,包含了 Controller 同步的业务 逻辑。Reconcile() 能在任意时刻被调用,接收一个对象的 Name 与 Namespace,并同步集 群当前实际状态至该对象被设置的期望状态。

(4) MakeQueue func() workqueue.RateLimitingInterface:用 于 在 Controller 启动时,创建工作队列。由于标准的 Kubernetes 工作队列创建后会立即启动,因此, 如果在 Controller 启动前就创建队列,在重复调用 controller.New() 方法创建 Controller 的情况下,就会导致 Goroutine 泄露。

(5) Queue workqueue.RateLimitingInterface:使用上面方法创建的工作队列。 (6) SetFields func(i interface{}) error:用 于 从 Manager 中 获 取 Controller 依 赖 的 方 法, 依 赖 包 括 Sourcess、EventHandlers 和 Predicates 等。 此 方 法 存 储 的 是 controllerManager.SetFields() 方法。 (7) Started Bool:用于表示 Controller 是否已经启动。 (8) CacheSyncTimeout time.Duration:定义了 Cache 完成同步的等待时长,超过时 长会被认为是同步失败。默认时长为 2 分钟。 (9) startWatches [ ]watchDescription:定 义 了 一 组 Watch 操 作 的 属 性, 会 在 Controller 启动时,根据属性进行 Watch 操作。watchDescription 的定义见代码清单 3-30,watchDescription 包 括 Event 的 源 source.Source、Event 的 入 队 方 法 handler. EventHandler 以及 Event 的过滤方法 predicate.Predicate。

// Controller implements controller.Controller.
type Controller struct {
    // Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
    Name string

    // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
    MaxConcurrentReconciles int

    // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
    // ensures that the state of the system matches the state specified in the object.
    // Defaults to the DefaultReconcileFunc.
    Do reconcile.Reconciler

    // MakeQueue constructs the queue for this controller once the controller is ready to start.
    // This exists because the standard Kubernetes workqueues start themselves immediately, which
    // leads to goroutine leaks if something calls controller.New repeatedly.
    MakeQueue func() workqueue.RateLimitingInterface

    // Queue is an listeningQueue that listens for events from Informers and adds object keys to
    // the Queue for processing
    Queue workqueue.RateLimitingInterface

    // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
    // Deprecated: the caller should handle injected fields itself.
    SetFields func(i interface{}) error

    // mu is used to synchronize Controller setup
    mu sync.Mutex

    // Started is true if the Controller has been Started
    Started bool

    // ctx is the context that was passed to Start() and used when starting watches.
    //
    // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
    // while we usually always strive to follow best practices, we consider this a legacy case and it should
    // undergo a major refactoring and redesign to allow for context to not be stored in a struct.
    ctx context.Context

    // CacheSyncTimeout refers to the time limit set on waiting for cache to sync
    // Defaults to 2 minutes if not set.
    CacheSyncTimeout time.Duration

    // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
    startWatches []watchDescription

    // Log is used to log messages to users during reconciliation, or for example when a watch is started.
    Log logr.Logger

    // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
    RecoverPanic bool
}

2.3 controller启动流程

controller跟随manager.start而启动。然后根据下面的流程运行。在c.Do.Reconcile函数中调用了我们实现的Reconcile函数进行真正的控制器逻辑处理。

image.png

2.4 manager是如何启动controller的

2.4.1 第一步-manager的初始化

一般在main函数就调用ctrl.NewManager函数进行初始化。ctrl.NewManager函数有2个参数,第一个参数就是k8s集群的*rest.Config, 第二个就是Options。就是manger结构体介绍的参数,比如可以自定义SyncPeriod等等。

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "ec7e1f70.github.com",
        // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
        // when the Manager ends. This requires the binary to immediately end when the
        // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
        // speeds up voluntary leader transitions as the new leader don't have to wait
        // LeaseDuration time first.
        //
        // In the default scaffold provided, the program ends immediately after
        // the manager stops, so would be fine to enable this option. However,
        // if you are doing or is intended to do any operation such as perform cleanups
        // after the manager stops then its usage might be unsafe.
        // LeaderElectionReleaseOnCancel: true,
    })


ctrl.NewManager实际就是初始化这个结构体,有了这个结构体就可以和k8s集群打交道了。

return &controllerManager{
        stopProcedureEngaged:          pointer.Int64(0),
        cluster:                       cluster,
        runnables:                     runnables,
        errChan:                       errChan,
        recorderProvider:              recorderProvider,
        resourceLock:                  resourceLock,
        metricsListener:               metricsListener,
        metricsExtraHandlers:          metricsExtraHandlers,
        controllerOptions:             options.Controller,
        logger:                        options.Logger,
        elected:                       make(chan struct{}),
        port:                          options.Port,
        host:                          options.Host,
        certDir:                       options.CertDir,
        webhookServer:                 options.WebhookServer,
        leaseDuration:                 *options.LeaseDuration,
        renewDeadline:                 *options.RenewDeadline,
        retryPeriod:                   *options.RetryPeriod,
        healthProbeListener:           healthProbeListener,
        readinessEndpointName:         options.ReadinessEndpointName,
        livenessEndpointName:          options.LivenessEndpointName,
        gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
        internalProceduresStop:        make(chan struct{}),
        leaderElectionStopped:         make(chan struct{}),
        leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
    }, nil
2.4.2 第二步-将controller绑定到manager

这一步需要调用SetupWithManager函数,这个是每个controller自己实现的。最简单就是使用通用的方法。

// SetupWithManager sets up the controller with the Manager.
func (r *PodCountReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&zouxappv1.PodCount{}).
        Complete(r)
}

详细地说,创建 Controller 基本分为 3 步。

第一步,通过 ControllerManagedBy(m manager.Manager) *Builder 方法实例化一个 Builder 对象,其中传入的 Manager 提供创建 Controller 所需的依赖。

这步骤的意思是,我定义了一个builder,绑定了manager

// Builder builds a Controller.
type Builder struct {
    forInput         ForInput
    ownsInput        []OwnsInput
    watchesInput     []WatchesInput
    mgr              manager.Manager
    globalPredicates []predicate.Predicate
    ctrl             controller.Controller
    ctrlOptions      controller.Options
    name             string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
func ControllerManagedBy(m manager.Manager) *Builder {
    return &Builder{mgr: m}
}

第二步,使用 For(object client.Object,opts ...ForOption)方法设置需要监听的资源 类型。

实际就是完善Builder的forInput结构体。

注意:这里就相对于调用了Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).

如果想一个controller监听多个对象,或者想实现自己的监听逻辑,比如不想监听删除操作,执行监听特定的update操作。就需要自己NewController来实现了。

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
// update events by *reconciling the object*.
// This is the equivalent of calling
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
    if blder.forInput.object != nil {
        blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
        return blder
    }
    input := ForInput{object: object}
    for _, opt := range opts {
        opt.ApplyToFor(&input)
    }

    blder.forInput = input
    return blder
}

第三步,使用Complete函数将controller绑定到manager

// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
    _, err := blder.Build(r)
    return err
}

总结: controller-runtime实际是通过builder这个对象,将mgr和controller绑定。

2.4.3 第三步-启动manager.start

controllerManager.start会依次启动serveMetrics,serveHealthProbes,Webhooks,Caches,startLeaderElectionRunnables

这里就是关注如何启动每个controller的。manager.start->startLeaderElectionRunnables->cm.runnables.LeaderElection.Start -> go r.reconcile() -> fou循环go routinue启动每个controller

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }


// Start starts the manager and waits indefinitely.
// There is only two ways to have start return:
// An error has occurred during in one of the internal operations,
// such as leader election, cache start, webhooks, and so on.
// Or, the context is cancelled.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
    cm.Lock()
    if cm.started {
        cm.Unlock()
        return errors.New("manager already started")
    }
    var ready bool
    defer func() {
        // Only unlock the manager if we haven't reached
        // the internal readiness condition.
        if !ready {
            cm.Unlock()
        }
    }()

    // Initialize the internal context.
    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
    stopComplete := make(chan struct{})
    defer close(stopComplete)
    // This must be deferred after closing stopComplete, otherwise we deadlock.
    defer func() {
        // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
        stopErr := cm.engageStopProcedure(stopComplete)
        if stopErr != nil {
            if err != nil {
                // Utilerrors.Aggregate allows to use errors.Is for all contained errors
                // whereas fmt.Errorf allows wrapping at most one error which means the
                // other one can not be found anymore.
                err = kerrors.NewAggregate([]error{err, stopErr})
            } else {
                err = stopErr
            }
        }
    }()

    // Add the cluster runnable.
    if err := cm.add(cm.cluster); err != nil {
        return fmt.Errorf("failed to add cluster to runnables: %w", err)
    }

    // Metrics should be served whether the controller is leader or not.
    // (If we don't serve metrics for non-leaders, prometheus will still scrape
    // the pod but will get a connection refused).
    if cm.metricsListener != nil {
        cm.serveMetrics()
    }

    // Serve health probes.
    if cm.healthProbeListener != nil {
        cm.serveHealthProbes()
    }

    // First start any webhook servers, which includes conversion, validation, and defaulting
    // webhooks that are registered.
    //
    // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
    // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
    // to never start because no cache can be populated.
    if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
        if err != wait.ErrWaitTimeout {
            return err
        }
    }

    // Start and wait for caches.
    if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
        if err != wait.ErrWaitTimeout {
            return err
        }
    }

    // Start the non-leaderelection Runnables after the cache has synced.
    if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
        if err != wait.ErrWaitTimeout {
            return err
        }
    }

    // Start the leader election and all required runnables.
    {
        ctx, cancel := context.WithCancel(context.Background())
        cm.leaderElectionCancel = cancel
        go func() {
            if cm.resourceLock != nil {
                if err := cm.startLeaderElection(ctx); err != nil {
                    cm.errChan <- err
                }
            } else {
               // 启动每个controller
                // Treat not having leader election enabled the same as being elected.
                if err := cm.startLeaderElectionRunnables(); err != nil {
                    cm.errChan <- err
                }
                close(cm.elected)
            }
        }()
    }

    ready = true
    cm.Unlock()
    select {
    case <-ctx.Done():
        // We are done
        return nil
    case err := <-cm.errChan:
        // Error starting or running a runnable
        return err
    }
}


最终Controller就像内置的控制器一样,通过processNextWorkItem函数一个个处理。主要这里还可以通过MaxConcurrentReconciles提高并发。

// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
    // use an IIFE to get proper lock handling
    // but lock outside to get proper handling of the queue shutdown
    c.mu.Lock()
    if c.Started {
        return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
    }

    c.initMetrics()

    // Set the internal context.
    c.ctx = ctx

    c.Queue = c.MakeQueue()
    go func() {
        <-ctx.Done()
        c.Queue.ShutDown()
    }()

    wg := &sync.WaitGroup{}
    err := func() error {
        defer c.mu.Unlock()

        // TODO(pwittrock): Reconsider HandleCrash
        defer utilruntime.HandleCrash()

        // NB(directxman12): launch the sources *before* trying to wait for the
        // caches to sync so that they have a chance to register their intendeded
        // caches.
        for _, watch := range c.startWatches {
            c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
        c.Log.Info("Starting Controller")

        for _, watch := range c.startWatches {
            syncingSource, ok := watch.src.(source.SyncingSource)
            if !ok {
                continue
            }

            if err := func() error {
                // use a context with timeout for launching sources and syncing caches.
                sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
                defer cancel()

                // WaitForSync waits for a definitive timeout, and returns if there
                // is an error or a timeout
                if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
                    err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                    c.Log.Error(err, "Could not wait for Cache to sync")
                    return err
                }

                return nil
            }(); err != nil {
                return err
            }
        }

        // All the watches have been started, we can reset the local slice.
        //
        // We should never hold watches more than necessary, each watch source can hold a backing cache,
        // which won't be garbage collected if we hold a reference to it.
        c.startWatches = nil

        // Launch workers to process resources
        c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
        wg.Add(c.MaxConcurrentReconciles)
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go func() {
                defer wg.Done()
                // Run a worker thread that just dequeues items, processes them, and marks them done.
                // It enforces that the reconcileHandler is never invoked concurrently with the same object.
                for c.processNextWorkItem(ctx) {
                }
            }()
        }

        c.Started = true
        return nil
    }()
    if err != nil {
        return err
    }

    <-ctx.Done()
    c.Log.Info("Shutdown signal received, waiting for all workers to finish")
    wg.Wait()
    c.Log.Info("All workers finished")
    return nil
}

2.5 runtime cache

2.5.1 cache是什么

Cache 接口定义了如下两个接口:

(1)client.Reader:用于从 Cache 中获取及列举 Kubernetes 集群的资源。

(2)Informers:可为不同的 GVK 创建或获取对应的 Informer,并将 Index 添加到对 应的 Informer 中。

Kubernetes 是典型的 Server-Client 的架构,APIServer 作为集群统一的操作入口,任 何对资源所做的操作(包括增删改查)都必须经过 APIServer。为了减轻 APIServer 的压力, Controller-runtime 抽象出一个 Cache 层,Client 端对 APIServer 数据的读取和监听操作都 将通过 Cache 层来进行。

// Cache knows how to load Kubernetes objects, fetch informers to request
// to receive events for Kubernetes objects (at a low-level),
// and add indices to fields on the objects stored in the cache.
type Cache interface {
    // Cache acts as a client to objects stored in the cache.
    client.Reader

    // Cache loads informers and adds field indices.
    Informers
}


2.5.2 cache初始化逻辑

在new Manager的时候就初始化了缓存,具体的步骤是 New-> cluster.New ->

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // Set default values for options fields
    options = setOptionsDefaults(options)

    cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
        clusterOptions.Scheme = options.Scheme
        clusterOptions.MapperProvider = options.MapperProvider
        clusterOptions.Logger = options.Logger
        clusterOptions.SyncPeriod = options.SyncPeriod
        clusterOptions.Namespace = options.Namespace
        clusterOptions.NewCache = options.NewCache
        clusterOptions.NewClient = options.NewClient
        clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
        clusterOptions.DryRunClient = options.DryRunClient
        clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
    })


options.NewCache初始化cache
// Create the cache for the cached read client and registering informers
    cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
    if err != nil {
        return nil, err
    }
    
// Allow newCache to be mocked
    if options.NewCache == nil {
        options.NewCache = cache.New
    }

在 Controller Manager 的初始化启动过程中,将会构建 Cache 层,以供 Manager 使 用。在用户没有指定 Cache 初始化函数的前提下,将使用 Controller-runtime 默认提供的 Cache 初始化函数,默认 Cache 初始化的流程如下:

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
   opts, err := defaultOpts(config, opts)
   if err != nil {
      return nil, err
   }
   selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.DefaultSelector, opts.Scheme)
   if err != nil {
      return nil, err
   }
   disableDeepCopyByGVK, err := convertToDisableDeepCopyByGVK(opts.UnsafeDisableDeepCopyByObject, opts.Scheme)
   if err != nil {
      return nil, err
   }
   im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
   return &informerCache{InformersMap: im}, nil
}

(1) 设 置 默 认 参 数:若 Scheme 为 空, 则 设 置 为 scheme.Scheme ;若 Mapper 为 空, 则 通 过 apiutil.NewDiscoveryRESTMapper 基 于 Discovery 的 信 息 构 建 出 一 个 RESTMapper,用于管理所有 Object 的信息;若同步时间为空,则将 Informer 的同步时 间设置为 10 小时。

(2)初始化 InformersMap,为 3 种不同类型的 Object(structured、unstructured、 metadata-only)分别构建 InformersMap。

(3)初始化 specificInformersMap:该接口通过 Object 与 GVK 的组合信息创建并缓 存 Informers。

(4)定义 List-Watch 函数:为 3 种不同类型的 Object 实现 List-Watch 函数,通过 该函数可对 GVK 进行 List 和 Watch 操作。 通过 Cache 的初始化流程,我们可以看出 Cache 主要创建了 InformersMap,Scheme 中的每个 GVK 都会创建对应的 Informer,再通过 informersByGVK 的 Map,实现 GVK 到 Informer的映射;每个Informer都会通过List-Watch函数对相应的GVK进行List和Watch操作。

image.png

Cache 启动的核心是启动创建的所有 Informer

// Start calls Run on each of the informers and sets started to true.  Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
   go m.structured.Start(ctx)
   go m.unstructured.Start(ctx)
   go m.metadata.Start(ctx)
   <-ctx.Done()
   return nil
}


// Start calls Run on each of the informers and sets started to true.  Blocks on the context.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(ctx context.Context) {
    func() {
        ip.mu.Lock()
        defer ip.mu.Unlock()

        // Set the stop channel so it can be passed to informers that are added later
        ip.stop = ctx.Done()

        // Start each informer
        for _, informer := range ip.informersByGVK {
            go informer.Informer.Run(ctx.Done())
        }

        // Set started to true so we immediately start any informers added later.
        ip.started = true
        close(ip.startWait)
    }()
    <-ctx.Done()
}

Informer 的启动流程主要包含以下 3 个步骤:

(1)初始化 Delta FIFO 队列。

(2)创建内部 Controller:配置 Delta FIFO 队列和事件的处理函数。

(3)启动 Controller:创建 Reflector,负责监听 APIServer 上指定的 GVK,将 Add、 Update、Delete 变更事件写入 Delta FIFO 队列中,作为变更事件的生产者;Controller 中 的事件处理函数 HandleDeltas() 会消费这些变更事件,负责将更新写入本地 Indexer,同 时将这些 Add、Update、Delete 事件分发给之前注册的监听器。

3.总结

controller-runtime其实就是利用client-go informer那套,底层是创建shareIndexInformer。

controller-runtime通过屏蔽底层细节,让crd operator的实现非常简单。梳理一下,整理的工作流程如下所示:

image.png

4. 参考

云原生应用开发:Operator原理与实践

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容