1. Preface
Please credit the original source when reposting; respect the author's work!
Source code: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/replicaset
Branch: tming-v1.13 (based on v1.13)
Every kind of controller uses informers, so for background on informers see [k8s源码分析][client-go] informer之SharedInformerFactory.
This article analyzes how the ReplicaSet controller works and how it keeps the number of pods at the desired count.
2. Example
Here are two yaml files: one creates a pod and the other creates a replicaset. The pod's labels are identical to the labels of the pod template inside the replicaset.
matchpod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
  labels:
    env: prod
spec:
  containers:
  - name: podtest
    image: nginx:now
    ports:
    - containerPort: 80
replicaset.yaml
[root@master kubectl]# cat replicaset.yaml
apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: replicatest
spec:
  replicas: 2
  selector:
    matchLabels:
      env: prod
  template:
    metadata:
      labels:
        env: prod
    spec:
      containers:
      - name: nginx
        image: nginx:now
In both files the label is env: prod.
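The replicaset selects pods purely by labels. Below is a minimal sketch (not from the controller source; the object literals are made up for illustration) of how a selector like the one above matches a pod's labels, using the same apimachinery helpers the controller relies on:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// the selector from replicaset.yaml: matchLabels: env: prod
	sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: map[string]string{"env": "prod"},
	})
	if err != nil {
		panic(err)
	}
	// the labels of the pod created from matchpod.yaml
	podLabels := labels.Set{"env": "prod"}
	fmt.Println(sel.Matches(podLabels)) // true, so this pod can be adopted
}
```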
2.1 Walkthrough
First, create a pod from matchpod.yaml.
[root@master kubectl]# ./kubectl apply -f matchpod.yaml
pod/test created
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test 1/1 Running 0 4s
[root@master kubectl]# ./kubectl get pod test -o yaml
apiVersion: v1
kind: Pod
metadata:
  ...
  labels:
    env: prod
  name: test
  namespace: default
  resourceVersion: "89705"
  ...
Next, create the replicaset.
[root@master kubectl]# ./kubectl get rs
No resources found.
[root@master kubectl]# ./kubectl get pod
NAME READY STATUS RESTARTS AGE
test 1/1 Running 0 90s
[root@master kubectl]# ./kubectl apply -f replicaset.yaml
replicaset.apps/replicatest created
[root@master kubectl]# ./kubectl get rs
NAME DESIRED CURRENT READY AGE
replicatest 2 2 2 4s
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-rdjv6 1/1 Running 0 8s
test 1/1 Running 0 2m16s
[root@master kubectl]# ./kubectl get pod test -o yaml
apiVersion: v1
kind: Pod
metadata:
  ...
  labels:
    env: prod
  name: test
  namespace: default
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: replicatest
    uid: 74bda7cc-f671-11e9-bb78-525400d54f7e
  resourceVersion: "90171"
  ...
From the pod's ownerReferences you can see that the replicaset has adopted the previously created pod (test). The pod's resourceVersion has also changed from 89705 to 90171.
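The ownerReferences entry above is exactly what a controller reference looks like. Below is a minimal sketch of how such a reference can be built with metav1.NewControllerRef, the helper the controller uses when it creates pods for a ReplicaSet (the rs literal is made up for illustration):

```go
package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	rs := &apps.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "replicatest", Namespace: "default"}}
	// NewControllerRef sets controller and blockOwnerDeletion to true,
	// matching the ownerReferences block on the adopted pod above
	ref := metav1.NewControllerRef(rs, apps.SchemeGroupVersion.WithKind("ReplicaSet"))
	fmt.Printf("%s %s controller=%v\n", ref.Kind, ref.Name, *ref.Controller)
}
```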
Next, let's delete the replicaset.
[root@master kubectl]# ./kubectl get rs
NAME DESIRED CURRENT READY AGE
replicatest 2 2 2 2m36s
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-rdjv6 1/1 Running 0 2m42s
test 1/1 Running 0 4m50s
[root@master kubectl]# ./kubectl delete rs replicatest
replicaset.extensions "replicatest" deleted
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test 0/1 Terminating 0 5m10s
[root@master kubectl]# ./kubectl get pods
No resources found.
[root@master kubectl]#
As you can see, the pod created earlier was deleted along with it.
If any part of this example is puzzling, this article should clear it up.
3. Startup
How the kube-controller-manager component runs as a whole will be covered in a separate post; here we go straight to how the replicaset controller is started.
// cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
...
controllers["replicaset"] = startReplicaSetController
...
return controllers
}
// cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
The pattern is the same as for every other controller: build a ReplicaSetController with the required parameters and then start its Run method in a goroutine.
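For reference, here is a rough sketch of the same wiring done by hand outside the controller-manager (assuming an in-cluster config and that the code is built inside the kubernetes repo so pkg/controller/replicaset can be imported; the worker count and resync period are arbitrary):

```go
package main

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/controller/replicaset"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	stopCh := make(chan struct{})
	factory := informers.NewSharedInformerFactory(client, 0)

	rsc := replicaset.NewReplicaSetController(
		factory.Apps().V1().ReplicaSets(),
		factory.Core().V1().Pods(),
		client,
		replicaset.BurstReplicas,
	)

	// start the shared informers, then run the controller with 5 workers
	factory.Start(stopCh)
	rsc.Run(5, stopCh)
}
```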
4. ReplicaSetController
type ReplicaSetController struct {
	// GroupVersionKind indicates the controller type.
	schema.GroupVersionKind
	// clientset used to talk to the api-server
	kubeClient clientset.Interface
	// creates/deletes pods through the api-server
	podControl controller.PodControlInterface
	// the maximum number of pods to create/delete in a single sync
	burstReplicas int
	syncHandler   func(rsKey string) error
	expectations  *controller.UIDTrackingControllerExpectations
	rsLister        appslisters.ReplicaSetLister
	rsListerSynced  cache.InformerSynced
	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced
	// Controllers that need to be synced
	// a work queue that decouples producers from consumers
	queue workqueue.RateLimitingInterface
}
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
apps.SchemeGroupVersion.WithKind("ReplicaSet"),
"replicaset_controller",
"replicaset",
controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
},
)
}
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
kubeClient: kubeClient,
podControl: podControl,
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
DeleteFunc: rsc.enqueueReplicaSet,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()
rsc.podListerSynced = podInformer.Informer().HasSynced
rsc.syncHandler = rsc.syncReplicaSet
return rsc
}
All the important wiring happens in NewBaseController:
1. rsInformer registers the event handlers for replicaset objects.
2. podInformer registers the event handlers for pods; since a replicaset manages the pods it owns, it naturally has to watch pod changes.
3. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName) initializes the workqueue, which decouples producers from consumers, mainly so that slow consumers do not block producers (a minimal sketch of this pattern follows the list). For workqueue itself see [k8s源码分析][client-go] workqueue.
4. rsc.syncHandler = rsc.syncReplicaSet sets the core method, i.e. the consumer of the workqueue.
5. expectations is initialized; see [k8s源码分析][controller-manager] controller_utils分析. How this controller uses expectations will be analyzed as a whole later.
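A minimal, self-contained sketch of the producer/consumer pattern from point 3 (this is just the client-go workqueue API, not the controller's own code):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

	// producer side: event handlers only ever add keys
	queue.Add("default/replicatest")

	// consumer side: the same loop shape as processNextWorkItem below
	key, quit := queue.Get()
	if quit {
		return
	}
	defer queue.Done(key)

	if err := sync(key.(string)); err != nil {
		// failed: re-add with rate limiting so the key is retried later
		queue.AddRateLimited(key)
		return
	}
	// succeeded: drop the key's retry history
	queue.Forget(key)
}

// sync stands in for the controller's syncHandler
func sync(key string) error {
	fmt.Println("syncing", key)
	return nil
}
```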
4.1 Starting the controller
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()
	controllerName := strings.ToLower(rsc.Kind)
	// controllerName = "replicaset" (rsc.Kind lowercased)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)
	// wait for the informer caches to sync
	if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}
	// start several goroutines, each running rsc.worker
	for i := 0; i < workers; i++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}
	// block until stopped
	<-stopCh
}
func (rsc *ReplicaSetController) worker() {
	for rsc.processNextWorkItem() {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	// take one key from the queue
	key, quit := rsc.queue.Get()
	if quit {
		// the queue has been shut down, return directly
		return false
	}
	// mark the key as done once processing finishes; see the workqueue implementation
	defer rsc.queue.Done(key)
	// process the object
	err := rsc.syncHandler(key.(string))
	if err == nil {
		// processed successfully
		rsc.queue.Forget(key)
		return true
	}
	// processing failed, put the key back into the queue with rate limiting
	utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
So Run starts several worker goroutines that keep pulling keys from the queue and handing them to syncHandler.
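Each worker goroutine is driven by wait.Until, which simply re-invokes the given function every period until the stop channel is closed. A tiny illustrative sketch of that behavior (nothing here comes from the controller itself):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	// close stopCh after 3 seconds to stop the loop
	go func() {
		time.Sleep(3 * time.Second)
		close(stopCh)
	}()
	// wait.Until calls the function, waits one second, and calls it again
	// until stopCh is closed; if the function returns (e.g. because the
	// queue was shut down), it is simply invoked again next period
	wait.Until(func() { fmt.Println("worker tick") }, time.Second, stopCh)
}
```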
Consumer: syncHandler is the consumer, and as mentioned above syncHandler is the syncReplicaSet method.
Producer: now that we know how items in the queue are consumed, where do they come from? Let's look at the producers.
4.2 Producers
As you would expect, the producers are rsInformer and podInformer: they watch all replicaset and pod changes, and whenever a change means some object needs work, that object is put into the queue.
rsInformer
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
DeleteFunc: rsc.enqueueReplicaSet,
})
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rsc.queue.Add(key)
}
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
	oldRS := old.(*apps.ReplicaSet)
	curRS := cur.(*apps.ReplicaSet)
	if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
		klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
	}
	// enqueue the new replicaset
	rsc.enqueueReplicaSet(cur)
}
Add, update and delete events all enqueue the replicaset directly; on update only the new object is enqueued and the old one is discarded.
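What actually goes into the queue is a plain string key. In this version controller.KeyFunc is cache.DeletionHandlingMetaNamespaceKeyFunc, so the key is simply namespace/name; a minimal sketch:

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	rs := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "replicatest"},
	}
	// for a live object this falls back to MetaNamespaceKeyFunc,
	// which returns "<namespace>/<name>"
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rs)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/replicatest
}
```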
podInformer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
addPod
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != rsc.Kind {
		return nil
	}
	// look the replicaset up in the local cache
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// not in the local cache, return nil
		return nil
	}
	// the replicaset in the local cache is not the one the pod's controllerRef points to
	if rs.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	// this is the replicaset that owns the pod
	return rs
}
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	if pod.DeletionTimestamp != nil {
		// the pod is already terminating, handle it as a delete
		rsc.deletePod(pod)
		return
	}
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		if rs == nil {
			// the pod's owner no longer exists or has been replaced
			return
		}
		// the rs that owns this pod
		rsKey, err := controller.KeyFunc(rs)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
		rsc.expectations.CreationObserved(rsKey)
		// enqueue the owning rs
		rsc.enqueueReplicaSet(rs)
		return
	}
	// the pod is an orphan
	// find all replicasets whose selector matches this pod
	rss := rsc.getPodReplicaSets(pod)
	if len(rss) == 0 {
		// no replicaset matches the pod, nothing to do
		return
	}
	klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
	for _, rs := range rss {
		// enqueue every matching replicaset
		rsc.enqueueReplicaSet(rs)
	}
}
When podInformer sees a new pod:
1. If the pod is terminating, it is handled as a delete and the function returns; the add itself is of no interest to the controller.
2. If the pod's owner no longer exists or has been replaced, return. (You may wonder why the pod is not deleted now that its owner is gone; that depends on the deletion policy and will be analyzed with the garbagecollector.)
3. If the pod has an owner and that owner exists, enqueue the owner, i.e. the rs.
4. If the pod carries no owner information and no replicaset in the cluster matches it, there is nothing left to do, so return.
5. If the pod carries no owner information, enqueue every replicaset that matches it.
A small sketch of reading a pod's controlling owner reference (steps 2 and 3) follows.
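metav1.GetControllerOf returns the owner reference whose controller field is true, or nil if the pod has no controlling owner. A minimal sketch (the pod literal is made up for illustration):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	isController := true
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "test",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "apps/v1",
			Kind:       "ReplicaSet",
			Name:       "replicatest",
			Controller: &isController,
		}},
	}}
	if ref := metav1.GetControllerOf(pod); ref != nil {
		fmt.Printf("owned by %s/%s\n", ref.Kind, ref.Name) // owned by ReplicaSet/replicatest
	} else {
		fmt.Println("orphan pod")
	}
}
```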
deletePod
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// as analyzed for the deltaFIFO: if this informer missed the Delete event
	// for some reason, the object shows up as a DeletedFinalStateUnknown tombstone
	// during resync
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// the pod belongs to no replicaset, so its deletion needs no further handling
		// No controller should care about orphans being deleted.
		return
	}
	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
	if rs == nil {
		// the pod's owner replicaset no longer exists, nothing to do
		return
	}
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		return
	}
	klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
	rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
	// enqueue the owning rs
	rsc.enqueueReplicaSet(rs)
}
When podInformer receives a pod delete event:
1. If the pod is an orphan that belongs to no replicaset, there is nothing to do; its deletion affects nothing.
2. If the pod's owner (the replicaset) has already been deleted, there is also nothing to do.
3. Otherwise, deleting the pod matters to its owner (for example the replicaset may now fall below its desired number of replicas), so the owning replicaset is enqueued.
updatePod
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// the same ResourceVersion means the pod has not changed at all
return
}
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
rsc.deletePod(curPod)
if labelChanged {
rsc.deletePod(oldPod)
}
return
}
curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
rsc.enqueueReplicaSet(rs)
}
}
if curControllerRef != nil {
rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
if rs == nil {
return
}
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
rsc.enqueueReplicaSet(rs)
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}
if labelChanged || controllerRefChanged {
rss := rsc.getPodReplicaSets(curPod)
if len(rss) == 0 {
return
}
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, rs := range rss {
rsc.enqueueReplicaSet(rs)
}
}
}
This handler is a bit more involved, but the comment on the method captures the gist:
// When a pod is updated, figure out what replica set/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new replica set. old and cur must be *v1.Pod types.
In short: if the updated pod is terminating it is handled as a delete; if its labels or controllerRef changed, the old owner and any newly matching replicasets are enqueued; and if the pod has just become Ready and the replicaset sets MinReadySeconds, the replicaset is re-enqueued after that interval for an availability check (see the sketch below).
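The MinReadySeconds branch relies on the workqueue's delaying capability: enqueueReplicaSetAfter ultimately calls the queue's AddAfter. A minimal sketch of just that workqueue behavior (not the controller's own code; the delay value here is arbitrary):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

	// re-enqueue the key after a delay (the controller uses MinReadySeconds+1s)
	// so the availability check runs once the pod has been ready long enough
	queue.AddAfter("default/replicatest", 5*time.Second)

	key, _ := queue.Get() // blocks until the delay has elapsed
	fmt.Println("got", key)
	queue.Done(key)
}
```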
4.2.1 Summary
Note that what the producers put into the queue carries no state: whether the trigger was a pod being deleted or a replicaset being added, in every case it is simply the key of the replicaset that needs attention that is placed on the queue.
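Because only keys are enqueued, multiple events for the same replicaset collapse into a single pending item: the workqueue deduplicates identical keys that have not yet been picked up. A small sketch:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

	// a pod add and a pod delete for the same replicaset both enqueue the same key
	queue.Add("default/replicatest")
	queue.Add("default/replicatest")

	fmt.Println(queue.Len()) // 1: duplicate keys collapse until the item is picked up
}
```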