在Kubernetes 子节点,在每个Node节点上都会启动一个kubelet服务进程。该进程用于处理Master节点下发到本节点的任务,管理Pod及Pod中的容器。每个Kubelet进程会在APIServer上注册节点自身信息,定期向Master节点汇报节点资源的使用情况,并通过cAdvise监控容器和节点资源,每个Node节点都会启动一个kubelet进程。用于处理Master节点下发到的任务,管理Pod以及Pod中的容器。每个Kubelet进程会向APIServer注册节点信息,定期向Master节点汇报资源的使用情况,并通过cAdvise监控容器和节点资源。
--node-sync-period: 设置同步的时间周期
--register-node 参数设置为 true(默认为 true),kubelet 会向 apiserver 注册自己
--kubeconfig: 登陆 apiserver 所需凭据/证书的目录
--api-servers: apiserver 地址
Kubelet组件运行在Node节点上,维持运行中的Pods以及提供kuberntes运行时环境,主要完成以下使命:
1.监视分配给该Node节点的pods
2.挂载pod所需要的volumes
3.下载pod的secret
4.通过docker/rkt来运行pod中的容器
5.周期的执行pod中为容器定义的liveness探针
6.上报pod的状态给系统的其他组件
7.上报Node的状态
整个kubelet可以按照上图所示的模块进行划分,模块之间相互配合完成Kubelet的所有功能.下面对上图中的模块进行简要的介绍.
Kubelet对外暴露的端口,通过该端口可以获取到kubelet的状态
10250 kubelet API –kublet暴露出来的端口,通过该端口可以访问获取node资源以及状态,另外可以配合kubelet的启动参数contention-profiling enable-debugging-handlers来提供了用于调试和profiling的api
4194 cAdvisor –kublet通过该端口可以获取到本node节点的环境信息以及node上运行的容器的状态等内容
10255 readonly API –kubelet暴露出来的只读端口,访问该端口不需要认证和鉴权,该http server提供查询资源以及状态的能力.注册的消息处理函数定义src/k8s.io/kubernetes/pkg/kubelet/server/server.go:149
10248 /healthz –kubelet健康检查,通过访问该url可以判断Kubelet是否正常work, 通过kubelet的启动参数–healthz-port –healthz-bind-address来指定监听的地址和端口.默认值定义在、kubernetes-master/pkg/kubelet/apis/config/v1beta1/defaults.go
核心功能模块
-
PLEG
PLEG全称为PodLifecycleEvent,PLEG会一直调用container runtime获取本节点的pods,之后比较本模块中之前缓存的pods信息,比较最新的pods中的容器的状态是否发生改变,当状态发生切换的时候,生成一个eventRecord事件,输出到eventChannel中. syncPod模块会接收到eventChannel中的event事件,来触发pod同步处理过程,调用container runtime来重建pod,保证pod工作正常.
cAdvisor
cAdvisor集成在kubelet中,起到收集本Node的节点和启动的容器的监控的信息,启动一个Http Server服务器,对外接收rest api请求.cAvisor模块对外提供了interface接口,可以通过interface接口获取到node节点信息,本地文件系统的状态等信息,该接口被imageManager,OOMWatcher,containerManager等所使用
cAdvisor相关的内容详细可参考github.com/google/cadvisorGPUManager
对于Node上可使用的GPU的管理,当前版本需要在kubelet启动参数中指定feature-gates中添加Accelerators=true,并且需要才用runtime=Docker的情况下才能支持使用GPU,并且当前只支持NvidiaGPU,GPUManager主要需要实现interface定义的Start()/Capacity()/AllocateGPU()三个函数。默认情况下,kubelet 使用 CFS 配额 来执行 Pod 的 CPU 约束。 当节点上运行了很多 CPU 密集的 Pod 时,工作负载可能会迁移到不同的 CPU 核, 这取决于调度时 Pod 是否被扼制,以及哪些 CPU 核是可用的。 许多工作负载对这种迁移不敏感,因此无需任何干预即可正常工作。
然而,有些工作负载的性能明显地受到 CPU 缓存亲和性以及调度延迟的影响。 对此,kubelet 提供了可选的 CPU 管理策略,来确定节点上的一些分配偏好。
CPU 管理策略通过 kubelet 参数 --cpu-manager-policy
来指定。支持两种策略:
-
none
: 默认策略,表示现有的调度行为。 -
static
: 允许为节点上具有某些资源特征的 pod 赋予增强的 CPU 亲和性和独占性。
CPU 管理器定期通过 CRI 写入资源更新,以保证内存中 CPU 分配与 cgroupfs 一致。 同步频率通过新增的 Kubelet 配置参数 --cpu-manager-reconcile-period
来设置。 如果不指定,默认与 --node-status-update-frequency
的周期相同。
static
策略针对具有整数型 CPUrequests
的Guaranteed
Pod ,它允许该类 Pod 中的容器访问节点上的独占 CPU 资源。这种独占性是使用cpuset cgroup 控制器来实现的。
该策略管理一个共享 CPU 资源池,最初,该资源池包含节点上所有的 CPU 资源。可用 的独占性 CPU 资源数量等于节点的 CPU 总量减去通过--kube-reserved
或--system-reserved
参数保留的 CPU 。从1.17版本开始,CPU保留列表可以通过 kublet 的 '--reserved-cpus' 参数显式地设置。 通过 '--reserved-cpus' 指定的显式CPU列表优先于使用 '--kube-reserved' 和 '--system-reserved' 参数指定的保留CPU。 通过这些参数预留的 CPU 是以整数方式,按物理内 核 ID 升序从初始共享池获取的。 共享池是BestEffort
和Burstable
pod 运行 的 CPU 集合。Guaranteed
pod 中的容器,如果声明了非整数值的 CPUrequests
,也将运行在共享池的 CPU 上。只有Guaranteed
pod 中,指定了整数型 CPUrequests
的容器,才会被分配独占 CPU 资源。
当Guaranteed
Pod 调度到节点上时,如果其容器符合静态分配要求, 相应的 CPU 会被从共享池中移除,并放置到容器的 cpuset 中。 因为这些容器所使用的 CPU 受到调度域本身的限制,所以不需要使用 CFS 配额来进行 CPU 的绑定。 换言之,容器 cpuset 中的 CPU 数量与 Pod 规约中指定的整数型 CPUlimit
相等。 这种静态分配增强了 CPU 亲和性,减少了 CPU 密集的工作负载在节流时引起的上下文切换。
OOMWatcher
系统OOM的监听器,将会与cadvisor模块之间建立SystemOOM,通过Watch方式从cadvisor那里收到的OOM信号,并发生相关事件ProbeManager
探针管理,依赖于statusManager,livenessManager,containerRefManager,实现Pod的健康检查的功能ProberManager 实现对容器的健康检查。目前有三种 probe 探针:
- liveness: 让Kubernetes知道你的应用程序是否健康,如果你的应用程序不健康,Kubernetes将删除Pod并启动一个新的替换它(与RestartPolicy有关)。Liveness 探测可以告诉 Kubernetes 什么时候通过重启容器实现自愈。
- readiness: readiness与liveness原理相同,不过Readiness探针是告诉 Kubernetes 什么时候可以将容器加入到 Service 负载均衡中,对外提供服务。
- startupProbe:1.16开始支持的新特性,检测慢启动容器的状态。
Kubelet 定期调用容器中的探针来诊断容器的健康状况。 包含如下三种实现方式:
- ExecAction:在容器内部执行一个命令,如果该命令的退出状态码为 0,则表明容器健康;
- TCPSocketAction:通过容器的 IP 地址和端口号执行 TCP 检查,如果端口能被访问,则表明容器健康;
- HTTPGetAction:通过容器的 IP 地址和端口号及路径调用 HTTP GET 方法,如果响应的状态码大于等于 200 且小于 400,则认为容器状态健康。
StatusManager
StatusManager 的主要功能是将 pod 状态信息同步到 apiserver,statusManage 并不会主动监控 pod 的状态,而是提供接口供其他 manager 进行调用。比如 probeManager。probeManager 会定时去监控 pod 中容器的健康状况,一旦发现状态发生变化,就调用 statusManager 提供的方法更新 pod 的状态。Container/RefManager
容器引用的管理,相对简单的Manager,通过定义map来实现了containerID与v1.ObjectReferece容器引用的映射.EvictionManager
evictManager当node的节点资源不足的时候,达到了配置的evict的策略,将会从node上驱赶pod,来保证node节点的稳定性.可以通过kubelet启动参数来决定evict的策略.另外当node的内存以及disk资源达到evict的策略的时候会生成对应的node状态记录.ImageGC
imageGC负责Node节点的镜像回收,当本地的存放镜像的本地磁盘空间达到某阈值的时候,会触发镜像的回收,删除掉不被pod所使用的镜像.回收镜像的阈值可以通过kubelet的启动参数来设置.
垃圾回收是 kubelet 的一个有用功能,它将清理未使用的镜像和容器。,每五分钟对镜像执行一次垃圾回收。
不建议使用外部垃圾收集工具,因为这些工具可能会删除原本期望存在的容器进而破坏 kubelet 的行为。ContainerGC
containerGC负责NOde节点上的dead状态的container,自动清理掉node上残留的容器.具体的GC操作由runtime来实。 Kubelet 将每分钟对容器执行一次垃圾回收ImageManager
调用kubecontainer.ImageService提供的PullImage/GetImageRef/ListImages/RemoveImage/ImageStates的方法来保证pod运行所需要的镜像,主要是为了kubelet支持cni.VolumeManager
负责node节点上pod所使用的volume的管理.主要涉及如下功能
volumeManager通过actualStateOfWorld和desiredStateOfWorld来表明当前的volume挂载状态和期望的volume挂载状态。然后由desiredStateOfWorldPopulator维护desireedStateOfWorld和podManager的一致性;由reconcile维护actualStateOfWorld和desiredStateOfWorld的一致性及磁盘volume挂载和actualStateOfWorld的一致性。通过这些机制,volumeManager完成了volume挂载生命周期的管理。
Volume状态的同步,模块中会启动gorountine去获取当前node上volume的状态信息以及期望的volume的状态信息,会去周期性的sync volume的状态,另外volume与pod的生命周期关联,pod的创建删除过程中volume的attach/detach流程.更重要的是kubernetes支持多种存储的插件,kubelet如何调用这些存储插件提供的interface.涉及的内容较多,更加详细的信息可以看kubernetes中volume相关的代码和文档.-
containerManager
负责node节点上运行的容器的cgroup配置信息,kubelet启动参数如果指定–cgroupPerQos的时候,kubelet会启动gorountie来周期性的更新pod的cgroup信息,维持其正确.实现了pod的Guaranteed/BestEffort/Burstable三种级别的Qos,通过配置kubelet可以有效的保证了当有大量pod在node上运行时,保证node节点的稳定性.该模块中涉及的struct主要包括
runtimeManager
containerRuntime负责kubelet与不同的runtime实现进行对接,实现对于底层container的操作,初始化之后得到的runtime实例将会被之前描述的组件所使用.
当前可以通过kubelet的启动参数–container-runtime来定义是使用docker还是rkt.runtime需要实现的接口定义在src/k8s.io/kubernetes/pkg/kubelet/apis/cri/services.go文件里面-
podManager
podManager提供了接口来存储和访问pod的信息,维持static pod和mirror pods的关系,提供的接口如下所示
跟其他Manager之间的关系,podManager会被statusManager/volumeManager/runtimeManager所调用,并且podManager的接口处理流程里面同样会调用secretManager以及configMapManager.
上面所说的模块可能只是kubelet所有模块中的一部分,更多的需要大家一起看探索.
工作原理
如上面所说,Kubelet 的工作主要是围绕一个 SyncLoop 来展开,借助 go channel,各组件监听 loop 消费事件,或者往里面生产 pod 相关的事件,整个控制循环由事件驱动运行。可以用下图来表示:
- 用户从http,静态文件以及APIServer对pod的修改通过PodConfigchannel传递到syncLoop;
- 另外一方面,PLEG会周期(默认1s)通过relist从CRI获取所有pod当前状态并且跟之前状态对比产生Pod的event发送到syncLoop;
- syncLoop的syncLoopIteration从各种chan中取出update的内容,一方面会通过podManger里更新pod状态,另一方面会通过dispatchWork将更新内容通过PodWoker更新pod状态,调用的是syncPod这个接口(由Kubelet.syncPod实现);
- 而syncPod这里通过podStatusChannel 更新状态到statusManager, 再patch Status到APIServer;
- syncPod一方面通过containerManager更新non-runtime的信息,例如QoS,Cgroup信息;另外一方面通过CRI更新pod的状态;
流程分析
限于篇幅,以下所贴代码都是挑选一些重要代码列在下面。并不是源码重现。
NewKubeletCommand
kubelet通过cobra框架来处理启动命令和启动参数,从main函数进来直接跳到cobra的Run函数注册则可,大致做下面几件事情
- 初始化启动命令和参数
- 初始化FeatureGate
- 校验命令行参数
- 加载KubeletConfigFile并验证之,即“启动命令”一节中提到的config参数传入的文件
- 加载使用动态配置,如果有启用
- 构造kubeletServer及kubeletDeps
- 调用Run函数运行kubelet
代码位于/cmd/kubelet/app/server.go
func(cmd *cobra.Command, args []string) {
//1. 初始化启动命令和参数
if err := cleanFlagSet.Parse(args); err != nil {
}
//2. 初始化FeatureGate
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
}
//3. 校验命令行参数
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
}
//4. 加载KubeletConfigFile并验证之
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
}
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
klog.Fatal(err)
}
////5. 加载使用动态配置的部分略
//6. 构造kubeletServer及kubeletDeps
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.Fatal(err)
}
//7. 调用Run函数运行kubelet
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
klog.Fatal(err)
}
}
run函数
run函数主要为kubelet启动做一些环境检查,准备及校验操作
- 将当前的配置文件注册到 http server /configz URL 中
- 初始化各种客户端
- 初始化 auth,cgroupRoot,cadvisor,ContainerManager
- 为 kubelet 进程设置 oom 分数
- 初始化Runtime Server,设置CRI
- 调用 RunKubelet 方法执行后续的启动操作
- 启动 Healthz http server
代码位于 /cmd/kubelet/server/server.go
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
//1 将当前的配置文件注册到 http server /configz URL 中
err = initConfigz(&s.KubeletConfiguration)
//2 初始化各种客户端,主要是非standalone模式下会进入这个,否则会将所有客户端都置为空
switch {
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
}
//3 初始化 auth,cgroupRoot,cadvisor,ContainerManager
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
}
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
}
if kubeDeps.ContainerManager == nil {
kubeDeps.ContainerManager, err = cm.NewContainerManager(...)
}
//4. 为 kubelet 进程设置 oom 分数
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
}
//5. 初始化Runtime Server,设置CRI
err = kubelet.PreInitRuntimeService(...)
//6. 调用 RunKubelet 方法执行后续的启动操作
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
//7. 启动 Healthz http server
if s.HealthzPort > 0 {
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
}, 5*time.Second, wait.NeverStop)
}
}
RunKubelet
RunKubelet函数核心就两个
初始化kubelet对象
将kubelet及相关kubelet的api跑起来
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
k, err := createAndInitKubelet(...)
if runOnce {
}else{
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
}
}
createAndInitKubelet
createAndInitKubelet先构造出kubelet对象,NewMainKubelet的函数传入的参数也很多,函数里面包含了前文中“主要模块”的初始化操作。构造完毕后调用BirthCry方法往k8s发一个Starting kubelet.的event。然后就马上启动containerGC
func createAndInitKubelet(......) {
k, err = kubelet.NewMainKubelet(...)
k.BirthCry()
k.StartGarbageCollection()
return k, nil
}
startKubelet
startKubelet函数是通过调用kubelet的Run方法将kubelet跑起来,kubelet.Run包含了一部分“主要模块”中提及的manager的start方法调用,意味着kubelet的各个模块从此开始运行起来,此外还包括了kubelet的核心循环syncLoop在这里开始调用
运行了kubelet后,kubelet api、readonly API等server也在这里开始运行
func startKubelet(...) {
// start the kubelet
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
至此,kubelet运行起来了,开始执行它节点资源上报,Pod的启停及状态管理等工作。
启动流程的调用链
通过下面调用链大致回顾整个启动流程
main // cmd/kubelet/kubelet.go
|--NewKubeletCommand // cmd/kubelet/app/server.go
|--Run // cmd/kubelet/app/server.go
|--initForOS // cmd/kubelet/app/server.go
|--run // cmd/kubelet/app/server.go
|--initConfigz // cmd/kubelet/app/server.go
|--BuildAuth
|--cm.NodeAllocatableRoot
|--cadvisor.NewImageFsInfoProvider
|--NewContainerManager
|--ApplyOOMScoreAdj
|--PreInitRuntimeService
|--RunKubelet // cmd/kubelet/app/server.go
| |--k = createAndInitKubelet // cmd/kubelet/app/server.go
| | |--NewMainKubelet
| | | |--watch k8s Service
| | | |--watch k8s Node
| | | |--klet := &Kubelet{}
| | | |--init klet fields
| | |
| | |--k.BirthCry()
| | |--k.StartGarbageCollection()
| |
| |--startKubelet(k) // cmd/kubelet/app/server.go
| |--go k.Run() // -> pkg/kubelet/kubelet.go
| | |--go cloudResourceSyncManager.Run()
| | |--initializeModules
| | |--go volumeManager.Run()
| | |--go nodeLeaseController.Run()
| | |--initNetworkUtil() // setup iptables
| | |--go Until(PerformPodKillingWork, 1*time.Second, neverStop)
| | |--statusManager.Start()
| | |--runtimeClassManager.Start
| | |--pleg.Start()
| | |--syncLoop(updates, kl) // pkg/kubelet/kubelet.go
| |
| |--k.ListenAndServe
|
|--go http.ListenAndServe(healthz)