Kubernetes garbage collector即垃圾收集器,存在于kube-controller-manger中,它负责回收kubernetes中的资源对象,监听资源对象事件,更新对象之间的依赖关系,并根据对象的删除策略来决定是否删除其关联对象。
关于删除关联对象,细一点说就是,使用级联删除策略去删除一个owner时,会连带这个owner对象的dependent对象也一起删除掉。
关于对象的关联依赖关系,garbage collector会监听资源对象事件,根据资源对象中ownerReference 的值,来构建对象间的关联依赖关系,也即owner与dependent之间的关系。
例子
以创建deployment对象为例进行讲解。创建deployment对象后,kube-controller-manager为其创建出replicaset对象,且自动将该deployment的信息设置到replicaset对象ownerReference值。如下面示例,即说明replicaset对象test-1-59d7f45ffb的owner为deployment对象test-1,deployment对象test-1的dependent为replicaset对象test-1-59d7f45ffb。
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-1
namespace: test
uid: 4973d370-3221-46a7-8d86-e145bf9ad0ce
...
apiVersion: apps/v1
kind: ReplicaSet
metadata:
name: test-1-59d7f45ffb
namespace: test
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: Deployment
name: test-1
uid: 4973d370-3221-46a7-8d86-e145bf9ad0ce
uid: 386c380b-490e-470b-a33f-7d5b0bf945fb
...
同理,replicaset对象创建后,kube-controller-manager为其创建出pod对象,这些pod对象也会将replicaset对象的信息设置到pod对象的ownerReference的值中,replicaset是pod的owner,pod是replicaset的dependent。
对象中ownerReference 的值,指定了owner与dependent之间的关系。
简介
GarbageCollector Controller源码主要分为以下几部分:
- monitors作为生产者将变化的资源放入graphChanges队列;同时restMapper定期检测集群内资源类型,刷新monitors
- runProcessGraphChanges从graphChanges队列中取出变化的item,根据情况放入attemptToDelete队列;
- runProcessGraphChanges从graphChanges队列中取出变化的item,根据情况放入attemptToOrphan队列;
- runAttemptToDeleteWorker从attemptToDelete队列取出,尝试删除垃圾资源;
- runAttemptToOrphanWorker从attemptToDelete队列取出,处理该孤立的资源;
架构
garbage collector中最关键的代码就是
garbagecollector.go
与graph_builder.go
两部分。
garbage collector的主要组成为1个图(对象关联依赖关系图)、2个处理器(GraphBuilder
与GarbageCollector
)、3个事件队列(graphChanges
、attemptToDelete
与attemptToOrphan
):
1个图
(1)uidToNode
:对象关联依赖关系图,由GraphBuilder
维护,维护着所有对象间的关联依赖关系。在该图里,每一个k8s对象会对应着关系图里的一个node
,而每个node
都会维护一个owner
列表以及dependent
列表。
示例:现有一个deployment A,replicaset B(owner为deployment A),pod C(owner为replicaset B),则对象关联依赖关系如下:
3个node,分别是A、B、C
A对应一个node,无owner,dependent列表里有B;
B对应一个node,owner列表里有A,dependent列表里有C;
C对应一个node,owner列表里有B,无dependent。
2个处理器
(1)GraphBuilder
:负责维护所有对象的关联依赖关系图,并产生事件触发GarbageCollector
执行对象回收删除操作。GraphBuilder
从graphChanges
事件队列中获取事件进行消费,根据资源对象中ownerReference
的值,来构建、更新、删除对象间的关联依赖关系图,也即owner
与dependent
之间的关系图,然后再作为生产者生产事件,放入attemptToDelete
或attemptToOrphan
队列中,触发GarbageCollector
执行,看是否需要进行关联对象的回收删除操作,而GarbageCollector
进行对象的回收删除操作时会依赖于uidToNode
这个关系图。
(2)GarbageCollector
:负责回收删除对象。GarbageCollector
作为消费者,从attemptToDelete
与attemptToOrphan
队列中取出事件进行处理,若一个对象被删除,且其删除策略为级联删除,则进行关联对象的回收删除。关于删除关联对象,细一点说就是,使用级联删除策略去删除一个owner
时,会连带这个owner
对象的dependent
对象也一起删除掉。
3个事件队列
-
graphChanges
:list/watch apiserver,获取事件,由informer
生产,由GraphBuilder
消费; -
attemptToDelete
:级联删除事件队列,由GraphBuilder
生产,由GarbageCollector
消费; -
attemptToOrphan
:孤儿删除事件队列,由GraphBuilder
生产,由GarbageCollector
消费。
参数
想要启用GC,需要在kube-apiserver和kube-controller-manager的启动参数中都设置--enable-garbage-collector为true,1.13.2版本中默认开启GC
cm组件启动参数中,与garbage collector相关的参数代码如下:
// cmd/kube-controller-manager/app/options/garbagecollectorcontroller.go
// AddFlags adds flags related to GarbageCollectorController for controller manager to the specified FlagSet.
func (o *GarbageCollectorControllerOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.Int32Var(&o.ConcurrentGCSyncs, "concurrent-gc-syncs", o.ConcurrentGCSyncs, "The number of garbage collector workers that are allowed to sync concurrently.")
fs.BoolVar(&o.EnableGarbageCollector, "enable-garbage-collector", o.EnableGarbageCollector, "Enables the generic garbage collector. MUST be synced with the corresponding flag of the kube-apiserver.")
}
从代码中可以看到,kcm组件启动参数中有两个参数与garbage collector相关,分别是:
(1)enable-garbage-collector:是否开启garbage collector,默认值为true;
(2)concurrent-gc-syncs:garbage collector同步操作的worker数量,默认20。
garbage collector的源码分析将分成两部分进行,分别是:
- 启动分析;
- 核心处理逻辑分析。
启动
kube-controller-manager启动入口,app.NewControllerManagerCommand()中加载controller manager默认启动参数,创建* cobra.Command对象:
// cmd/kube-controller-manager/app/core.go
func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return nil, false, nil
}
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
metadataClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, true, err
}
// Get an initial set of deletable resources to prime the garbage collector.
deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
ignoredResources := make(map[schema.GroupResource]struct{})
for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
}
garbageCollector, err := garbagecollector.NewGarbageCollector(
metadataClient,
ctx.RESTMapper,
deletableResources,
ignoredResources,
ctx.ObjectOrMetadataInformerFactory,
ctx.InformersStarted,
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
}
// Start the garbage collector.
workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
go garbageCollector.Run(workers, ctx.Stop)
// Periodically refresh the RESTMapper with new discovery information and sync
// the garbage collector.
go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}
startGarbageCollectorController函数主要逻辑如下:
- 根据EnableGarbageCollector变量的值来决定是否开启garbage collector,EnableGarbageCollector变量的值根据kcm组件启动参数--enable-garbage-collector配置获取,默认为true;不开启则直接返回,不会继续往下执行;
- 初始化discoveryClient,主要用来获取集群中的所有资源对象;
- 调用garbagecollector.GetDeletableResources,获取集群内garbage collector需要处理去删除回收的所有资源对象,支持delete, list, watch三种操作的资源对象称为 deletableResource;
- 调用garbagecollector.NewGarbageCollector初始化garbage collector;
- 调用garbageCollector.Run,启动garbage collector;garbageCollector.Run(workers, ctx.Stop)启动一个monitors用来监听资源对象的变化(对应的由runProcessGraphChanges循环处理),和默认20个deleteWorkers协程处理可删除的资源对象、20个orphanWorkers协程处理孤儿对象。
- garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) 定时去获取一个集群内是否有新类型的资源对象的加入,并重新刷新monitors,以监听新类型的资源对象。
- 暴露http服务,注册 debug 接口,用于debug,用来提供由GraphBuilder构建的集群内所有对象的关联关系
下面对startGarbageCollectorController函数里的部分逻辑稍微展开一下分析。
garbagecollector.NewGarbageCollector
NewGarbageCollector函数负责初始化garbage collector。主要逻辑如下:
(1)初始化GarbageCollector结构体;
(2)初始化GraphBuilder结构体,并赋值给GarbageCollector结构体的dependencyGraphBuilder属性。
// pkg/controller/garbagecollector/garbagecollector.go
func NewGarbageCollector(
metadataClient metadata.Interface,
mapper resettableRESTMapper,
deletableResources map[schema.GroupVersionResource]struct{},
ignoredResources map[schema.GroupResource]struct{},
sharedInformers controller.InformerFactory,
informersStarted <-chan struct{},
) (*GarbageCollector, error) {
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
absentOwnerCache := NewUIDCache(500)
gc := &GarbageCollector{
metadataClient: metadataClient,
restMapper: mapper,
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
}
gb := &GraphBuilder{
metadataClient: metadataClient,
informersStarted: informersStarted,
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
uidToNode: &concurrentUIDToNode{
uidToNode: make(map[types.UID]*node),
},
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
sharedInformers: sharedInformers,
ignoredResources: ignoredResources,
}
if err := gb.syncMonitors(deletableResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
}
gc.dependencyGraphBuilder = gb
return gc, nil
}
gb.syncMonitors
gb.syncMonitors的主要作用是调用gb.controllerFor对各个deletableResources(deletableResources指支持 “delete”, “list”, “watch” 三种操作的资源对象)资源对象的infomer做初始化,并为资源的变化事件注册eventHandler(AddFunc、UpdateFunc 和 DeleteFunc),对于资源的add、update、delete event,都会push到graphChanges队列中,然后gb.processGraphChanges会从graphChanges队列中取出event进行处理(后面介绍garbage collector处理逻辑的时候会做详细分析)。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
toRemove := gb.monitors
if toRemove == nil {
toRemove = monitors{}
}
current := monitors{}
errs := []error{}
kept := 0
added := 0
for resource := range resources {
if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
continue
}
if m, ok := toRemove[resource]; ok {
current[resource] = m
delete(toRemove, resource)
kept++
continue
}
kind, err := gb.restMapper.KindFor(resource)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue
}
c, s, err := gb.controllerFor(resource, kind)
if err != nil {
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue
}
current[resource] = &monitor{store: s, controller: c}
added++
}
gb.monitors = current
for _, monitor := range toRemove {
if monitor.stopCh != nil {
close(monitor.stopCh)
}
}
klog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
// NewAggregate returns nil if errs is 0-length
return utilerrors.NewAggregate(errs)
}
gb.controllerFor
gb.controllerFor主要是对资源对象的infomer做初始化,并为资源的变化事件注册eventHandler(AddFunc、UpdateFunc 和 DeleteFunc),对于资源的add、update、delete event,都会push到graphChanges队列中。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
event := &event{
eventType: addEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{
eventType: updateEvent,
obj: newObj,
oldObj: oldObj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
event := &event{
eventType: deleteEvent,
obj: obj,
gvk: kind,
}
gb.graphChanges.Add(event)
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err != nil {
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
return nil, nil, err
}
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}
garbageCollector.Run
garbageCollector.Run负责启动garbage collector,主要逻辑如下:
(1)调用gc.dependencyGraphBuilder.Run:启动GraphBuilder;
(2)根据启动参数配置的worker数量,起相应数量的goroutine,执行gc.runAttemptToDeleteWorker与gc.runAttemptToOrphanWorker,两者属于GarbageCollector的核心处理逻辑,都是去删除需要被回收对象,具体分析会在下篇博客里进行分析。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
defer gc.attemptToOrphan.ShutDown()
defer gc.dependencyGraphBuilder.graphChanges.ShutDown()
klog.Infof("Starting garbage collector controller")
defer klog.Infof("Shutting down garbage collector controller")
go gc.dependencyGraphBuilder.Run(stopCh)
if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
return
}
klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
// gc workers
for i := 0; i < workers; i++ {
go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
}
<-stopCh
}
gc.dependencyGraphBuilder.Run
gc.dependencyGraphBuilder.Run负责启动启动GraphBuilder,主要逻辑如下:
(1)调用gb.startMonitors,启动前面1.1 gb.syncMonitors中提到的infomers;
(2)每隔1s循环调用gb.runProcessGraphChanges,做GraphBuilder的核心逻辑处理,核心处理逻辑会在下篇博客里进行分析。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
klog.Infof("GraphBuilder running")
defer klog.Infof("GraphBuilder stopping")
// Set up the stop channel.
gb.monitorLock.Lock()
gb.stopCh = stopCh
gb.running = true
gb.monitorLock.Unlock()
// Start monitors and begin change processing until the stop channel is
// closed.
gb.startMonitors()
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
// Stop any running monitors.
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
monitors := gb.monitors
stopped := 0
for _, monitor := range monitors {
if monitor.stopCh != nil {
stopped++
close(monitor.stopCh)
}
}
// reset monitors so that the graph builder can be safely re-run/synced.
gb.monitors = nil
klog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}
garbageCollector.Sync
garbageCollector.Sync的主要功能是周期性的查询集群中所有的deletableResources,调用gc.resyncMonitors来更新GraphBuilder的monitors,为新出现的资源对象初始化infomer和注册eventHandler,然后启动infomer,对已经移除的资源对象的monitors进行销毁。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() {
// Get the current resource list from discovery.
newResources := GetDeletableResources(discoveryClient)
...
if err := gc.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
return false, nil
}
klog.V(4).Infof("resynced monitors")
...
gc.resyncMonitors
调用gc.dependencyGraphBuilder.syncMonitors:初始化infomer和注册eventHandler;
调用gc.dependencyGraphBuilder.startMonitors:启动infomer。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
return err
}
gc.dependencyGraphBuilder.startMonitors()
return nil
}
garbagecollector.NewDebugHandler
garbagecollector.NewDebugHandler暴露http服务,注册 debug 接口,用于debug,用来提供由GraphBuilder构建的集群内所有对象的关联关系。
// pkg/controller/garbagecollector/dump.go
func NewDebugHandler(controller *GarbageCollector) http.Handler {
return &debugHTTPHandler{controller: controller}
}
type debugHTTPHandler struct {
controller *GarbageCollector
}
func (h *debugHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/graph" {
http.Error(w, "", http.StatusNotFound)
return
}
var graph graph.Directed
if uidStrings := req.URL.Query()["uid"]; len(uidStrings) > 0 {
uids := []types.UID{}
for _, uidString := range uidStrings {
uids = append(uids, types.UID(uidString))
}
graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraphForObj(uids...)
} else {
graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraph()
}
data, err := dot.Marshal(graph, "full", "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/vnd.graphviz")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Write(data)
w.WriteHeader(http.StatusOK)
}
获取对象关联关系图
获取全部的对象关联关系图:
curl http://{master_ip}:{kcm_port}/debug/controllers/garbagecollector/graph -o {output_file}
获取特定uid的对象关联关系图:
curl http://{master_ip}:{kcm_port}/debug/controllers/garbagecollector/graph?uid={project_uid} -o {output_file}
示例:
curl http://192.168.1.10:10252/debug/controllers/garbagecollector/graph?uid=8727f640-112e-21eb-11dd-626400510df6 -o /home/test
对象删除策略
kubernetes 中有三种对象删除策略:Orphan、Foreground 和Background,删除某个对象时,可以指定删除策略。下面对这三种策略进行介绍。
Foreground前台删除
Foreground即前台删除策略,属于级联删除策略,垃圾收集器会删除对象的所有dependent。
使用前台删除策略删除某个对象时,该对象的 deletionTimestamp 字段被设置,且对象的 metadata.finalizers 字段包含值 foregroundDeletion,用于阻塞该对象删除,等到垃圾收集器在删除了该对象中所有有阻塞能力的dependent对象(对象的 ownerReference.blockOwnerDeletion=true) 之后,再去除该对象的 metadata.finalizers 字段中的值 foregroundDeletion,然后删除该owner对象。
以删除deployment为例,使用前台删除策略,则按照Pod->ReplicaSet->Deployment的顺序进行删除。
Background后台删除
Background即后台删除策略,属于级联删除策略,Kubernetes会立即删除该owner对象,之后垃圾收集器会在后台自动删除其所有的dependent对象。
当删除一个对象时使用了Background后台删除策略时,该对象因没有相关的Finalizer设置(只有删除策略为foreground或Orphan时会设置相关Finalizer),会直接被删除,接着GraphBuilder会监听到该对象的delete事件,会将其dependents放入到attemptToDelete队列中去,触发GarbageCollector做dependents对象的回收删除处理。
以删除deployment为例,使用后台删除策略,则按照Deployment->ReplicaSet->Pod的顺序进行删除。
Orphan孤儿删除
Orphan即孤儿删除策略,属于非级联删除策略,即删除某个对象时,不会自动删除它的dependent,这些dependent也被称作孤立对象。
当删除一个对象时使用了Orphan孤儿删除策略时,该对象的 metadata.finalizers 字段包含值 orphan,用于阻塞该对象删除,直至GarbageCollector将其所有dependents的OwnerReferences属性中的该owner的相关字段去除,再去除该owner对象的 metadata.finalizers 字段中的值 Orphan,最后才能删除该owner对象。
以删除deployment为例,使用孤儿删除策略,则只删除Deployment,对应ReplicaSet和Pod不删除。
删除对象时指定删除策略
当删除对象时没有特别指定删除策略,将会使用默认删除策略:Background即后台删除策略。
- 指定后台删除策略
curl -X DELETE localhost:8080/apis/apps/v1/namespaces/default/replicasets/my-repset -d '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Background"}' -H "Content-Type: application/json" - 指定前台删除策略
curl -X DELETE localhost:8080/apis/apps/v1/namespaces/default/replicasets/my-repset -d '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Foreground"}' -H "Content-Type: application/json" - 指定孤儿删除策略
curl -X DELETE localhost:8080/apis/apps/v1/namespaces/default/replicasets/my-repset -d '{"kind":"DeleteOptions","apiVersion":"v1","propagationPolicy":"Orphan"}' -H "Content-Type: application/json"
garbage collector处理逻辑
1.GraphBuilder
首先先看到GraphBuilder。
GraphBuilder 主要有2个功能:
(1)基于 informers 中的资源事件在 uidToNode 属性中维护着所有对象的关联依赖关系;
(2)处理 graphChanges 中的事件,并作为生产者将事件放入到 attemptToDelete 和 attemptToOrphan 两个队列中,触发消费者GarbageCollector进行对象的回收删除操作。
GraphBuilder struct
先来简单的分析下GraphBuilder struct,里面最关键的几个属性及作用如下:
(1)graphChanges:informers 监听到的事件会放在 graphChanges 中,然后GraphBuilder会作为消费者,处理graphChanges队列中的事件;
(2)uidToNode(对象依赖关联关系图):根据对象uid,维护所有对象的关联依赖关系,也即前面说的owner与dependent之间的关系,也可以理解为GraphBuilder会维护一张所有对象的关联依赖关系图,而GarbageCollector进行对象的回收删除操作时会依赖于这个关系图;
(3)attemptToDelete与attemptToOrphan:GraphBuilder作为生产者往attemptToDelete 和 attemptToOrphan 两个队列中存放事件,然后GarbageCollector作为消费者会处理 attemptToDelete 和 attemptToOrphan 两个队列中的事件。
// pkg/controller/garbagecollector/graph_builder.go
type GraphBuilder struct {
...
// monitors are the producer of the graphChanges queue, graphBuilder alters
// the in-memory graph according to the changes.
graphChanges workqueue.RateLimitingInterface
// uidToNode doesn't require a lock to protect, because only the
// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
uidToNode *concurrentUIDToNode
// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
attemptToDelete workqueue.RateLimitingInterface
attemptToOrphan workqueue.RateLimitingInterface
...
}
// pkg/controller/garbagecollector/graph.go
type concurrentUIDToNode struct {
uidToNodeLock sync.RWMutex
uidToNode map[types.UID]*node
}
// pkg/controller/garbagecollector/graph.go
type node struct {
...
dependents map[*node]struct{}
...
owners []metav1.OwnerReference
}
从结构体定义中可以看到,一个k8s对象对应着对象关联依赖关系图里的一个node,而每个node都会维护一个owner列表以及dependent列表。
GraphBuilder-gb.processGraphChanges
接下来看到GraphBuilder的处理逻辑部分,从gb.processGraphChanges作为入口进行处理逻辑分析。
前面说过,informers 监听到的事件会放入到 graphChanges 队列中,然后GraphBuilder会作为消费者,处理graphChanges队列中的事件,而processGraphChanges方法就是GraphBuilder作为消费者处理graphChanges队列中事件地方。
所以在此方法中,GraphBuilder既是消费者又是生产者,消费处理graphChanges 中的所有事件并进行分类,再生产事件放入到 attemptToDelete 和 attemptToOrphan 两个队列中去,让GarbageCollector作为消费者去处理这两个队列中的事件。
主要逻辑:
(1)从graphChanges队列中取出事件进行处理;
(2)读取uidToNode,判断该对象是否已经存在于已构建的对象依赖关联关系图中;下面就开始根据对象是否存在于对象依赖关联关系图中以及事件类型来做不同的处理逻辑;
(3)若 uidToNode 中不存在该 node 且该事件是 addEvent 或 updateEvent,则为该 object 创建对应的 node,并调用 gb.insertNode 将该 node 加到 uidToNode 中,然后将该 node 添加到其 owner 的 dependents 中;
然后再调用 gb.processTransitions 方法做处理,该方法的处理逻辑是判断该对象是否处于删除状态,若处于删除状态会判断该对象是以 orphan 模式删除还是以 foreground 模式删除(其实就是判断deployment对象的finalizer来区分删除模式,删除deployment的时候会带上删除策略,kube-apiserver会根据删除策略给deployment对象打上相应的finalizer),若以 orphan 模式删除,则将该 node 加入到 attemptToOrphan 队列中,若以 foreground 模式删除则将该对象以及其所有 dependents 都加入到 attemptToDelete 队列中;
(4)若 uidToNode 中存在该 node 且该事件是 addEvent 或 updateEvent 时,则调用 referencesDiffs 方法检查该对象的 OwnerReferences 字段是否有变化,有变化则做相应处理,更新对象依赖关联关系图,最后调用 gb.processTransitions做处理;
(5)若事件为删除事件,则调用gb.removeNode,从uidToNode中删除该对象,然后从该node所有owners的dependents中删除该对象,再把该对象的dependents放入到attemptToDelete队列中,触发GarbageCollector处理;最后检查该 node 的所有 owners,若有处于删除状态的 owner,此时该 owner 可能处于删除阻塞状态正在等待该 node 的删除,将该 owner 加入到 attemptToDelete队列中,触发GarbageCollector处理。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) runProcessGraphChanges() {
for gb.processGraphChanges() {
}
}
// Dequeueing an event from graphChanges, updating graph, populating dirty_queue.
func (gb *GraphBuilder) processGraphChanges() bool {
item, quit := gb.graphChanges.Get()
if quit {
return false
}
defer gb.graphChanges.Done(item)
event, ok := item.(*event)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
return true
}
obj := event.obj
accessor, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
klog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", event.gvk.GroupVersion().String(), event.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
// Check if the node already exists
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
if found {
// this marks the node as having been observed via an informer event
// 1. this depends on graphChanges only containing add/update events from the actual informer
// 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
existingNode.markObserved()
}
switch {
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
newNode := &node{
identity: objectReference{
OwnerReference: metav1.OwnerReference{
APIVersion: event.gvk.GroupVersion().String(),
Kind: event.gvk.Kind,
UID: accessor.GetUID(),
Name: accessor.GetName(),
},
Namespace: accessor.GetNamespace(),
},
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
beingDeleted: beingDeleted(accessor),
}
gb.insertNode(newNode)
// the underlying delta_fifo may combine a creation and a deletion into
// one event, so we need to further process the event.
gb.processTransitions(event.oldObj, accessor, newNode)
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// handle changes in ownerReferences
added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
// check if the changed dependency graph unblock owners that are
// waiting for the deletion of their dependents.
gb.addUnblockedOwnersToDeleteQueue(removed, changed)
// update the node itself
existingNode.owners = accessor.GetOwnerReferences()
// Add the node to its new owners' dependent lists.
gb.addDependentToOwners(existingNode, added)
// remove the node from the dependent list of node that are no longer in
// the node's owners list.
gb.removeDependentFromOwners(existingNode, removed)
}
if beingDeleted(accessor) {
existingNode.markBeingDeleted()
}
gb.processTransitions(event.oldObj, accessor, existingNode)
case event.eventType == deleteEvent:
if !found {
klog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
return true
}
// removeNode updates the graph
gb.removeNode(existingNode)
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
if len(existingNode.dependents) > 0 {
gb.absentOwnerCache.Add(accessor.GetUID())
}
for dep := range existingNode.dependents {
gb.attemptToDelete.Add(dep)
}
for _, owner := range existingNode.owners {
ownerNode, found := gb.uidToNode.Read(owner.UID)
if !found || !ownerNode.isDeletingDependents() {
continue
}
// this is to let attempToDeleteItem check if all the owner's
// dependents are deleted, if so, the owner will be deleted.
gb.attemptToDelete.Add(ownerNode)
}
}
return true
}
结合代码分析可以得知,当删除一个对象时使用了Background后台删除策略时,该对象因没有相关的Finalizer设置(只有删除策略为Foreground或Orphan时会设置相关Finalizer),会直接被删除,接着GraphBuilder会监听到该对象的delete事件,会将其dependents放入到attemptToDelete队列中去,触发GarbageCollector做dependents对象的回收删除处理。
1.2.1 gb.insertNode
调用 gb.insertNode 将 node 加到 uidToNode 中,然后将该 node 添加到其 owner 的 dependents 中。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) insertNode(n *node) {
gb.uidToNode.Write(n)
gb.addDependentToOwners(n, n.owners)
}
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
// Create a "virtual" node in the graph for the owner if it doesn't
// exist in the graph yet.
ownerNode = &node{
identity: objectReference{
OwnerReference: owner,
Namespace: n.identity.Namespace,
},
dependents: make(map[*node]struct{}),
virtual: true,
}
klog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
gb.uidToNode.Write(ownerNode)
}
ownerNode.addDependent(n)
if !ok {
// Enqueue the virtual node into attemptToDelete.
// The garbage processor will enqueue a virtual delete
// event to delete it from the graph if API server confirms this
// owner doesn't exist.
gb.attemptToDelete.Add(ownerNode)
}
}
}
gb.processTransitions
gb.processTransitions 方法检查k8s对象是否处于删除状态(对象的deletionTimestamp属性不为空则处于删除状态),并且对象里含有删除策略对应的finalizer,然后做相应的处理。
因为只有删除策略为Foreground或Orphan时对象才会会设置相关Finalizer,所以该方法只会处理删除策略为Foreground或Orphan的对象,对于删除策略为Background的对象不做处理。
若对象的deletionTimestamp属性不为空,且有Orphaned删除策略对应的finalizer,则将对应的node放入到 attemptToOrphan 队列中,触发GarbageCollector去消费处理;
若对象的deletionTimestamp属性不为空,且有foreground删除策略对应的finalizer,则调用n.markDeletingDependents标记 node的 deletingDependents 属性为 true,代表该node的dependents正在被删除,并将对应的node及其dependents放入到 attemptToDelete 队列中,触发GarbageCollector去消费处理。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
klog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
gb.attemptToOrphan.Add(n)
return
}
if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
klog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
n.markDeletingDependents()
for dep := range n.dependents {
gb.attemptToDelete.Add(dep)
}
gb.attemptToDelete.Add(n)
}
}
func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerOrphanDependents)
}
func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerDeleteDependents)
}
func deletionStartsWithFinalizer(oldObj interface{}, newAccessor metav1.Object, matchingFinalizer string) bool {
// if the new object isn't being deleted, or doesn't have the finalizer we're interested in, return false
if !beingDeleted(newAccessor) || !hasFinalizer(newAccessor, matchingFinalizer) {
return false
}
// if the old object is nil, or wasn't being deleted, or didn't have the finalizer, return true
if oldObj == nil {
return true
}
oldAccessor, err := meta.Accessor(oldObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
return false
}
return !beingDeleted(oldAccessor) || !hasFinalizer(oldAccessor, matchingFinalizer)
}
func beingDeleted(accessor metav1.Object) bool {
return accessor.GetDeletionTimestamp() != nil
}
func hasFinalizer(accessor metav1.Object, matchingFinalizer string) bool {
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == matchingFinalizer {
return true
}
}
return false
}
gb.removeNode
调用gb.removeNode,从uidToNode中删除该对象,然后从该node所有owners的dependents中删除该对象,再把该对象的dependents放入到attemptToDelete队列中,触发GarbageCollector处理;最后检查该 node 的所有 owners,若有处于删除状态的 owner,此时该 owner 可能处于删除阻塞状态正在等待该 node 的删除,将该 owner 加入到 attemptToDelete队列中,触发GarbageCollector处理。
// pkg/controller/garbagecollector/graph_builder.go
func (gb *GraphBuilder) removeNode(n *node) {
gb.uidToNode.Delete(n.identity.UID)
gb.removeDependentFromOwners(n, n.owners)
}
func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
continue
}
ownerNode.deleteDependent(n)
}
}
GarbageCollector
再来看到GarbageCollector。
GarbageCollector 主要有2个功能:
(1)处理 attemptToDelete队列中的事件,根据对象删除策略foreground或background做相应的回收逻辑处理,删除关联对象;
(2)处理 attemptToOrphan队列中的事件,根据对象删除策略Orphan,更新该owner的所有dependents对象,将对象的OwnerReferences属性中该owner的相关字段去除,接着再更新该owner对象,去除Orphan删除策略对应的finalizers。
GarbageCollector的2个关键处理方法:
(1)gc.runAttemptToDeleteWorker:主要负责处理attemptToDelete队列中的事件,负责删除策略为foreground或background的对象回收处理;
(2)gc.runAttemptToOrphanWorker:主要负责处理attemptToOrphan队列中的事件,负责删除策略为Orphan的对象回收处理。
GarbageCollector struct
先来简单的分析下GarbageCollector struct,里面最关键的几个属性及作用如下:
(1)attemptToDelete与attemptToOrphan:GraphBuilder作为生产者往attemptToDelete 和 attemptToOrphan 两个队列中存放事件,然后GarbageCollector作为消费者会处理 attemptToDelete 和 attemptToOrphan 两个队列中的事件。
// pkg/controller/garbagecollector/garbagecollector.go
type GarbageCollector struct {
...
attemptToDelete workqueue.RateLimitingInterface
attemptToOrphan workqueue.RateLimitingInterface
...
}
2.2 GarbageCollector-gc.runAttemptToDeleteWorker
接下来看到GarbageCollector的处理逻辑部分,从gc.runAttemptToDeleteWorker作为入口进行处理逻辑分析。
runAttemptToDeleteWorker主要逻辑为循环调用attemptToDeleteWorker方法。
attemptToDeleteWorker方法主要逻辑:
(1)从attemptToDelete队列中取出对象;
(2)调用 gc.attemptToDeleteItem 尝试删除 node;
(3)若删除失败则重新加入到 attemptToDelete 队列中进行重试。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) runAttemptToDeleteWorker() {
for gc.attemptToDeleteWorker() {
}
}
func (gc *GarbageCollector) attemptToDeleteWorker() bool {
item, quit := gc.attemptToDelete.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
defer gc.attemptToDelete.Done(item)
n, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
}
err := gc.attemptToDeleteItem(n)
if err != nil {
if _, ok := err.(*restMappingError); ok {
// There are at least two ways this can happen:
// 1. The reference is to an object of a custom type that has not yet been
// recognized by gc.restMapper (this is a transient error).
// 2. The reference is to an invalid group/version. We don't currently
// have a way to distinguish this from a valid type we will recognize
// after the next discovery sync.
// For now, record the error and retry.
klog.V(5).Infof("error syncing item %s: %v", n, err)
} else {
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
}
// retry if garbage collection of an object failed.
gc.attemptToDelete.AddRateLimited(item)
} else if !n.isObserved() {
// requeue if item hasn't been observed via an informer event yet.
// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
// see https://issue.k8s.io/56121
klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
gc.attemptToDelete.AddRateLimited(item)
}
return true
}
gc.attemptToDeleteItem
主要逻辑:
(1)判断 node 是否处于删除状态;
(2)从 apiserver 获取该 node 对应的对象;
(3)调用item.isDeletingDependents方法:通过 node 的 deletingDependents 字段判断该 node 当前是否正在删除 dependents,若是则调用 gc.processDeletingDependentsItem 方法对dependents做进一步处理:检查该node 的 blockingDependents 是否被完全删除,若是则移除该 node对应对象的相关 finalizer,若否,则将未删除的 blockingDependents 加入到 attemptToDelete队列中;
上面分析GraphBuilder时说到,在 GraphBuilder 处理 graphChanges 中的事件时,在processTransitions方法逻辑里,会调用n.markDeletingDependents,标记 node的 deletingDependents 属性为 true;
(4)调用gc.classifyReferences将 node 的owner分为3类,分别是solid(至少有一个 owner 存在且不处于删除状态)、dangling(owner 均不存在)、waitingForDependentsDeletion(owner 存在,处于删除状态且正在等待其 dependents 被删除);
(5)接下来将根据solid、dangling与waitingForDependentsDeletion的数量做不同的逻辑处理;
(6)第一种情况:当solid数量不为0时,即该node至少有一个 owner 存在且不处于删除状态,则说明该对象还不能被回收删除,此时将 dangling 和 waitingForDependentsDeletion 列表中的 owner 从 node 的 ownerReferences 中删除;
(7)第二种情况:solid数量为0,该 node 的 owner 处于 waitingForDependentsDeletion 状态并且 node 的 dependents 未被完全删除,将使用foreground前台删除策略来删除该node对应的对象;
(8)当不满足以上两种情况时(即),进入该默认处理逻辑:按照删除对象时使用的删除策略,调用 apiserver 的接口删除对象。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) attemptToDeleteItem(item *node) error {
klog.V(2).Infof("processing item %s", item.identity)
// "being deleted" is an one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
if item.isBeingDeleted() && !item.isDeletingDependents() {
klog.V(5).Infof("processing item %s returned at once, because its DeletionTimestamp is non-nil", item.identity)
return nil
}
// TODO: It's only necessary to talk to the API server if this is a
// "virtual" node. The local graph could lag behind the real status, but in
// practice, the difference is small.
latest, err := gc.getObject(item.identity)
switch {
case errors.IsNotFound(err):
// the GraphBuilder can add "virtual" node for an owner that doesn't
// exist yet, so we need to enqueue a virtual Delete event to remove
// the virtual node from GraphBuilder.uidToNode.
klog.V(5).Infof("item %v not found, generating a virtual delete event", item.identity)
gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
// since we're manually inserting a delete event to remove this node,
// we don't need to keep tracking it as a virtual node and requeueing in attemptToDelete
item.markObserved()
return nil
case err != nil:
return err
}
if latest.GetUID() != item.identity.UID {
klog.V(5).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
// since we're manually inserting a delete event to remove this node,
// we don't need to keep tracking it as a virtual node and requeueing in attemptToDelete
item.markObserved()
return nil
}
// TODO: attemptToOrphanWorker() routine is similar. Consider merging
// attemptToOrphanWorker() into attemptToDeleteItem() as well.
if item.isDeletingDependents() {
return gc.processDeletingDependentsItem(item)
}
// compute if we should delete the item
ownerReferences := latest.GetOwnerReferences()
if len(ownerReferences) == 0 {
klog.V(2).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
return nil
}
solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(item, ownerReferences)
if err != nil {
return err
}
klog.V(5).Infof("classify references of %s.\nsolid: %#v\ndangling: %#v\nwaitingForDependentsDeletion: %#v\n", item.identity, solid, dangling, waitingForDependentsDeletion)
switch {
case len(solid) != 0:
klog.V(2).Infof("object %#v has at least one existing owner: %#v, will not garbage collect", item.identity, solid)
if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
return nil
}
klog.V(2).Infof("remove dangling references %#v and waiting references %#v for object %s", dangling, waitingForDependentsDeletion, item.identity)
// waitingForDependentsDeletion needs to be deleted from the
// ownerReferences, otherwise the referenced objects will be stuck with
// the FinalizerDeletingDependents and never get deleted.
ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
patch := deleteOwnerRefStrategicMergePatch(item.identity.UID, ownerUIDs...)
_, err = gc.patch(item, patch, func(n *node) ([]byte, error) {
return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
})
return err
case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
deps := item.getDependents()
for _, dep := range deps {
if dep.isDeletingDependents() {
// this circle detection has false positives, we need to
// apply a more rigorous detection if this turns out to be a
// problem.
// there are multiple workers run attemptToDeleteItem in
// parallel, the circle detection can fail in a race condition.
klog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity)
patch, err := item.unblockOwnerReferencesStrategicMergePatch()
if err != nil {
return err
}
if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
return err
}
break
}
}
klog.V(2).Infof("at least one owner of object %s has FinalizerDeletingDependents, and the object itself has dependents, so it is going to be deleted in Foreground", item.identity)
// the deletion event will be observed by the graphBuilder, so the item
// will be processed again in processDeletingDependentsItem. If it
// doesn't have dependents, the function will remove the
// FinalizerDeletingDependents from the item, resulting in the final
// deletion of the item.
policy := metav1.DeletePropagationForeground
return gc.deleteObject(item.identity, &policy)
default:
// item doesn't have any solid owner, so it needs to be garbage
// collected. Also, none of item's owners is waiting for the deletion of
// the dependents, so set propagationPolicy based on existing finalizers.
var policy metav1.DeletionPropagation
switch {
case hasOrphanFinalizer(latest):
// if an existing orphan finalizer is already on the object, honor it.
policy = metav1.DeletePropagationOrphan
case hasDeleteDependentsFinalizer(latest):
// if an existing foreground finalizer is already on the object, honor it.
policy = metav1.DeletePropagationForeground
default:
// otherwise, default to background.
policy = metav1.DeletePropagationBackground
}
klog.V(2).Infof("delete object %s with propagation policy %s", item.identity, policy)
return gc.deleteObject(item.identity, &policy)
}
}
gc.processDeletingDependentsItem
主要逻辑:检查该node 的 blockingDependents(即阻塞owner删除的dpendents)是否被完全删除,若是则移除该 node对应对象的相关 finalizer(finalizer移除后,kube-apiserver会删除该对象),若否,则将未删除的 blockingDependents 加入到 attemptToDelete队列中。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error {
blockingDependents := item.blockingDependents()
if len(blockingDependents) == 0 {
klog.V(2).Infof("remove DeleteDependents finalizer for item %s", item.identity)
return gc.removeFinalizer(item, metav1.FinalizerDeleteDependents)
}
for _, dep := range blockingDependents {
if !dep.isDeletingDependents() {
klog.V(2).Infof("adding %s to attemptToDelete, because its owner %s is deletingDependents", dep.identity, item.identity)
gc.attemptToDelete.Add(dep)
}
}
return nil
}
item.blockingDependents
item.blockingDependents返回会阻塞node删除的dependents。一个dependents会不会阻塞owner的删除,主要看这个dependents的ownerReferences的blockOwnerDeletion属性值是否为true,为true则代表该dependents会阻塞owner的删除。
// pkg/controller/garbagecollector/graph.go
func (n *node) blockingDependents() []*node {
dependents := n.getDependents()
var ret []*node
for _, dep := range dependents {
for _, owner := range dep.owners {
if owner.UID == n.identity.UID && owner.BlockOwnerDeletion != nil && *owner.BlockOwnerDeletion {
ret = append(ret, dep)
}
}
}
return ret
}
GarbageCollector-gc.runAttemptToOrphanWorker
gc.runAttemptToOrphanWorker方法是负责处理orphan删除策略删除的 node。
gc.runAttemptToDeleteWorker主要逻辑为循环调用gc.attemptToDeleteWorker方法。
下面来看一下gc.attemptToDeleteWorker方法的主要逻辑:
(1)从attemptToOrphan队列中取出对象;
(2)调用gc.orphanDependents方法:更新该owner的所有dependents对象,将对象的OwnerReferences属性中该owner的相关字段去除,失败则将该owner重新加入到attemptToOrphan队列中;
(3)调用gc.removeFinalizer方法:更新该owner对象,去除Orphan删除策略对应的finalizers。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) runAttemptToOrphanWorker() {
for gc.attemptToOrphanWorker() {
}
}
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
item, quit := gc.attemptToOrphan.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
defer gc.attemptToOrphan.Done(item)
owner, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
}
// we don't need to lock each element, because they never get updated
owner.dependentsLock.RLock()
dependents := make([]*node, 0, len(owner.dependents))
for dependent := range owner.dependents {
dependents = append(dependents, dependent)
}
owner.dependentsLock.RUnlock()
err := gc.orphanDependents(owner.identity, dependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
return true
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
}
return true
}
gc.orphanDependents
主要逻辑:更新指定owner的所有dependents对象,将对象的OwnerReferences属性中该owner的相关字段去除,对于每个dependents,分别起一个goroutine来处理,加快处理速度。
// pkg/controller/garbagecollector/garbagecollector.go
func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error {
errCh := make(chan error, len(dependents))
wg := sync.WaitGroup{}
wg.Add(len(dependents))
for i := range dependents {
go func(dependent *node) {
defer wg.Done()
// the dependent.identity.UID is used as precondition
patch := deleteOwnerRefStrategicMergePatch(dependent.identity.UID, owner.UID)
_, err := gc.patch(dependent, patch, func(n *node) ([]byte, error) {
return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
})
// note that if the target ownerReference doesn't exist in the
// dependent, strategic merge patch will NOT return an error.
if err != nil && !errors.IsNotFound(err) {
errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err)
}
}(dependents[i])
}
wg.Wait()
close(errCh)
var errorsSlice []error
for e := range errCh {
errorsSlice = append(errorsSlice, e)
}
if len(errorsSlice) != 0 {
return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
}
klog.V(5).Infof("successfully updated all dependents of owner %s", owner)
return nil
}