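An In-Depth Look at kubelet (2): Creating a Pod
This post continues from the previous one. That post covered the producer; this one covers the consumer.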
background
PodManager
k8s.io\kubernetes\pkg\kubelet\pod\pod_manager.go
// Manager stores and manages access to pods, maintaining the mappings
// between static pods and mirror pods.
//
// The kubelet discovers pod updates from 3 sources: file, http, and
// apiserver. Pods from non-apiserver sources are called static pods, and API
// server is not aware of the existence of static pods. In order to monitor
// the status of such pods, the kubelet creates a mirror pod for each static
// pod via the API server.
//
// A mirror pod has the same pod full name (name and namespace) as its static
// counterpart (albeit different metadata such as UID, etc). By leveraging the
// fact that the kubelet reports the pod status using the pod full name, the
// status of the mirror pod always reflects the actual status of the static
// pod. When a static pod gets deleted, the associated orphaned mirror pod
// will also be removed.
type Manager interface {
// GetPods returns the regular pods bound to the kubelet and their spec.
GetPods() []*v1.Pod
// GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
// whether the pod was found.
GetPodByFullName(podFullName string) (*v1.Pod, bool)
// GetPodByName provides the (non-mirror) pod that matches namespace and
// name, as well as whether the pod was found.
GetPodByName(namespace, name string) (*v1.Pod, bool)
// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
// whether the pod is found.
GetPodByUID(types.UID) (*v1.Pod, bool)
// GetPodByMirrorPod returns the static pod for the given mirror pod and
// whether it was known to the pod manger.
GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
// GetMirrorPodByPod returns the mirror pod for the given static pod and
// whether it was known to the pod manager.
GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
// GetPodsAndMirrorPods returns the both regular and mirror pods.
GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*v1.Pod)
// AddPod adds the given pod to the manager.
AddPod(pod *v1.Pod)
// UpdatePod updates the given pod in the manager.
UpdatePod(pod *v1.Pod)
// DeletePod deletes the given pod from the manager. For mirror pods,
// this means deleting the mappings related to mirror pods. For non-
// mirror pods, this means deleting from indexes for all non-mirror pods.
DeletePod(pod *v1.Pod)
// DeleteOrphanedMirrorPods deletes all mirror pods which do not have
// associated static pods. This method sends deletion requests to the API
// server, but does NOT modify the internal pod storage in basicManager.
DeleteOrphanedMirrorPods()
// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
// a mirror pod, returns the UID of its static pod. Otherwise, returns the
// original UID.
//
// All public-facing functions should perform this translation for UIDs
// because user may provide a mirror pod UID, which is not recognized by
// internal Kubelet functions.
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
// UIDs and mirror pod UIDs to static pod UIDs.
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
// IsMirrorPodOf returns true if mirrorPod is a correct representation of
// pod; false otherwise.
IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
MirrorClient
}
The previous post introduced static pods; the corresponding concept is the mirror pod.
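To make the static pod / mirror pod pairing concrete, here is a minimal sketch of a manager that indexes both kinds of pod by full name and translates a mirror pod UID back to its static pod UID. The `Pod` struct, the `manager` type, the `Mirror` flag and the `name_namespace` key format are simplified stand-ins invented for illustration; they are not the real `v1.Pod` or `basicManager`.

```go
package main

import "fmt"

// Pod is a hypothetical, stripped-down stand-in for v1.Pod.
type Pod struct {
	Name, Namespace, UID string
	Mirror               bool // true if this object is a mirror pod
}

// fullName pairs name and namespace; the separator is an assumption.
func fullName(p *Pod) string { return p.Name + "_" + p.Namespace }

// manager keeps two indexes keyed by full name: one for regular
// (including static) pods and one for mirror pods.
type manager struct {
	podByFullName       map[string]*Pod
	mirrorPodByFullName map[string]*Pod
}

func newManager() *manager {
	return &manager{
		podByFullName:       map[string]*Pod{},
		mirrorPodByFullName: map[string]*Pod{},
	}
}

// AddPod stores the pod in the index that matches its kind.
func (m *manager) AddPod(p *Pod) {
	if p.Mirror {
		m.mirrorPodByFullName[fullName(p)] = p
		return
	}
	m.podByFullName[fullName(p)] = p
}

// GetMirrorPodByPod returns the mirror pod that shares the pod's full name.
func (m *manager) GetMirrorPodByPod(p *Pod) (*Pod, bool) {
	mp, ok := m.mirrorPodByFullName[fullName(p)]
	return mp, ok
}

// TranslatePodUID maps a mirror pod UID to the UID of its static pod.
// Any other UID is returned unchanged.
func (m *manager) TranslatePodUID(uid string) string {
	for name, mp := range m.mirrorPodByFullName {
		if mp.UID != uid {
			continue
		}
		if p, ok := m.podByFullName[name]; ok {
			return p.UID
		}
	}
	return uid
}

func main() {
	m := newManager()
	m.AddPod(&Pod{Name: "etcd", Namespace: "kube-system", UID: "static-1"})
	m.AddPod(&Pod{Name: "etcd", Namespace: "kube-system", UID: "mirror-1", Mirror: true})
	fmt.Println(m.TranslatePodUID("mirror-1"))
}
```

The point of the sketch is that the two objects never share a UID; the full name is the only link between them, which is why lookups go through it.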
Pod status
See k8s.io\api\core\v1\types.go
- Pending: the API server has accepted the request to create the Pod, but at least one container has not started yet. Pending covers both the time before the Pod is scheduled and the time spent pulling images. Source comment: PodPending means the pod has been accepted by the system, but one or more of the containers has not been started. This includes time before being bound to a node, as well as time spent pulling images onto the host.
- Running: every container has been started, and at least one is still running or is being restarted. Source comment: PodRunning means the pod has been bound to a node and all of the containers have been started. At least one container is still running or is in the process of being restarted.
- Succeeded: every container ran to completion successfully, and Kubernetes will not restart any of them. Source comment: PodSucceeded means that all containers in the pod have voluntarily terminated with a container exit code of 0, and the system is not going to restart any of these containers.
- Failed: no container is running any more, and at least one of them exited abnormally. Source comment: PodFailed means that all containers in the pod have terminated, and at least one container has terminated in a failure (exited with a non-zero exit code or was stopped by the system).
- Unknown: the state of the Pod cannot be obtained for some reason, most commonly because the Node is unreachable. Source comment: PodUnknown means that for some reason the state of the pod could not be obtained, typically due to an error in communicating with the host of the pod.
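As a rough illustration of how these phases relate to container states, the sketch below derives a phase from a list of per-container states. It is a deliberately simplified assumption: the `containerState` type and the `phase` function are hypothetical, restart policy is ignored, and the Unknown phase is not modelled, so this is not the kubelet's actual logic.

```go
package main

import "fmt"

// containerState is a hypothetical, simplified container state.
type containerState int

const (
	waiting containerState = iota // not started yet (e.g. image still pulling)
	running
	exitedOK     // terminated with exit code 0
	exitedFailed // terminated with a non-zero exit code
)

// phase applies the descriptions above in order:
// any container not yet started  -> Pending
// otherwise any container running -> Running
// otherwise any failure           -> Failed
// otherwise                       -> Succeeded
func phase(states []containerState) string {
	var nWaiting, nRunning, nFailed int
	for _, s := range states {
		switch s {
		case waiting:
			nWaiting++
		case running:
			nRunning++
		case exitedFailed:
			nFailed++
		}
	}
	switch {
	case nWaiting > 0:
		return "Pending"
	case nRunning > 0:
		return "Running"
	case nFailed > 0:
		return "Failed"
	default:
		return "Succeeded"
	}
}

func main() {
	fmt.Println(phase([]containerState{waiting, running}))
	fmt.Println(phase([]containerState{running, exitedOK}))
	fmt.Println(phase([]containerState{exitedOK, exitedFailed}))
	fmt.Println(phase([]containerState{exitedOK, exitedOK}))
}
```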
PodWorkers
k8s.io\kubernetes\pkg\kubelet\pod_workers.go
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
UpdatePod(options *UpdatePodOptions)
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
type podWorkers struct {
// Protects all per worker fields.
podLock sync.Mutex
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan UpdatePodOptions
// Track the current state of per-pod goroutines.
// Currently all update request for a given pod coming when another
// update of this pod is being processed are ignored.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
workQueue queue.WorkQueue
// This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFn syncPodFnType
// The EventRecorder to use
recorder record.EventRecorder
// backOffPeriod is the duration to back off when there is a sync error.
backOffPeriod time.Duration
// resyncInterval is the duration to wait until the next sync.
resyncInterval time.Duration
// podCache stores kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
}
code
k8s.io\kubernetes\pkg\kubelet\kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(updates, kl)
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
// The resyncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// 2s
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
for {
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
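As you can see, updates runs through the whole process and is a central concept, which is why the entire previous post was spent tracing where updates comes from. Here we focus on syncLoopIteration.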
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. houseKeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
}
}
return true
}
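This post only looks at the configCh branch of syncLoopIteration; the other branches will be covered in future posts.
This branch simply hands the Pods to the handler that matches the operation.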
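The dispatch pattern in the configCh branch can be reduced to a few lines. The sketch below is an assumption-laden miniature: `op`, `podUpdate`, `handler`, `recorder` and `iteration` are hypothetical names, Pods are plain strings, and only three operations are modelled. It keeps the two behaviours that matter: a closed channel ends the loop, and each operation maps to one handler callback.

```go
package main

import "fmt"

// op is a hypothetical stand-in for the kubetypes operation constants.
type op int

const (
	add op = iota
	update
	remove
)

// podUpdate is a simplified kubetypes.PodUpdate; pods are just names here.
type podUpdate struct {
	Op   op
	Pods []string
}

// handler is a cut-down SyncHandler.
type handler interface {
	HandlePodAdditions(pods []string)
	HandlePodUpdates(pods []string)
	HandlePodRemoves(pods []string)
}

// recorder is a test double that logs which callback was invoked.
type recorder struct{ log []string }

func (r *recorder) HandlePodAdditions(pods []string) {
	r.log = append(r.log, fmt.Sprintf("add %v", pods))
}
func (r *recorder) HandlePodUpdates(pods []string) {
	r.log = append(r.log, fmt.Sprintf("update %v", pods))
}
func (r *recorder) HandlePodRemoves(pods []string) {
	r.log = append(r.log, fmt.Sprintf("remove %v", pods))
}

// iteration reads one update and dispatches it. It returns false once
// the channel is closed, which tells the caller to leave the loop.
func iteration(configCh <-chan podUpdate, h handler) bool {
	u, open := <-configCh
	if !open {
		return false
	}
	switch u.Op {
	case add:
		h.HandlePodAdditions(u.Pods)
	case update:
		h.HandlePodUpdates(u.Pods)
	case remove:
		h.HandlePodRemoves(u.Pods)
	}
	return true
}

func main() {
	ch := make(chan podUpdate, 2)
	ch <- podUpdate{Op: add, Pods: []string{"nginx"}}
	ch <- podUpdate{Op: remove, Pods: []string{"nginx"}}
	close(ch)

	r := &recorder{}
	for iteration(ch, r) {
	}
	fmt.Println(r.log)
}
```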
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {},
})
}
dispatchWork calls podWorkers.UpdatePod to do the actual work.
k8s.io\kubernetes\pkg\kubelet\pod_workers.go
// Apply the new setting to the specified pod.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
podUpdates, exists := p.podUpdates[uid]
if !exists {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
}
}
UpdatePod starts one goroutine per Pod and creates a channel through which updates are delivered to it.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
for update := range podUpdates {
err := func() error {
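// status (elided in this excerpt) is the Pod's latest kubecontainer.PodStatus read from p.podCache.
err := p.syncPodFn(syncPodOptions{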
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
return err
}()
}
}
This simply calls back into podWorkers.syncPodFn.
k8s.io\kubernetes\pkg\kubelet\kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
}
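NewMainKubelet is a function of more than 500 lines, and it contains the answer we are after: klet.syncPod. syncPod is a complex function that handles every Pod state, so we will take it apart step by step.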
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
return kl.killPod(pod, nil, podStatus, o.killPodOptions.PodTerminationGracePeriodSecondsOverride)
}
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
// Create and Update pod's Cgroups
pcm.EnsureExists(pod)
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubepod.IsStaticPod(pod) {
if mirrorPod == nil || deleted {
kl.podManager.CreateMirrorPod(pod)
}
}
// Make data directories for the pod
kl.makePodDataDirs(pod)
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
kl.volumeManager.WaitForAttachAndMount(pod)
}
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
}
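Steps for creating a Pod:
- If the Pod needs to be killed, kill it right away.
- Generate the PodStatus object for the Pod.
- Create the cgroups.
- If it is a static pod, create a mirror pod so the static pod can be queried through the apiserver. Querying is the only supported operation; nothing else can be done through the mirror pod.
- Create the data directories, such as the directories used for mounts.
- Mount the volumes.
- Fetch the ImagePullSecrets.
- Call the CRI to create the Pod.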
k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil)
}
}
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit)
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular)
}
}
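The comment on this function is excellent. The process is:
- Compute the changes to the sandbox (which holds the Pod's network) and to the containers.
- If the sandbox has changed, kill the existing containers.
- Create the sandbox.
- Start the init containers.
- Start the regular containers.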
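Starting init containers has its own rules, and all of the logic lives in computePodActions: the init containers must finish one at a time, in order, before the regular containers are started. Roughly:
- The first sync only starts pod.Spec.InitContainers[0], and changes.ContainersToStart is empty.
- Each later sync starts the next init container.
- Once the init containers are done, the regular containers are started.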
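The ordering rule for init containers can be sketched as a small decision function that is evaluated once per sync. The `initStatus` type and `nextAction` function are hypothetical, and the sketch assumes the simplest case (no failures, no restarts); the real computePodActions handles considerably more.

```go
package main

import "fmt"

// initStatus is a hypothetical, simplified init container state.
type initStatus int

const (
	notStarted initStatus = iota
	inProgress
	succeeded
)

// nextAction decides what a single sync should do.
// next is the index of the init container to start, or -1 if none.
// startRegular reports whether the regular containers may be started.
func nextAction(statuses []initStatus) (next int, startRegular bool) {
	for i, s := range statuses {
		switch s {
		case succeeded:
			continue // already finished, look at the following one
		case inProgress:
			return -1, false // wait for it to finish before moving on
		default:
			return i, false // start exactly one init container
		}
	}
	return -1, true // every init container finished
}

func main() {
	fmt.Println(nextAction([]initStatus{notStarted, notStarted}))
	fmt.Println(nextAction([]initStatus{succeeded, notStarted}))
	fmt.Println(nextAction([]initStatus{succeeded, inProgress}))
	fmt.Println(nextAction([]initStatus{succeeded, succeeded}))
}
```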
k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_container.go
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
// Step 2: create the container.
m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
// Step 3: start the container.
m.runtimeService.StartContainer(containerID)
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,sandboxMeta.Namespace)
m.osInterface.Symlink(containerLog, legacySymlink)
}
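After peeling back the layers one by one, we finally reach the heart of the matter:
- Pull the image.
- Create the container. runtimeService is really just a gRPC client that calls the CRI.
- Start the container.
- Create a soft link to the container log whose name carries Kubernetes-related information. This is very useful for log collection; see the post "K8S Fluentd Mongo log collection".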
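The last step, the log soft link, is easy to try in isolation. The sketch below creates a symlink whose file name encodes the Pod name, namespace, container name and container ID. The `symlinkName` and `linkLog` helpers and the exact name layout are assumptions made for illustration, not the kubelet's legacyLogSymlink implementation.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// symlinkName builds a link name that carries Pod metadata.
// The layout used here is an assumption for the sake of the example.
func symlinkName(pod, namespace, container, containerID string) string {
	return fmt.Sprintf("%s_%s_%s-%s.log", pod, namespace, container, containerID)
}

// linkLog creates a symlink inside dir that points at containerLog
// and returns the path of the new link.
func linkLog(dir, containerLog, pod, namespace, container, containerID string) (string, error) {
	link := filepath.Join(dir, symlinkName(pod, namespace, container, containerID))
	if err := os.Symlink(containerLog, link); err != nil {
		return "", err
	}
	return link, nil
}

func main() {
	dir, err := os.MkdirTemp("", "logs")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Stand-in for the log file written by the container runtime.
	logFile := filepath.Join(dir, "0.log")
	if err := os.WriteFile(logFile, []byte("hello\n"), 0o644); err != nil {
		panic(err)
	}

	link, err := linkLog(dir, logFile, "nginx", "default", "web", "abc123")
	if err != nil {
		panic(err)
	}
	target, err := os.Readlink(link)
	if err != nil {
		panic(err)
	}
	fmt.Println(filepath.Base(link), "->", target)
}
```

A log collector that tails the link directory can recover the Pod, namespace and container from the file name alone, without querying the API server.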