

LiuZhenweis-MacBook-Pro:kubernetes zwliu$ tree vendor/ -L 1
├── Godeps
├── deprecated-dynamic
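├── discovery  # DiscoveryClient: discovery client, used to discover the API groups, versions and resources supported by the API server
├── dynamic  # DynamicClient: dynamic client that performs generic operations on arbitrary Kubernetes objects. Unlike the clientset, the dynamic client returns objects as map[string]interface{}, so a controller that needs to handle every API can use it; it is currently used by the garbage collector and the namespace controller.
├── examples  # several examples of using client-go
├── informers  # informer implementation for each Kubernetes resource
├── kubernetes  # ClientSet client, built on top of RESTClient
├── kubernetes_test
├── listers  # a Lister for each Kubernetes resource, serving read-only cached data for Get and List requests
├── pkg
├── plugin  # auth provider plugins for cloud providers such as OpenStack, GCP and Azure
├── rest  # RESTClient: performs RESTful operations (Get(), Put(), Post(), Delete(), etc.) against the Kubernetes API server
├── restmapper
├── scale  # ScaleClient: scales resources such as Deployment, ReplicaSet and ReplicationController up or down
├── testing
├── third_party
├── tools  # common tools such as SharedInformer, Reflector, DeltaFIFO and Indexers, which provide querying and caching to reduce the number of requests sent to kube-apiserver; the client-go controller logic lives here
├── transport  # provides secure TCP connections and supports HTTP streams; some operations, such as exec and attach, need to stream binary data between the client and a container
└── util  # common utilities such as the WorkQueue and certificate management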



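// in-cluster config: uses the Pod's ServiceAccount credentials, so this only works inside a Pod
config, err := rest.InClusterConfig()
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
// "" lists Pods in all namespaces; client-go v0.18+ also takes a context.Context as the first argument
pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})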

Client-go controller overall workflow


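In client-go, an informer is built around a controller struct, which is why "controller" and "informer" are used interchangeably here. The figure above comes from the official client-go controller framework documentation. Its upper half shows the core data flow inside client-go, and its lower half shows the core logic of a user-defined controller.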

Client-go controller key concepts

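In Kubernetes, components communicate with each other over HTTP. They do not rely on any message middleware, yet the communication must still deliver messages in real time, reliably, and in order. Kubernetes achieves this with the Informer mechanism. The other Kubernetes components communicate with the Kubernetes API server through client-go's Informer mechanism.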


  1. Reflector

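    The Reflector watches a specified Kubernetes resource. When the watched resource changes, for example with an Added, Updated, or Deleted event, the Reflector handles the change. It then stores the resource object (runtime.Object) in the local DeltaFIFO. The Reflector acts as a producer.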

    // Reflector watches a specified resource and causes all changes to be reflected in the given store.
    type Reflector struct {
     // name identifies this reflector. By default it will be a file:line if possible.
     name string
     // metrics tracks basic metric information about the reflector
     metrics *reflectorMetrics
     // The type of object we expect to place in the store.
     expectedType reflect.Type
     // The destination to sync up with the watch source
     store Store
     // listerWatcher is used to perform lists and watches.
     listerWatcher ListerWatcher
     // period controls timing between one watch ending and
     // the beginning of the next one.
     period       time.Duration
     resyncPeriod time.Duration
     ShouldResync func() bool
     // clock allows tests to manipulate time
     clock clock.Clock
     // lastSyncResourceVersion is the resource version token last
     // observed when doing a sync with the underlying store
     // it is thread safe, but not synchronized with the underlying store
     lastSyncResourceVersion string
     // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
     lastSyncResourceVersionMutex sync.RWMutex
    }
  2. DeltaFIFO


    type DeltaFIFO struct {
     // lock/cond protects access to 'items' and 'queue'.
     lock sync.RWMutex
     cond sync.Cond
     // We depend on the property that items in the set are in
     // the queue and vice versa, and that all Deltas in this
     // map have at least one Delta.
     items map[string]Deltas
     queue []string
     // populated is true if the first batch of items inserted by Replace() has been populated
     // or Delete/Add/Update was called first.
     populated bool
     // initialPopulationCount is the number of items inserted by the first call of Replace()
     initialPopulationCount int
     // keyFunc is used to make the key used for queued item
     // insertion and retrieval, and should be deterministic.
     keyFunc KeyFunc
     // knownObjects list keys that are "known", for the
     // purpose of figuring out which items have been deleted
     // when Replace() or Delete() is called.
     knownObjects KeyListerGetter
     // Indication the queue is closed.
     // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
     // Currently, not used to gate any of CRED operations.
     closed     bool
     closedLock sync.Mutex
    }


    // queueActionLocked appends to the delta list for the object.
    // Caller must lock first.
    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
        id, err := f.KeyOf(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        // If object is supposed to be deleted (last event is Deleted),
        // then we should ignore Sync events, because it would result in
        // recreation of this object.
        if actionType == Sync && f.willObjectBeDeletedLocked(id) {
            return nil
        }
        newDeltas := append(f.items[id], Delta{actionType, obj})
        newDeltas = dedupDeltas(newDeltas) // deduplicate (collapses consecutive Deleted deltas)
        if len(newDeltas) > 0 {
            if _, exists := f.items[id]; !exists {
                f.queue = append(f.queue, id) // enqueue the key only the first time it appears
            }
            f.items[id] = newDeltas
            f.cond.Broadcast() // wake up consumers blocked in Pop()
        } else {
            // We need to remove this from our map (extra items in the queue are
            // ignored if they are not in the map).
            delete(f.items, id)
        }
        return nil
    }
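The dedupDeltas call above does less than its name may suggest. In the client-go versions quoted here, it only merges the last two deltas when both are Deleted. It keeps the older one unless that one is a DeletedFinalStateUnknown tombstone. Consecutive Updated deltas are all kept. The stdlib-only sketch below restates that rule with simplified local types. Delta, DeltaType and Tombstone here are stand-ins, not the client-go types.

```go
package main

import "fmt"

// DeltaType and Delta are simplified stand-ins for the client-go types.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Tombstone is a hypothetical stand-in for cache.DeletedFinalStateUnknown:
// a deletion whose final object state was never observed.
type Tombstone struct{ Key string }

// dedupDeltas merges the last two deltas into one only when both are Deleted;
// any other combination (e.g. two Updated deltas) is returned unchanged.
func dedupDeltas(deltas []Delta) []Delta {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a, b := &deltas[n-1], &deltas[n-2]
	if out := isDeletionDup(a, b); out != nil {
		d := append([]Delta{}, deltas[:n-2]...)
		return append(d, *out)
	}
	return deltas
}

// isDeletionDup prefers the older delta b, unless b is only a tombstone.
func isDeletionDup(a, b *Delta) *Delta {
	if a.Type != Deleted || b.Type != Deleted {
		return nil
	}
	if _, ok := b.Object.(Tombstone); ok {
		return a
	}
	return b
}

func main() {
	updates := []Delta{{Added, "v1"}, {Updated, "v2"}, {Updated, "v3"}}
	deletes := []Delta{{Added, "v1"}, {Deleted, "v1"}, {Deleted, Tombstone{"default/web"}}}
	fmt.Println(len(dedupDeltas(updates)), len(dedupDeltas(deletes)))
}
```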


    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for {
            for len(f.queue) == 0 {
                // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
                // When Close() is called, the f.closed is set and the condition is broadcasted.
                // Which causes this loop to continue and return from the Pop().
                if f.IsClosed() {
                    return nil, FIFOClosedError
                }
                f.cond.Wait()
            }
            id := f.queue[0]
            f.queue = f.queue[1:]
            if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
            }
            item, ok := f.items[id]
            if !ok {
                // Item may have been deleted subsequently.
                continue
            }
            delete(f.items, id)
            err := process(item)
            if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
            }
            // Don't need to copyDeltas here, because we're transferring
            // ownership to the caller.
            return item, err
        }
    }

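    queueActionLocked and Pop are DeltaFIFO's two most important methods. They act as the producer and the consumer respectively. queueActionLocked serves the Reflector side: it adds data to the queue and wakes up waiting consumers. Pop serves the informer controller's processLoop, which in turn invokes the user-defined event handlers: it consumes the data in the queue.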
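The producer/consumer split can be made concrete with a heavily simplified, stdlib-only model of the same design. The model has three parts:

- a map from key to accumulated deltas,
- a FIFO of keys in which each key appears at most once,
- a sync.Cond that wakes a blocked Pop.

miniFIFO and its methods are hypothetical names. The sketch omits dedup, knownObjects, Replace and requeueing, so it illustrates the idea rather than the client-go implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type delta struct {
	Type string // "Added", "Updated", "Deleted", ...
	Obj  string
}

var errClosed = errors.New("fifo closed")

// miniFIFO: items holds pending deltas per key; queue holds each key once, in arrival order.
type miniFIFO struct {
	lock   sync.Mutex
	cond   *sync.Cond
	items  map[string][]delta
	queue  []string
	closed bool
}

func newMiniFIFO() *miniFIFO {
	f := &miniFIFO{items: map[string][]delta{}}
	f.cond = sync.NewCond(&f.lock)
	return f
}

// Add is the producer side (compare queueActionLocked): append the delta,
// enqueue the key only if it is not already waiting, then wake consumers.
func (f *miniFIFO) Add(key string, d delta) {
	f.lock.Lock()
	defer f.lock.Unlock()
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
	f.cond.Broadcast()
}

// Close marks the queue closed and wakes any blocked Pop.
func (f *miniFIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.closed = true
	f.cond.Broadcast()
}

// Pop is the consumer side: block until a key is queued, then hand all of its
// accumulated deltas to process while still holding the lock.
func (f *miniFIFO) Pop(process func(key string, ds []delta)) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	for len(f.queue) == 0 {
		if f.closed {
			return errClosed
		}
		f.cond.Wait()
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	ds := f.items[key]
	delete(f.items, key)
	process(key, ds)
	return nil
}

func main() {
	f := newMiniFIFO()
	f.Add("default/web", delta{"Added", "v1"})
	f.Add("default/db", delta{"Added", "v1"})
	f.Add("default/web", delta{"Updated", "v2"}) // same key: no second queue entry
	f.Close()                                     // remaining items are still drained
	for f.Pop(func(key string, ds []delta) { fmt.Println(key, ds) }) == nil {
	}
}
```

A key is enqueued only once while its deltas accumulate. A consumer that falls behind therefore receives all pending changes for an object in a single Pop, which is why Pop hands over a Deltas list rather than a single event.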

  3. Indexer

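    The Indexer is client-go's local store for resource objects, with built-in indexing. The informer continuously pops (consumes) objects from the DeltaFIFO and syncs them into the Indexer, invoking the user-registered event handler functions along the way. The Indexer mirrors the objects stored in etcd and is kept eventually consistent through the API server. client-go, and user event handlers in particular, can therefore read resource objects from the local store instead of querying the API server and etcd remotely. This reduces the load on both.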

    // Indexer is a storage interface that lets you list objects using multiple indexing functions
    type Indexer interface {
        Store
        // Retrieve list of objects that match on the named indexing function
        Index(indexName string, obj interface{}) ([]interface{}, error)
        // IndexKeys returns the set of keys that match on the named indexing function.
        IndexKeys(indexName, indexKey string) ([]string, error)
        // ListIndexFuncValues returns the list of generated values of an Index func
        ListIndexFuncValues(indexName string) []string
        // ByIndex lists object that match on the named indexing function with the exact key
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        // GetIndexer return the indexers
        GetIndexers() Indexers
        // AddIndexers adds more indexers to this store.  If you call this after you already have data
        // in the store, the results are undefined.
        AddIndexers(newIndexers Indexers) error
    }


// cache responsibilities are limited to:
//  1. Computing keys for objects via keyFunc
//  2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}

// src/
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}



// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) ([]string, error)

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index
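  1. Indexers: stores the index functions; the key is the index name and the value is the index function (IndexFunc).
  2. IndexFunc: an index function that takes a resource object and returns a list of index values. These are strings computed from specific fields of the object, for example its namespace.
  3. Indices: the index cache. The key is the index name, usually the same as the corresponding Indexers key. The value is that index's data (an Index), as shown in the figure above.
  4. Index: the index data itself. It maps each value computed by the IndexFunc to the set of keys of all resource objects that produced that value, as shown in the figure above.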
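A small stdlib-only model shows how the four types fit together. On every Add, each IndexFunc computes index values for the object, and the object's key is recorded under those values in the matching Index. ByIndex then looks keys up by value.

The following names are hypothetical: miniIndexer, pod, and the byNamespace/byNode index names. sets.String is replaced by map[string]struct{}. Keys use the namespace/name form that MetaNamespaceKeyFunc produces. The sketch leaves out updates and deletions, which client-go's threadSafeMap also maintains.

```go
package main

import (
	"fmt"
	"sort"
)

type pod struct{ Namespace, Name, Node string }

type IndexFunc func(obj interface{}) ([]string, error)
type Index map[string]map[string]struct{} // indexed value -> set of object keys
type Indexers map[string]IndexFunc        // index name -> IndexFunc
type Indices map[string]Index             // index name -> Index

type miniIndexer struct {
	items    map[string]interface{} // object key -> object
	indexers Indexers
	indices  Indices
}

func newMiniIndexer(indexers Indexers) *miniIndexer {
	return &miniIndexer{items: map[string]interface{}{}, indexers: indexers, indices: Indices{}}
}

// Add stores obj under key and records key under every value each IndexFunc returns.
func (m *miniIndexer) Add(key string, obj interface{}) error {
	m.items[key] = obj
	for name, fn := range m.indexers {
		values, err := fn(obj)
		if err != nil {
			return err
		}
		idx := m.indices[name]
		if idx == nil {
			idx = Index{}
			m.indices[name] = idx
		}
		for _, v := range values {
			if idx[v] == nil {
				idx[v] = map[string]struct{}{}
			}
			idx[v][key] = struct{}{}
		}
	}
	return nil
}

// ByIndex returns the objects whose IndexFunc produced indexedValue, sorted by key.
func (m *miniIndexer) ByIndex(indexName, indexedValue string) []interface{} {
	var keys []string
	for k := range m.indices[indexName][indexedValue] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]interface{}, 0, len(keys))
	for _, k := range keys {
		out = append(out, m.items[k])
	}
	return out
}

func main() {
	idx := newMiniIndexer(Indexers{
		"byNamespace": func(obj interface{}) ([]string, error) {
			return []string{obj.(pod).Namespace}, nil
		},
	})
	for _, p := range []pod{{"default", "web", "n1"}, {"kube-system", "dns", "n1"}, {"default", "db", "n2"}} {
		_ = idx.Add(p.Namespace+"/"+p.Name, p)
	}
	fmt.Println(idx.ByIndex("byNamespace", "default"))
}
```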

Client-go controller startup flow


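package main

import (
    "time"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    log "k8s.io/klog" // assumed logger; the original import list was lost
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
    if err != nil {
        panic(err)
    }
    stopCh := make(chan struct{})
    defer close(stopCh)
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    sharedInformer := informers.NewSharedInformerFactory(clientset, time.Second) // 1s resync: demo only
    informer := sharedInformer.Core().V1().Pods().Informer() // every Kubernetes resource type has its own informer
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            log.Info(obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            log.Info(oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            log.Info(obj)
        },
    })
    sharedInformer.Start(stopCh) // starts every informer requested so far
    <-stopCh                     // block; stopCh is never closed in this example
}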


  1. Shared Informer mechanism

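    When writing client-go code, the informer for the same resource may be instantiated several times. Each of those informers has its own Reflector, so several identical ListAndWatch calls run (the Reflector calls ListAndWatch). This puts unnecessary load on the Kubernetes API server. It also causes redundant serialization and deserialization for the same resource.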

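    Therefore, custom controllers built with client-go use informers.NewSharedInformerFactory to instantiate the informers for specific resources. The factory never creates a second real informer for a resource it has already seen, so all users of that resource's informer share a single Reflector. informers.sharedInformerFactory implements this sharing with a map keyed by the object's reflect.Type.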

    For example, sharedInformer.Core().V1().Pods().Informer() in our sample code calls informers.sharedInformerFactory.InformerFor(). That call instantiates the Pod informer and registers it in the sharedInformerFactory.

type sharedInformerFactory struct {
 client           kubernetes.Interface
 namespace        string
 tweakListOptions internalinterfaces.TweakListOptionsFunc
 lock             sync.Mutex
 defaultResync    time.Duration
 customResync     map[reflect.Type]time.Duration
 informers map[reflect.Type]cache.SharedIndexInformer
 // startedInformers is used for tracking which informers have been started.
 // This allows Start() to be called multiple times safely.
 startedInformers map[reflect.Type]bool
}


// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()
    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists { // already exists: return the informer created earlier
        return informer
    }
    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }
    // newFunc is passed in by the specific resource informer's Informer() method; it knows how to instantiate that resource's informer
    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

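As the code above shows, calling informer := sharedInformer.Core().V1().Pods().Informer() does not create a new informer object if a Pod informer already exists; the existing one is returned instead. So there is only one Reflector for the resource, and once the factory starts, only one ListAndWatch runs for Pods. In other words, the Pod resource has a single producer. Its consumers, by contrast, are usually user-defined (custom controllers, for example), and each handles the resource with its own logic. The informer (and Reflector) for a given type such as Pod therefore serves one or more ResourceEventHandlers on the consumer side. In our sample code, we register a custom event handler as follows: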

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) { /* ... */ },
    UpdateFunc: func(oldObj, newObj interface{}) {
        log.Info(oldObj, newObj)
    },
    DeleteFunc: func(obj interface{}) { /* ... */ },
})


func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

// handler is the user-defined handler; it is usually the entry point of a custom controller's core logic
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    // if this informer (e.g. the Pod informer) has already stopped, return without adding the handler
    if s.stopped {
        klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
        return
    }

    if resyncPeriod > 0 {
        if resyncPeriod < minimumResyncPeriod {
            klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
            resyncPeriod = minimumResyncPeriod
        }

        if resyncPeriod < s.resyncCheckPeriod {
            if s.started {
                klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
                resyncPeriod = s.resyncCheckPeriod
            } else {
                // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
                // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
                // accordingly
                s.resyncCheckPeriod = resyncPeriod
                s.processor.resyncCheckPeriodChanged(resyncPeriod)
            }
        }
    }

    // create a listener for the user-defined handler; initialBufferSize is 1024, so each listener
    // gets a buffer for pending notifications that starts at 1024 entries and grows as needed
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // in order to safely join, we have to
    // 1. stop sending add/update/delete notifications
    // 2. do a list against the store
    // 3. send synthetic "Add" events to the new handler
    // 4. unblock
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}
  2. Listener (AddEventHandler)


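    func (p *sharedProcessor) addListener(listener *processorListener) {
        p.listenersLock.Lock()
        defer p.listenersLock.Unlock()
        p.addListenerLocked(listener) // appends to both p.listeners and p.syncingListeners
        if p.listenersStarted { // processor already running: start this listener's goroutines now
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
    }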
  3. Consumer entry point (HandleDeltas)


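    Let's start from sharedInformer.Start(stopCh) in the sample code. It is the entry point that starts all informers. It starts the Reflector and wires in HandleDeltas, so both the producer and the consumer entry point are started from here: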

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }

    Now let's look inside informer.Run(stopCh), which Start launches with go:

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        // create the DeltaFIFO; every event the Reflector receives from the API server is stored here
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,
            Process:          s.HandleDeltas, // register the delta-processing function
        }
        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.controller = New(cfg) // create the controller
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()
        // Separate stop channel because Processor should be stopped strictly after controller
        processorStopCh := make(chan struct{})
        var wg wait.Group
        defer wg.Wait()              // Wait for Processor to stop
        defer close(processorStopCh) // Tell Processor to stop
        wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
        wg.StartWithChannel(processorStopCh, s.processor.run) // starts every listener's run() and pop()
        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh) // starts the Reflector and processLoop; blocks until stopCh is closed
    }


    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        go func() {
            <-stopCh
            c.config.Queue.Close()
        }()
        r := NewReflector(
            // the specific resource's ListerWatcher (in our example, the List and Watch functions of the Pod informer)
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        r.ShouldResync = c.config.ShouldResync
        r.clock = c.clock
        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()
        var wg wait.Group
        defer wg.Wait()
        wg.StartWithChannel(stopCh, r.Run)             // start the Reflector in a new goroutine
        wait.Until(c.processLoop, time.Second, stopCh) // run processLoop; it keeps running until stopCh is closed
    }


    // processLoop drains the work queue.
    // TODO: Consider doing the processing in parallel. This will require a little thought
    // to make sure that we don't end up processing the same object multiple times
    // concurrently.
    // TODO: Plumb through the stopCh here (and down to the queue) so that this can
    // actually exit when the controller is stopped. Or just give up on this stuff
    // ever being stoppable. Converting this whole package to use Context would
    // also be helpful.
    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == FIFOClosedError {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }


    // Pop blocks until an item is added to the queue, and then returns it.  If
    // multiple items are ready, they are returned in the order in which they were
    // added/updated. The item is removed from the queue (and the store) before it
    // is returned, so if you don't successfully process it, you need to add it back
    // with AddIfNotPresent().
    // process function is called under lock, so it is safe update data structures
    // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
    // may return an instance of ErrRequeue with a nested error to indicate the current
    // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
    // Pop returns a 'Deltas', which has a complete list of all the things
    // that happened to the object (deltas) while it was sitting in the queue.
    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for {
            for len(f.queue) == 0 {
                // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
                // When Close() is called, the f.closed is set and the condition is broadcasted.
                // Which causes this loop to continue and return from the Pop().
                if f.IsClosed() {
                    return nil, FIFOClosedError
                }
                f.cond.Wait()
            }
            id := f.queue[0]
            f.queue = f.queue[1:]
            if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
            }
            item, ok := f.items[id]
            if !ok {
                // Item may have been deleted subsequently.
                continue
            }
            delete(f.items, id)
            err := process(item) // process is HandleDeltas here
            if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
            }
            // Don't need to copyDeltas here, because we're transferring
            // ownership to the caller.
            return item, err
        }
    }


    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
        // from oldest to newest
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Added, Updated:
                isSync := d.Type == Sync // Sync deltas are produced by the Resync mechanism, covered below
                s.cacheMutationDetector.AddObject(d.Object)
                if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                    // the object is already in the local cache (Indexer): update the cache
                    if err := s.indexer.Update(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                } else {
                    if err := s.indexer.Add(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(addNotification{newObj: d.Object}, isSync)
                }
            case Deleted:
                if err := s.indexer.Delete(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
        }
        return nil
    }
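HandleDeltas emits exactly one notification per delta. Whether a Sync, Added or Updated delta becomes an add or an update notification depends only on whether the object is already in the Indexer, not on the delta type. The stdlib-only sketch below isolates that decision. A plain map stands in for the Indexer, and delta, notification and handleDeltas are hypothetical names.

```go
package main

import "fmt"

type delta struct {
	Type string // "Sync", "Added", "Updated" or "Deleted"
	Key  string
	Obj  string
}

type notification struct {
	Kind     string // "add", "update" or "delete"
	Old, New string
	Sync     bool // selects syncingListeners instead of listeners in distribute
}

// handleDeltas updates store (the local cache) and returns the notifications
// that would be distributed to listeners, oldest first.
func handleDeltas(store map[string]string, deltas []delta) []notification {
	var out []notification
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			isSync := d.Type == "Sync"
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				out = append(out, notification{Kind: "update", Old: old, New: d.Obj, Sync: isSync})
			} else {
				store[d.Key] = d.Obj
				out = append(out, notification{Kind: "add", New: d.Obj, Sync: isSync})
			}
		case "Deleted":
			delete(store, d.Key)
			out = append(out, notification{Kind: "delete", Old: d.Obj})
		}
	}
	return out
}

func main() {
	store := map[string]string{"default/web": "v1"}
	ns := handleDeltas(store, []delta{
		{"Sync", "default/web", "v1"},    // resync of a cached object -> update (Old == New)
		{"Added", "default/db", "v1"},    // new key -> add
		{"Deleted", "default/web", "v1"}, // -> delete, removed from the cache
	})
	for _, n := range ns {
		fmt.Printf("%+v\n", n)
	}
	fmt.Println(store)
}
```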


    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        if sync { // Sync notifications take this branch
            for _, listener := range p.syncingListeners { // starts with every listener; shouldResync() narrows it to those due for resync
                listener.add(obj)
            }
        } else {
            for _, listener := range p.listeners { // see addListener above for how a listener is registered
                listener.add(obj)
            }
        }
    }


    func (p *processorListener) add(notification interface{}) {
        p.addCh <- notification
    }


    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification: // notification is set by the branch below and nextCh points at p.nextCh, so this hands it to p.nextCh
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case: with nothing buffered, nextCh is nil and a send on a nil channel never proceeds
                }
            case notificationToAdd, ok := <-p.addCh: // taken whenever addCh has data
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd // the value read from addCh becomes the next notification
                    nextCh = p.nextCh                // point the local nextCh at p.nextCh to enable the send case
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }


    func (p *processorListener) run() {
        // this call blocks until the channel is closed.  When a panic happens during the notification
        // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
        // the next notification will be attempted.  This is usually better than the alternative of never
        // delivering again.
        stopCh := make(chan struct{})
        wait.Until(func() {
            // this gives us a few quick retries before a long pause and then a few more quick retries
            err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
                for next := range p.nextCh { // keep reading from p.nextCh
                    switch notification := next.(type) {
                    case updateNotification:
                        p.handler.OnUpdate(notification.oldObj, notification.newObj)
                    case addNotification:
                        p.handler.OnAdd(notification.newObj)
                    case deleteNotification:
                        p.handler.OnDelete(notification.oldObj)
                    default:
                        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                    }
                }
                // the only way to get here is if the p.nextCh is empty and closed
                return true, nil
            })
            // the only way to get here is if the p.nextCh is empty and closed
            if err == nil {
                close(stopCh)
            }
        }, 1*time.Minute, stopCh)
    }


  4. Reflector (producer)

    In s.controller.Run(stopCh) above, the Reflector is started with wg.StartWithChannel(stopCh, r.Run). Let's go straight into Reflector.Run:

    // Run starts a watch and handles watch events. Will restart the watch if it is closed.
    // Run will exit when stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
        wait.Until(func() {
            if err := r.ListAndWatch(stopCh); err != nil { // call ListAndWatch directly
                utilruntime.HandleError(err)
            }
        }, r.period, stopCh)
    }

    // ListAndWatch first lists all items and get the resource version at the moment of call,
    // and then use the resource version to watch.
    // It returns error if ListAndWatch didn't even try to initialize watch.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
        var resourceVersion string
        // Explicitly set "0" as resource version - it's fine for the List()
        // to be served from cache and potentially be delayed relative to
        // etcd contents. Reflector framework will catch up via Watch() eventually.
        options := metav1.ListOptions{ResourceVersion: "0"} // the initial List reads at resource version "0"
        if err := func() error {
            initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
            defer initTrace.LogIfLong(10 * time.Second)
            var list runtime.Object
            var err error
            listCh := make(chan struct{}, 1)
            panicCh := make(chan interface{}, 1)
            go func() {
                defer func() {
                    if r := recover(); r != nil {
                        panicCh <- r
                    }
                }()
                // calls the List function of the specific resource's informer (e.g. the one defined by the Pod informer)
                list, err = r.listerWatcher.List(options)
                close(listCh)
            }()
            select {
            case <-stopCh:
                return nil
            case r := <-panicCh:
                panic(r)
            case <-listCh:
            }
            if err != nil {
                return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
            }
            initTrace.Step("Objects listed")
            listMetaInterface, err := meta.ListAccessor(list)
            if err != nil {
                return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
            }
            resourceVersion = listMetaInterface.GetResourceVersion()
            initTrace.Step("Resource version extracted")
            items, err := meta.ExtractList(list)
            if err != nil {
                return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
            }
            initTrace.Step("Objects extracted")
            if err := r.syncWith(items, resourceVersion); err != nil {
                return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
            }
            initTrace.Step("SyncWith done")
            r.setLastSyncResourceVersion(resourceVersion)
            initTrace.Step("Resource version updated")
            return nil
        }(); err != nil {
            return err
        }
        resyncerrc := make(chan error, 1)
        cancelCh := make(chan struct{})
        defer close(cancelCh)
        go func() {
            resyncCh, cleanup := r.resyncChan()
            defer func() {
                cleanup() // Call the last one written into cleanup
            }()
            for {
                select {
                case <-resyncCh:
                case <-stopCh:
                    return
                case <-cancelCh:
                    return
                }
                if r.ShouldResync == nil || r.ShouldResync() {
                    klog.V(4).Infof("%s: forcing resync", r.name)
                    // r.store is the DeltaFIFO: its Resync() re-reads the objects in the informer's cache (Indexer)
                    // and appends them to the DeltaFIFO as Sync deltas; HandleDeltas later delivers them as
                    // update notifications to the listeners whose resync period has elapsed
                    if err := r.store.Resync(); err != nil {
                        resyncerrc <- err
                        return
                    }
                }
                cleanup()
                resyncCh, cleanup = r.resyncChan()
            }
        }()
        for {
            // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
            select {
            case <-stopCh:
                return nil
            default:
            }
            timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options = metav1.ListOptions{
                ResourceVersion: resourceVersion,
                // We want to avoid situations of hanging watchers. Stop any wachers that do not
                // receive any events within the timeout window.
                TimeoutSeconds: &timeoutSeconds,
            }
            // call the Watch function of the specific resource's informer (e.g. the one defined by the Pod informer)
            w, err := r.listerWatcher.Watch(options)
            if err != nil {
                switch err {
                case io.EOF:
                    // watch closed normally
                case io.ErrUnexpectedEOF:
                    klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
                }
                // If this is "connection refused" error, it means that most likely apiserver is not responsive.
                // It doesn't make sense to re-list all objects because most likely we will be able to restart
                // watch where we ended.
                // If that's the case wait and resend watch request.
                if urlError, ok := err.(*url.Error); ok {
                    if opError, ok := urlError.Err.(*net.OpError); ok {
                        if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                            time.Sleep(time.Second)
                            continue
                        }
                    }
                }
                return nil
            }
            if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
                if err != errorStopRequested {
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                }
                return nil
            }
        }
    }
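Without tracing, panic propagation, resync and error handling, ListAndWatch follows a simple protocol:

1. List once and remember the list's resourceVersion.
2. Replace the store with the listed items.
3. Watch from that version and advance it with every event.

As a result, a re-established watch resumes where the previous one ended instead of re-listing.

The sketch below models this against a hypothetical in-memory API (fakeAPI, event, reflector) that uses integer resource versions. Real resource versions are opaque strings that clients must not compare numerically.

```go
package main

import "fmt"

type event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Key  string
	RV   int // resource version at which the change happened
}

// fakeAPI is a hypothetical in-memory API server: current objects plus an ordered change log.
type fakeAPI struct {
	rv      int            // latest resource version
	objects map[string]int // key -> RV of the object's last change
	log     []event
}

func (a *fakeAPI) apply(typ, key string) {
	a.rv++
	if typ == "DELETED" {
		delete(a.objects, key)
	} else {
		a.objects[key] = a.rv
	}
	a.log = append(a.log, event{typ, key, a.rv})
}

// List returns a snapshot of all objects and the resource version it reflects.
func (a *fakeAPI) List() (map[string]int, int) {
	snap := make(map[string]int, len(a.objects))
	for k, v := range a.objects {
		snap[k] = v
	}
	return snap, a.rv
}

// Watch returns every change newer than rv (a real watch streams them over HTTP).
func (a *fakeAPI) Watch(rv int) []event {
	var out []event
	for _, e := range a.log {
		if e.RV > rv {
			out = append(out, e)
		}
	}
	return out
}

// reflector keeps a simplified local store and the last resource version it has seen.
type reflector struct {
	store           map[string]int
	resourceVersion int
}

// list: full List, replace the store, remember the list's RV (compare syncWith + setLastSyncResourceVersion).
func (r *reflector) list(api *fakeAPI) {
	r.store, r.resourceVersion = api.List()
}

// watch: apply changes newer than the last seen RV and advance it (compare watchHandler).
func (r *reflector) watch(api *fakeAPI) {
	for _, e := range api.Watch(r.resourceVersion) {
		if e.Type == "DELETED" {
			delete(r.store, e.Key)
		} else {
			r.store[e.Key] = e.RV
		}
		r.resourceVersion = e.RV
	}
}

func main() {
	api := &fakeAPI{objects: map[string]int{}}
	api.apply("ADDED", "default/web")
	api.apply("ADDED", "default/db")

	r := &reflector{}
	r.list(api) // initial List: 2 objects at RV 2
	api.apply("MODIFIED", "default/web")
	api.apply("DELETED", "default/db")
	r.watch(api) // first watch: picks up RV 3 and 4
	api.apply("ADDED", "default/cache")
	r.watch(api) // after a "reconnect", resume from RV 4 without re-listing
	fmt.Println(r.store, r.resourceVersion)
}
```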
  5. Resync mechanism


// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()

    if f.knownObjects == nil {
        return nil
    }

    keys := f.knownObjects.ListKeys()
    for _, k := range keys {
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
    obj, exists, err := f.knownObjects.GetByKey(key) // knownObjects is the local cache (Indexer), passed in when the DeltaFIFO is created
    if err != nil {
        klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    } else if !exists {
        klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    // If we are doing Resync() and there is already an event queued for that object,
    // we ignore the Resync for it. This is to avoid the race, in which the resync
    // comes with the previous value of object (since queueing an event for the object
    // doesn't trigger changing the underlying store <knownObjects>.
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if len(f.items[id]) > 0 { // the object already has deltas queued in the DeltaFIFO: do nothing
        return nil
    }

    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}
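In short, Resync re-enqueues every key in the Indexer (knownObjects) as a Sync delta. The exception is keys that already have deltas waiting in the queue, because the cached copy of such an object is older than the queued change.

A stdlib-only sketch of that rule follows. resync, delta and the plain map/slice arguments are hypothetical stand-ins for the DeltaFIFO's items, queue and knownObjects. The keys are sorted only to make the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type delta struct{ Type, Obj string }

// resync appends a Sync delta for every cached key that has nothing pending and
// returns the updated queue. cache plays knownObjects (the Indexer); items and
// queue play the DeltaFIFO's fields of the same names.
func resync(cache map[string]string, items map[string][]delta, queue []string) []string {
	keys := make([]string, 0, len(cache))
	for k := range cache {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example; client-go does not sort
	for _, k := range keys {
		if len(items[k]) > 0 {
			continue // a change for k is already queued; the cached copy may be stale
		}
		queue = append(queue, k)
		items[k] = append(items[k], delta{"Sync", cache[k]})
	}
	return queue
}

func main() {
	cache := map[string]string{"default/web": "v1", "default/db": "v1"}
	items := map[string][]delta{"default/db": {{"Updated", "v2"}}} // db already has a pending update
	queue := resync(cache, items, []string{"default/db"})
	fmt.Println(queue, items["default/web"])
}
```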
