Table of Contents
- 1. Chapter Introduction
- 2. The cache.SharedIndexInformer Structure
- 3. sharedIndexInformer.Run
- 4. Summary
- 5. References
1. Chapter Introduction
As the previous chapter showed, the informer mechanism makes it very easy to implement a controller for a resource object. The steps are:
(1) Create a SharedInformerFactory instance and specify the indexer and listWatch parameters; this produces a cache.SharedIndexInformer object.
(2) cache.SharedIndexInformer is what actually implements the informer mechanism shown in the diagram below.
This chapter studies the informer mechanism starting from SharedIndexInformer.
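Before diving in, here is a minimal sketch of those two steps, assuming a recent client-go; the kubeconfig path and the empty handler bodies are illustrative, not taken from the analysis below.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumption: kubeconfig at the default home path; error handling trimmed for brevity.
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // Step (1): the factory builds a cache.SharedIndexInformer wired up with the
    // default indexer and the list/watch functions of the pod client.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods().Informer()

    // Registered handlers become listeners inside the sharedProcessor (section 2).
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { /* handle add */ },
        UpdateFunc: func(oldObj, newObj interface{}) { /* handle update */ },
        DeleteFunc: func(obj interface{}) { /* handle delete */ },
    })

    // Step (2): Run (called by factory.Start) drives the machinery analyzed below.
    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
    <-stopCh
}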
2. The cache.SharedIndexInformer Structure
type sharedIndexInformer struct {
indexer Indexer // the local cache + indexing mechanism, covered in detail in the previous article
controller Controller // the controller that starts the reflector; the reflector uses the user-defined ListWatch functions to fetch objects and feed the DeltaFIFO incremental queue
processor *sharedProcessor // the set of listeners registered for add, update and delete events
cacheMutationDetector CacheMutationDetector // mutation detector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher // defines the list and watch functions; as the pod informer shows, these call the client directly to send requests to the apiserver
objectType runtime.Object // the object type to list/watch; for a pod informer this is core/v1 Pod
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration // how often our controller's reflector tries to call the listeners' shouldResync
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration // the default resync period for handlers registered via AddEventHandler
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
SharedIndexInformer mainly contains the following components:
(1) indexer
The indexer in the bottom right of the diagram. Its implementation was analyzed in the previous section.
(2) Controller
The Controller on the left of the diagram. It starts the reflector and the whole list-watch machinery; this is the focus of what follows.
(3) processor
The listeners at the bottom of the diagram. Everything that registers a ResourceEventHandler with the informer becomes a listener.
Because the informer is shared, a single informer instance may be reused many times and have multiple ResourceEventHandlers registered on it; usually, though, one informer has one listener.
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener // all listeners added to the informer
syncingListeners []*processorListener // the listeners of the informer that are currently in a sync state, controlled by resyncCheckPeriod: every resyncCheckPeriod, listeners that are due for a resync are moved into syncingListeners
clock clock.Clock
wg wait.Group
}
The ResourceEventHandler interface is shown below. This is where the informer's add, update, and delete handlers are defined.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
(4) CacheMutationDetector
The mutation detector checks whether objects held in memory have been mutated. It is only used for testing and is disabled by default, so we will not dig into it here.
3. sharedIndexInformer.Run
k8s.io/client-go/tools/cache/shared_informer.go
When using an informer, once the sharedIndexInformer is defined you simply call sharedIndexInformer.Run to start the whole informer mechanism.
The overall flow of the informer is:
(1) The DeltaFIFO receives the full/incremental data from ListAndWatch and hands it to the HandleDeltas function via Pop (production).
(2) HandleDeltas sends the events one by one to the user-defined handlers and updates the indexer cache (consumption).
Let's start from Run and see how this is implemented. The main logic of sharedIndexInformer.Run is:
- Create a DeltaFIFO whose key function is MetaNamespaceKeyFunc, i.e. objects are keyed by ns/name.
- Build a Config and use it to create a controller.
- Run the processing logic of the user-defined handlers, s.processor.run (start consuming).
- Run controller.Run, which drives the overall machinery (start producing).
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1. Create a DeltaFIFO whose key function is MetaNamespaceKeyFunc, i.e. objects are keyed by ns/name.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 2. Build the config.
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod, // resync check period
RetryOnError: false,
ShouldResync: s.processor.shouldResync, // a function that decides whether any user-defined handler needs a resync
Process: s.HandleDeltas, // how data arriving from list/watch is processed
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 3. Create a controller from the config.
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// Memory mutation detection, ignored here.
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 4. Run the processing logic of the user-defined handlers.
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5. Run the controller.
s.controller.Run(stopCh)
}
3.1 NewDeltaFIFO
3.1.1 Where DeltaFIFO fits
From the introduction of the apiserver's list-watch mechanism, we already know that the list and watch APIs alone give us both the full and the incremental data.
If I had to write the simplest possible client-go client, I would implement it like this:
(1) Define a local cache and put the data from list into it.
(2) On watch events, update the cache and then pass the object to the user-defined add, update, and delete handler functions.
The reason for the cache: keeping a local copy of the etcd data means a controller that needs the data can read it locally.
This gives a very crude client, but it is still far from what the informer mechanism requires.
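To make this crude approach concrete, here is a rough sketch of such a naive list-then-watch loop, assuming a recent client-go where List/Watch take a context; it deliberately ignores watch restarts and careful resourceVersion handling, which is precisely what Reflector and DeltaFIFO add.

package naiveclient

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
)

// naiveRun lists pods into a local map, then keeps the map updated from watch
// events and calls onEvent for each change. No re-list, no retry, no ordering
// guarantees beyond what the single watch stream provides.
func naiveRun(ctx context.Context, cs kubernetes.Interface, onEvent func(watch.EventType, *corev1.Pod)) error {
    localCache := map[string]*corev1.Pod{} // key: namespace/name

    list, err := cs.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
    if err != nil {
        return err
    }
    for i := range list.Items {
        p := &list.Items[i]
        localCache[p.Namespace+"/"+p.Name] = p
    }

    w, err := cs.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{ResourceVersion: list.ResourceVersion})
    if err != nil {
        return err
    }
    for ev := range w.ResultChan() {
        pod, ok := ev.Object.(*corev1.Pod)
        if !ok {
            continue
        }
        key := pod.Namespace + "/" + pod.Name
        switch ev.Type {
        case watch.Added, watch.Modified:
            localCache[key] = pod
        case watch.Deleted:
            delete(localCache, key)
        }
        onEvent(ev.Type, pod)
    }
    return fmt.Errorf("watch channel closed")
}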
Why does the informer mechanism need a DeltaFIFO?
(1) Why a FIFO queue?
Easy to understand: FIFO guarantees ordering, and without ordering the data would get corrupted. The queue is also a buffer: if too many updates come in at once, the informer mechanism could otherwise be overwhelmed.
(2) Why deltas?
Elements in the FIFO queue go to exactly two places: first, to keep the local cache in sync; second, to be delivered to the user-defined add, update, and delete handler functions.
Suppose that within a very short time one object receives a large number of updates and is finally deleted. The FIFO queue then piles up a lot of data.
Delivering the events one by one to the handlers is fine, because the user wants to see that whole sequence. But applying them one by one to the local cache, only to delete the object at the end, means the earlier updates were wasted work.
This is where DeltaFIFO comes in: it solves exactly this problem.
3.1.2 The DeltaFIFO structure
DeltaFIFO can be thought of as a special FIFO queue. A Delta is a marker for a change (add, delete, update, sync) to an object in the k8s system.
Add, delete, and update are obviously needed: even a queue we wrote ourselves would need to record which operation happened.
Sync is needed when re-listing from the apiserver.
// There are four delta types
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{} // the k8s object
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
// populated and initialPopulationCount are used to decide whether the processor has synced
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool // set once the queue's elements start being consumed
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}
The most important fields of DeltaFIFO are items, queue, and knownObjects.
items: the list of changes (deltas) per object.
queue: the object keys.
knownObjects: as the initialization below shows, this is the cache's indexer.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
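As a quick aside, MetaNamespaceKeyFunc is the key function passed in above. A small sketch with a made-up pod, showing the ns/name keys it produces together with the reverse helper:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "one"}}

    // Namespaced objects are keyed as "<namespace>/<name>".
    key, _ := cache.MetaNamespaceKeyFunc(pod)
    fmt.Println(key) // default/one

    // The reverse helper splits a key back into namespace and name.
    ns, name, _ := cache.SplitMetaNamespaceKey(key)
    fmt.Println(ns, name) // default one
}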
3.1.3 An example of the core DeltaFIFO structure
Suppose we watch all pods in the default namespace. Initially the namespace has no pods; after watching for a while, three pods are created:
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
The watch function then produces the following sequence of events:
pod1-1: pod1 in its first phase (Pending)
pod1-2: pod1 in its second phase (Scheduled)
pod1-3: pod1 in its third phase (Running)
ADD: pod1-1 (abbreviated; in reality this is the pod's full metadata, {ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}})
ADD: pod2-1
MODIFIED: pod1-2
ADD: pod3-1
MODIFIED: pod2-2
MODIFIED: pod3-2
MODIFIED: pod1-3
MODIFIED: pod3-3
MODIFIED: pod2-3
At this point the DeltaFIFO looks like this:
deltaFIFO {
queue: ["one", "two", "tre"],
items: {
"one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}],
"two": [{"add", pod2-1}, {"update", pod2-2}, {"update", pod2-3}],
"tre": [{"add", pod3-1}, {"update", pod3-2}, {"update", pod3-3}],
}
}
The benefits of this layout:
(1) Delivery happens per object: here, the three events "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}] are handed to the handlers in a single Pop.
(2) The indexer can know the object's final state: for "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}], it can skip the pod1-1 and pod1-2 states and write the pod1-3 state straight into the cache.
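For what it's worth, the Deltas type also exposes a Newest() helper that returns the most recent delta of an object, which is how a consumer could jump straight to the final state. A tiny sketch with made-up object values:

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // The accumulated deltas for the object keyed "default/one",
    // oldest first, as in the example above.
    ds := cache.Deltas{
        {Type: cache.Added, Object: "pod1-1"},
        {Type: cache.Updated, Object: "pod1-2"},
        {Type: cache.Updated, Object: "pod1-3"},
    }

    // Newest returns the last delta, i.e. the object's final known state.
    if d := ds.Newest(); d != nil {
        fmt.Println(d.Type, d.Object) // Updated pod1-3
    }
}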
3.2 How sharedIndexInformer produces data
We know the data originates from the apiserver's ListAndWatch. Let's see how it is used, starting directly from controller.Run.
3.2.1 The controller structure
The controller structure itself is very simple: essentially a Config, plus a few functions built around that config that drive data production.
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
// The data buffer.
Queue
// Receives data from the apiserver.
ListerWatcher
// Something that can process your objects.
// How the received data is processed.
Process ProcessFunc
// The type of your objects.
// What type the data is: Pod? Deployment?
ObjectType runtime.Object
FullResyncPeriod time.Duration
// Whether a resync is needed.
ShouldResync ShouldResyncFunc
// Whether to retry on error.
RetryOnError bool
}
3.2.2 controller.Run
- Instantiate a Reflector with NewReflector.
- Produce data via list-watch.
- Consume the produced data: run processLoop over and over; it pops objects off the DeltaFIFO and calls the config's Process function (which is in fact sharedIndexInformer's HandleDeltas) to handle them.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. Instantiate the Reflector.
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// 2. Produce data via list-watch.
wg.StartWithChannel(stopCh, r.Run)
// 3. Consume the produced data:
// run processLoop over and over; it pops objects off the DeltaFIFO and calls the config's Process function (in fact sharedIndexInformer's HandleDeltas) to handle them.
wait.Until(c.processLoop, time.Second, stopCh)
}
3.2.3 The Reflector instance
The core Reflector structure; as you can see, most of it comes straight from the config.
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store // where the fetched data goes, i.e. the DeltaFIFO queue
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
3.2.4 Reflector.Run
This is the r.Run started above. It does a single thing: run the ListAndWatch function.
Note: Reflector.Run restarts ListAndWatch with a 1-second period.
So re-listing is not done by ListAndWatch itself; one call to ListAndWatch performs a single round of list and watch (and in the normal case keeps watching).
When ListAndWatch exits because of an exception/error or some other reason, the Reflector automatically runs ListAndWatch again.
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
NewReflector sets the period to 1s.
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second, // period is 1s
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
3.2.5 ListAndWatch
Background
The core idea of ListAndWatch is simple: push the data obtained from the apiserver's list/watch into the DeltaFIFO queue.
Before reading the code, let's curl kube-apiserver to look at the characteristics of list-watch.
(1) A PodList can itself be regarded as an object; it also carries a resource version.
(2) list responses use chunked transfer encoding by default; see https://zh.wikipedia.org/wiki/%E5%88%86%E5%9D%97%E4%BC%A0%E8%BE%93%E7%BC%96%E7%A0%81 for what chunking is and why it helps.
(3) API servers at v1.19 and above support the resourceVersionMatch parameter, which determines how the resourceVersion value is applied to LIST calls. It is strongly recommended to also set resourceVersionMatch whenever resourceVersion is set on a LIST call. If resourceVersion is unset, resourceVersionMatch must not be set. For backward compatibility, clients must tolerate the server ignoring resourceVersionMatch in certain scenarios:
- When resourceVersionMatch=NotOlderThan is set and a limit is specified, clients must be able to handle an HTTP 410 "Gone" response. For example, the client may retry with a newer resourceVersion, or fall back to resourceVersion="" (allowing any version to be returned).
- When resourceVersionMatch=Exact is set and limit is not specified, clients must verify that the resourceVersion in the response's ListMeta matches the requested resourceVersion, and handle the case where they do not match. For example, the client may retry with limit set.
Unless you have very strong consistency requirements, using resourceVersionMatch=NotOlderThan together with a known resourceVersion is the preferred interaction, since it gives better cluster performance and scalability than leaving resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.
Reference: https://kubernetes.io/zh/docs/reference/using-api/api-concepts/
| resourceVersionMatch param | paging params | resourceVersion unset | resourceVersion="0" | resourceVersion="<non-zero>" |
|---|---|---|---|---|
| unset | limit unset | Most Recent | Any | Not older than |
| unset | limit=<n>, continue unset | Most Recent | Any | Exact |
| unset | limit=<n>, continue=<token> | Continue Token, Exact | Invalid request, treated as Continue Token, Exact | Invalid request, returns HTTP 400 Bad Request |
| resourceVersionMatch=Exact [1] | limit unset | Invalid request | Invalid request | Exact |
| resourceVersionMatch=Exact [1] | limit=<n>, continue unset | Invalid request | Invalid request | Exact |
| resourceVersionMatch=NotOlderThan [1] | limit unset | Invalid request | Any | Not older than |
| resourceVersionMatch=NotOlderThan [1] | limit=<n>, continue unset | Invalid request | Any | Not older than |
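In client-go these parameters correspond to fields of metav1.ListOptions (the ResourceVersionMatch field exists from v1.19 on). A hedged sketch, reusing the resourceVersion seen in the curl output below as an illustrative value:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Ask for a paginated list that is not older than a known resourceVersion.
    opts := metav1.ListOptions{
        ResourceVersion:      "163915936",
        ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
        Limit:                500,
    }
    fmt.Printf("%+v\n", opts)
}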
// curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods -i
HTTP/1.1 200 OK
Audit-Id: 4ff9e833-e3e0-4001-9e1a-d83c9a9b1937
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:10:48 GMT
Transfer-Encoding: chunked
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163916927"
},
"items": [
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?limit=1 -i
HTTP/1.1 200 OK
Audit-Id: 17d0d42f-a122-4c5a-9659-70224a22522a
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:09:32 GMT
Transfer-Encoding: chunked // chunked transfer
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163915936",
// note the continue token
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ",
"remainingItemCount": 23 //表示当前还有23个没有展示处理
},
"items": [
{
"metadata": {
"name": "app-istioversion-test-546dfff56-6t62p",
"generateName": "app-istioversion-test-546dfff56-",
// adding this continue parameter returns the remaining 23 items
curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?continue=eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ
watch is simple: a long-lived chunked connection.
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/default/pods?watch=true -i
HTTP/1.1 200 OK
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 01:32:06 GMT
Transfer-Encoding: chunked
Source code analysis
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
// the first list starts with ResourceVersion=0
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
// start listing data, with pagination
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
// fetch the list data
list, err = pager.List(context.Background(), options)
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// extract the list items
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// push the listed items into the DeltaFIFO via syncWith
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
// record the resourceVersion for the subsequent watch
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// trigger a resync of the DeltaFIFO
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start watching
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
// handle the watch events
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
With the background above, the overall flow should be clear. Let's answer a few questions.
(1) Why does the list operation need a resourceVersion?
A: The list API itself supports resourceVersion, and different values have different semantics. Recording the resourceVersion on every list guarantees that the data is at least as new as the previous list (in practice it is usually the latest).
(2) Why is the list paginated?
Pagination happens when limit is set.
(3) How is the listed data extracted?
First, items, err := meta.ExtractList(list) stores the listed data in the items slice.
Then syncWith saves the listed data into the DeltaFIFO queue.
The logic of syncWith is:
(1) Iterate over all listed items and append each of them to the DeltaFIFO's items as (Sync, item) via queueActionLocked(Sync, item).
(2) Iterate over the keys known to knownObjects (the indexer) and check whether there are objects the local cache has but the latest list does not. If so, the FIFO missed a delete event, so a (Deleted, DeletedFinalStateUnknown) delta is appended to the DeltaFIFO's items for that object.
Why DeletedFinalStateUnknown?
Replace may be called again when the reflector re-lists. In that case there can be objects that exist in knownObjects but are not in the Replace list (for example because the watch delete event was lost). These objects are filtered out, wrapped as DeletedFinalStateUnknown and added to the DeltaFIFO again with the Deleted type, so that when this DeletedFinalStateUnknown delta is finally processed out of the DeltaFIFO, the local cache is updated and a reconcile is triggered. Because the object's actual final state can no longer be obtained, the delta can only be built from the record in knownObjects, hence the name FinalStateUnknown.
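On the consumer side this means an OnDelete handler should be prepared to receive a tombstone instead of the object itself. A sketch of the usual pattern (the handler name is made up):

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

// onPodDelete shows the usual tombstone handling in an OnDelete handler.
func onPodDelete(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        // The delete event was missed; the informer only noticed the deletion
        // during a re-list, so we get a DeletedFinalStateUnknown tombstone.
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            fmt.Printf("unexpected object type %T\n", obj)
            return
        }
        pod, ok = tombstone.Obj.(*corev1.Pod)
        if !ok {
            fmt.Printf("tombstone contained unexpected object %T\n", tombstone.Obj)
            return
        }
    }
    fmt.Printf("pod deleted: %s/%s\n", pod.Namespace, pod.Name)
}

func main() {
    // Demonstrate with a tombstone, as produced by DeltaFIFO.Replace.
    onPodDelete(cache.DeletedFinalStateUnknown{
        Key: "default/one",
        Obj: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "one"}},
    })
}

Back on the producer side, syncWith and the Replace it calls look like this: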
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// first pass: iterate over the listed data
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
// 2. Sync the data into the FIFO queue, i.e. append elements to the fifo's items. As you can see, all listed data is queued as Sync.
// A delta in items has four possible types: Added, Updated, Deleted, Sync (here it is always Sync).
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// This branch is not taken here, because f.knownObjects is the indexer and is not nil.
if f.knownObjects == nil {
// Do deletion detection against our own list.
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
// second pass: iterate over the keys known to knownObjects (the indexer)
for _, k := range knownKeys {
// if the cached object is also in the listed data, skip it: the loop above already handled it
if keys.Has(k) {
continue
}
// if the cached object is not in the listed data, it has already been deleted, but for some reason the cache never received the delete event, so the object must be removed here
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
// queue a Deleted delta; note that the object here is a DeletedFinalStateUnknown tombstone (see the explanation above)
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
3.2.6 c.processLoop
list and watch save everything obtained from the apiserver into the DeltaFIFO queue.
processLoop dispatches that data: it pops the elements out one by one and hands each of them to the Process function.
func (c *controller) processLoop() {
for {
// Pop elements off the FIFO queue one by one and hand each of them to the PopProcessFunc.
// Process was set to s.HandleDeltas when the config was built.
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 1. If the queue is empty, check whether it is closed; if not, wait, otherwise return.
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 2. Take the first element. Note this is one element of queue, which corresponds to one key-value pair in items.
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
// 3. Call process to handle it.
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
The HandleDeltas function
Here HandleDeltas finally shows up. As the diagram shows, HandleDeltas does two things:
(1) Update the indexer. (Curiously, it does not update the indexer to the final state in one step: if the last delta is a delete event, it still applies the updates first and then deletes.)
(2) Distribute the events via distribute.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// Sync deltas are the events emitted by the FIFO's Replace function during a re-list.
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
distribute is straightforward: it sends the event out. There is one simple piece of logic here:
when registering a ResourceEventHandler you can specify whether it needs a resync. For example, I can create an informer and specify no resync;
in that case the listener for my handler is not part of syncingListeners.
Understanding the difference between listeners and syncingListeners
The processor supports configuring resync per listener: one informer can have multiple EventHandlers, each EventHandler corresponds to one listener in the processor, and each listener can be configured with or without resync. If a listener needs a resync, the Sync deltas added to the DeltaFIFO are ultimately delivered only to the corresponding listeners.
The reflector periodically checks whether each listener needs a resync, based on the resyncPeriod given when the EventHandler was registered: 0 means the listener never resyncs; otherwise, every resyncPeriod it checks whether the time has come.
- listeners: all listeners added to the informer
- syncingListeners: the listeners of the informer that are currently in a sync state
syncingListeners is a subset of listeners; it holds the listeners that have resync enabled and whose resync time has arrived. Keeping them in a separate slice prevents the distribute method analyzed below from adding the object to listeners that do not yet need a resync.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
add simply sends the data onto the addCh channel.
Although p.addCh is an unbuffered channel, the listener has a ring buffer, so this does not block indefinitely.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
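To see the per-listener resync configuration from the user's side, here is a sketch (handler bodies and the 10-minute period are made up) that registers handlers with different resync behavior on the same shared informer via AddEventHandler and AddEventHandlerWithResyncPeriod:

package informerdemo

import (
    "time"

    "k8s.io/client-go/tools/cache"
)

// registerHandlers assumes informer is a cache.SharedIndexInformer obtained from
// a factory as in section 1; the handler bodies are placeholders.
func registerHandlers(informer cache.SharedIndexInformer) {
    // Uses the informer's defaultEventHandlerResyncPeriod; if that is non-zero,
    // this listener is periodically moved into syncingListeners.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { /* ... */ },
    })

    // Period 0: never resync. This listener only receives deltas via the plain
    // listeners slice, never via syncingListeners.
    informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
        UpdateFunc: func(oldObj, newObj interface{}) { /* ... */ },
    }, 0)

    // Resync roughly every 10 minutes (the shared informer may adjust the value
    // to align with its overall resyncCheckPeriod).
    informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { /* ... */ },
    }, 10*time.Minute)
}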
3.3 s.processor.run consumes the data
sharedIndexInformer.Run uses controller.Run to produce the data: everything obtained from list and watch is saved into the DeltaFIFO as deltas.
HandleDeltas then sends each delta to every listener via the distribute function.
Next, let's see how s.processor.run consumes the data.
The logic of s.processor.run is clear: start run and pop for every listener.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
The processorListener structure
type processorListener struct {
nextCh chan interface{} // objects to be delivered to the handler
addCh chan interface{} // objects sent down by distribute
handler ResourceEventHandler // the add, update, delete functions defined when creating the informer
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing // a buffer, so that distribute sending too fast or the handler processing too slowly does not block everything
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration // the resync period requested by this listener
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
pop and run
pop sends objects from addCh to nextCh; if nextCh cannot accept them yet, they are kept in pendingNotifications.
run sends the objects from nextCh to the handler for processing.
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
4. Summary
(1) The sharedInformerFactory mechanism lets multiple consumers share one informer.
(2) The core of an informer is the reflector machinery underneath it. The flow is:
- Through kube-apiserver's ListAndWatch, changes to resources in etcd are observed.
- Internally, a DeltaFIFO queue is used to dispatch and process these resource changes more effectively.
- Besides carrying the kube-apiserver add/update/delete events through unchanged (these are the changes to the stored objects), the DeltaFIFO also introduces a Sync action, which the FIFO's Replace function adds on a re-list.
- The core processing function is HandleDeltas, which processes and dispatches these resource changes. Its core logic:
- The informer always carries an indexer, whether you use it or not; it is the local cache.
- For each object, HandleDeltas first updates the local indexer cache and then sends the change to every listener. Note:
(1) The kube-apiserver add/update/delete event is not necessarily the event a listener sees. For example, for an apiserver update event, if the indexer has no data for the object yet, the listener receives an add event.
(2) By specifying a resyncPeriod, the informer periodically replays the objects it already knows about as Sync events; these Sync events are only delivered to the listeners that currently need a resync.
5. References
https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html