k8s Source Code Reading (3) - kube-controller-manager

Startup Flow

File: cmd/kube-controller-manager/app/controllermanager.go

Entry points

  • func NewControllerManagerCommand() *cobra.Command
  • func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error
  • func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks)
    Acquires the leader-election lock; once the lock is held, the leader callback fires and StartControllers is invoked (see the leader-election sketch after the listing below).
  • func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
    unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error

    var controllerChecks []healthz.HealthChecker
    // start each controller in turn and build a health check for it
    for controllerName, initFn := range controllers {
        if !controllerCtx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        ctrl, started, err := initFn(ctx, controllerCtx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        check := controllerhealthz.NamedPingChecker(controllerName)
        if ctrl != nil {
            // check if the controller supports and requests a debugHandler
            // and it needs the unsecuredMux to mount the handler onto.
            if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
                if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
                    basePath := "/debug/controllers/" + controllerName
                    unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
                    unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
                }
            }
            if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
                if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
                    check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
                }
            }
        }
        controllerChecks = append(controllerChecks, check)

        klog.Infof("Started %q", controllerName)
    }

    healthzHandler.AddHealthChecker(controllerChecks...)
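
For context, leaderElectAndRun is a thin wrapper around client-go's leaderelection package. Below is a minimal sketch of the same pattern; the helper name, lock metadata, and timings are illustrative, not the exact kube-controller-manager values.

    import (
        "context"
        "time"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/leaderelection"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
        "k8s.io/klog/v2"
    )

    // runWithLeaderElection (hypothetical helper) blocks until this instance
    // holds the lease, then invokes run -- which is where StartControllers
    // would be kicked off.
    func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, identity string, run func(ctx context.Context)) {
        lock := &resourcelock.LeaseLock{
            LeaseMeta:  metav1.ObjectMeta{Namespace: "kube-system", Name: "kube-controller-manager"},
            Client:     client.CoordinationV1(),
            LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
        }
        leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
            Lock:          lock,
            LeaseDuration: 15 * time.Second,
            RenewDeadline: 10 * time.Second,
            RetryPeriod:   2 * time.Second,
            Callbacks: leaderelection.LeaderCallbacks{
                OnStartedLeading: run,
                // losing the lease is fatal; the real binary exits so another replica can take over
                OnStoppedLeading: func() { klog.Fatal("leader lease lost") },
            },
        })
    }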
  • func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
// This block defines the list of controllers that will be loaded and started
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["endpointslice"] = startEndpointSliceController
    controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    controllers["nodelifecycle"] = startNodeLifecycleController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        // TODO: volume controller into the IncludeCloudLoops only set.
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher
    controllers["ephemeral-volume"] = startEphemeralVolumeController
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
        utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
        controllers["storage-version-gc"] = startStorageVersionGCController
    }
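
Every value in this map is an InitFunc, whose signature matches the initFn call in StartControllers above. Adding a controller of your own is just one more map entry; the sketch below uses a hypothetical startMyController to show the contract (it assumes the surrounding controllermanager package, where ControllerContext and controller.Interface are defined).

    // InitFunc is the contract every entry in the map satisfies; the returned
    // controller.Interface may be nil when the controller exposes no debug or
    // health hooks, and (nil, false, nil) means "intentionally not started".
    type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error)

    // Hypothetical example of wiring in a custom controller:
    func startMyController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
        go myLoop(ctx) // assumption: myLoop is your own reconcile loop
        return nil, true, nil
    }
    // ...and registering it is a single extra line in the map:
    //     controllers["my-controller"] = startMyController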

The NodeLifecycle Controller

File: cmd/kube-controller-manager/app/core.go

Starting the controller

  • func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error)
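
Like all the start functions in core.go, it pulls the shared informers and client off the ControllerContext, constructs the controller, and launches Run in a goroutine. A condensed sketch (lifecyclecontroller aliases pkg/controller/nodelifecycle; the timing and QPS arguments come from ComponentConfig and are elided here):

    func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
        lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
            ctx,
            controllerContext.InformerFactory.Coordination().V1().Leases(),
            controllerContext.InformerFactory.Core().V1().Pods(),
            controllerContext.InformerFactory.Core().V1().Nodes(),
            controllerContext.InformerFactory.Apps().V1().DaemonSets(),
            controllerContext.ClientBuilder.ClientOrDie("node-controller"),
            // ...nodeMonitorPeriod, grace periods, eviction QPS, thresholds, runTaintManager (elided)
        )
        if err != nil {
            return nil, true, err
        }
        go lifecycleController.Run(ctx)
        return nil, true, nil
    }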

File: pkg/controller/nodelifecycle/node_lifecycle_controller.go

Creating the controller

  • func NewNodeLifecycleController(
    ctx context.Context,
    leaseInformer coordinformers.LeaseInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    daemonSetInformer appsv1informers.DaemonSetInformer,
    kubeClient clientset.Interface,
    nodeMonitorPeriod time.Duration,
    nodeStartupGracePeriod time.Duration,
    nodeMonitorGracePeriod time.Duration,
    podEvictionTimeout time.Duration,
    evictionLimiterQPS float32,
    secondaryEvictionLimiterQPS float32,
    largeClusterThreshold int32,
    unhealthyZoneThreshold float32,
    runTaintManager bool,
    ) (*Controller, error)
// Initializes the controller: registers client-go event consumers and their trigger handlers, sets up caches, and so on (a sketch of the handler wiring follows)
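
The event-handler registration mentioned above is the standard informer-to-workqueue pattern. A simplified sketch of how node events reach nodeUpdateQueue (the real handlers also feed the taint manager and track health data; cache is k8s.io/client-go/tools/cache, v1 is k8s.io/api/core/v1):

    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            nc.nodeUpdateQueue.Add(obj.(*v1.Node).Name) // enqueue by name; workers re-fetch via the lister
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            nc.nodeUpdateQueue.Add(newObj.(*v1.Node).Name)
        },
        DeleteFunc: func(obj interface{}) {
            // the real handler also cleans up cached health/zone state here
        },
    })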

Running the NodeLifecycleController

  • func (nc *Controller) Run(ctx context.Context)
// This launches the goroutines that consume pod and node events
    ...
    if nc.runTaintManager {
        go nc.taintManager.Run(ctx)
    }

    // Close node update queue to cleanup go routine.
    defer nc.nodeUpdateQueue.ShutDown()
    defer nc.podUpdateQueue.ShutDown()

    // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
    for i := 0; i < scheduler.UpdateWorkerSize; i++ {
        // Thanks to "workqueue", each worker just needs to get an item from the
        // queue, because the item is flagged when taken from the queue: if a new
        // event comes in, the item is re-queued until "Done", so no more than one
        // worker handles the same item and no event is missed.
        go wait.UntilWithContext(ctx, nc.doNodeProcessingPassWorker, time.Second)
    }

    for i := 0; i < podUpdateWorkerSize; i++ {
        go wait.UntilWithContext(ctx, nc.doPodProcessingWorker, time.Second)
    }

    if nc.runTaintManager {
        // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
        // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
        go wait.UntilWithContext(ctx, nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod)
    } else {
        // Managing eviction of nodes:
        // When we delete pods off a node, if the node was not empty at the time we then
        // queue an eviction watcher. If we hit an error, retry deletion.
        go wait.UntilWithContext(ctx, nc.doEvictionPass, scheduler.NodeEvictionPeriod)
    }

    // Incorporate the results of node health signal pushed from kubelet to master.
    go wait.UntilWithContext(ctx, func(ctx context.Context) {
        if err := nc.monitorNodeHealth(ctx); err != nil {
            klog.Errorf("Error monitoring node health: %v", err)
        }
    }, nc.nodeMonitorPeriod)

    <-ctx.Done()
  • func (nc *Controller) doNodeProcessingPassWorker(ctx context.Context)
    A worker of the node lifecycle controller; it consumes events from nodeUpdateQueue and processes them.
    Tip: the workqueue's AddRateLimited() method is used frequently here; it puts failed items back on the queue with rate-limited backoff so they can be retried up to the configured limit (see the generic worker sketch after step 2 below).
1. doNoScheduleTaintingPass
- Translates node conditions into the corresponding taints: if a condition implies the node should not be scheduled, a NoSchedule taint is applied. The map below holds the condition-to-taint matching rules; a match results in the taint being set.
    nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
        v1.NodeReady: {
            v1.ConditionFalse:   v1.TaintNodeNotReady,
            v1.ConditionUnknown: v1.TaintNodeUnreachable,
        },
        v1.NodeMemoryPressure: {
            v1.ConditionTrue: v1.TaintNodeMemoryPressure,
        },
        v1.NodeDiskPressure: {
            v1.ConditionTrue: v1.TaintNodeDiskPressure,
        },
        v1.NodeNetworkUnavailable: {
            v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
        },
        v1.NodePIDPressure: {
            v1.ConditionTrue: v1.TaintNodePIDPressure,
        },
    }

- When node.Spec.Unschedulable is set to true (i.e. the node is cordoned), a taint is applied as well.
- The pass then diffs the computed desired taints against the node's current taints to determine which to add and which to remove, and updates the node through the API.
2. reconcileNodeLabels
Fills in the OS and arch labels automatically, mirroring kubernetes.io/os and beta.kubernetes.io/os (and their arch counterparts) onto each other; presumably this exists for compatibility with kubelets and schedulers of different versions, and it serves no other purpose.
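
The retry behavior from the tip above is the standard workqueue idiom: Get blocks for an item, AddRateLimited re-enqueues a failed item with backoff, Forget clears its failure history on success, and Done always releases the in-flight marker. A generic sketch, not the exact nodelifecycle worker (workqueue is k8s.io/client-go/util/workqueue):

    func worker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
        for {
            obj, shutdown := queue.Get()
            if shutdown {
                return
            }
            key := obj.(string)
            if err := sync(key); err != nil {
                queue.AddRateLimited(key) // back off, then retry
            } else {
                queue.Forget(key) // success: reset the rate limiter for this key
            }
            queue.Done(key) // always mark the in-flight item as finished
        }
    }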
  • func (nc *Controller) doPodProcessingWorker(ctx context.Context)
  • func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem)
    Consumes pod events.
The controller caches node health state in its nodeHealthData field, backed by the nodeHealthMap type.
- Consume a pod event, fetching the pod and node objects plus a deep copy of the node's health state.
- If the taint manager is enabled, pods are evicted gracefully (evictPod) according to the node's state.
- If the node's state is abnormal, mark the pod NotReady (by updating the pod's status) and emit an event (a sketch of the status update follows this list).
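
A hedged sketch of the "mark the pod NotReady" step; the real code goes through a shared nodeutil helper, and this shows only the underlying status update (kubernetes is k8s.io/client-go/kubernetes, metav1 is k8s.io/apimachinery/pkg/apis/meta/v1):

    // markPodNotReady (illustrative helper): flip the Ready condition to False
    // and persist it via the pod's status subresource.
    func markPodNotReady(ctx context.Context, client kubernetes.Interface, pod *v1.Pod) error {
        for i := range pod.Status.Conditions {
            if pod.Status.Conditions[i].Type == v1.PodReady {
                pod.Status.Conditions[i].Status = v1.ConditionFalse
                pod.Status.Conditions[i].LastTransitionTime = metav1.Now()
                break
            }
        }
        _, err := client.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
        return err
    }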
  • func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context)
    Runs when the taint manager is enabled.
Read the code for the detailed logic; the NoExecute taints this pass manages are sketched after this list.
  • func (nc *Controller) doEvictionPass(ctx context.Context)
    Runs when the taint manager is disabled.
Read the code for the detailed logic.
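
For orientation before diving into doNoExecuteTaintingPass: it swaps between two NoExecute taints based on the node's Ready condition, and the taint manager then evicts pods that lack a matching toleration. A sketch of that selection, using the same v1 constants as the NoSchedule map above:

    // The two mutually exclusive NoExecute taints managed by this pass
    // (constants from k8s.io/api/core/v1).
    var (
        notReadyTaint    = v1.Taint{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoExecute}
        unreachableTaint = v1.Taint{Key: v1.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute}
    )

    // Mirrors the controller's mapping: Ready=False -> not-ready,
    // Ready=Unknown -> unreachable, anything else -> no NoExecute taint.
    func noExecuteTaintFor(readyStatus v1.ConditionStatus) *v1.Taint {
        switch readyStatus {
        case v1.ConditionFalse:
            return &notReadyTaint
        case v1.ConditionUnknown:
            return &unreachableTaint
        }
        return nil
    }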


Using the node lifecycle controller as an example, the above walks through the entire kube-controller-manager startup flow, to help you quickly locate and modify the source for your own needs.
This article is not finished; I will keep adding to it as I read further.
