背景
什么是 Informer 机制
一个控制器每次需要获取对象的时候都要访问 APIServer,这会给系统带来很高的负载,Informers 的内存缓存就是来解决这个问题的,此外 Informers 还可以几乎实时的监控对象的变化,而不需要轮询请求,这样就可以保证客户端的缓存数据和服务端的数据一致,就可以大大降低 APIServer 的压力了。
如上图展示了 Informer 的基本处理流程:
以 events 事件的方式从 APIServer 获取数据
提供一个类似客户端的 Lister 接口,从内存缓存中 get 和 list 对象
为添加、删除、更新注册事件处理程序
此外 Informers 也有错误处理方式,当长期运行的 watch 连接中断时,它们会尝试使用另一个 watch 请求来恢复连接,在不丢失任何事件的情况下恢复事件流。如果中断的时间较长,而且 APIServer 丢失了事件(etcd 在新的 watch 请求成功之前从数据库中清除了这些事件),那么 Informers 就会重新 List 全量数据。
而且在重新 List 全量操作的时候还可以配置一个重新同步的周期参数,用于协调内存缓存数据和业务逻辑的数据一致性,每次过了该周期后,注册的事件处理程序就将被所有的对象调用,通常这个周期参数以分为单位,比如10分钟或者30分钟。
Informers 的这些高级特性以及超强的鲁棒性,都足以让我们不去直接使用客户端的 Watch() 方法来处理自己的业务逻辑,而且在 Kubernetes 中也有很多地方都有使用到 Informers。但是在使用 Informers 的时候,通常每个 GroupVersionResource(GVR)只实例化一个 Informers,但是有时候我们在一个应用中往往有使用多种资源对象的需求,这个时候为了方便共享 Informers,我们可以通过使用共享 Informer 工厂来实例化一个 Informer。
共享 Informer 工厂允许我们在应用中为同一个资源共享 Informer,也就是说不同的控制器循环可以使用相同的 watch 连接到后台的 APIServer,例如,kube-controller-manager 中的控制器数据量就非常多,但是对于每个资源(比如 Pod),在这个进程中只有一个 Informer。
Informer 是 client-go 中的核心工具包,已经被 kubernetes 中众多组件所使用。所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client,本地缓存被称为 Store,索引被称为 Index。使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层, 客户端对 apiserver 数据的 "读取" 和 "监听" 操作都通过本地 informer 进行。
Informer 的主要功能:
同步数据到本地缓存
根据对应的事件类型,触发事先注册好的 ResourceEventHandle
为什么需要 Informer 机制?
我们知道Kubernetes各个组件都是通过REST API跟API Server交互通信的,而如果每次每一个组件都直接跟API Server交互去读取/写入到后端的etcd的话,会对API Server以及etcd造成非常大的负担。 而Informer机制是为了保证各个组件之间通信的实时性、可靠性,并且减缓对API Server和etcd的负担。
Informer 需要满足哪些要求?
消息可靠性
消息实时性
消息顺序性
高性能
核心功能
Informer的工作流程
Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到"事件通知",这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。
List & Watch
List所做的,就是向API Server发送一个http短链接请求,罗列所有目标资源的对象。而Watch所做的是实际的“监听”工作,通过http长链接的方式,其与API Server能够建立一个持久的监听关系,当目标资源发生了变化时,API Server会返回一个对应的事件,从而完成一次成功的监听,之后的事情便交给后面的handler来做。
这样一个List & Watch机制,带来了如下几个优势:
事件响应的实时性:通过Watch的调用,当API Server中的目标资源产生变化时,能够及时的收到事件的返回,从而保证了事件响应的实时性。而倘若是一个轮询的机制,其实时性将受限于轮询的时间间隔。
事件响应的可靠性:倘若仅调用Watch,则如果在某个时间点连接被断开,就可能导致事件被丢失。List的调用带来了查询资源期望状态的能力,客户端通过期望状态与实际状态的对比,可以纠正状态的不一致。二者结合保证了事件响应的可靠性。
高性能:倘若仅周期性地调用List,轮询地获取资源的期望状态并在与当前状态不一致时执行更新,自然也可以do the job。但是高频的轮询会大大增加API Server的负担,低频的轮询也会影响事件响应的实时性。Watch这一异步消息机制的结合,在保证了实时性的基础上也减少了API Server的负担,保证了高性能。
事件处理的顺序性:我们知道,每个资源对象都有一个递增的ResourceVersion,唯一地标识它当前的状态是“第几个版本”,每当这个资源内容发生变化时,对应产生的事件的ResourceVersion也会相应增加。在并发场景下,K8s可能获得同一资源的多个事件,由于K8s只关心资源的最终状态,因此只需要确保执行事件的ResourceVersion是最新的,即可确保事件处理的顺序性。
ResourceVersion
Kubernetes 请求并发控制与数据一致性(含ResourceVersion、Update、Patch简析)
Kubernetes-resourceVersion机制分析
秘诀就是 Chunked transfer encoding(分块传输编码),它首次出现在HTTP/1.1。正如维基百科所说:
HTTP 分块传输编码允许服务器为动态生成的内容维持 HTTP 持久链接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。
当客户端调用 watch API 时,apiserver 在response 的 HTTP Header 中设置 Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便和服务端该链接,并等待下一个数据块,即资源的事件信息。例如:
Informer 能保证通过list+watch不会丢失事件,如果网络抖动重新恢复后,watch会带着之前的resourceVersion号重连,resourceVersion是单调递增的, API Server 收到该请求后会将所有大于该resourceVersion的变更同步过来。
二级缓存
二级缓存属于 Informer 的底层缓存机制,这两级缓存分别是 DeltaFIFO 和 LocalStore。这两级缓存的用途各不相同。DeltaFIFO 用来存储 Watch API 返回的各种事件 ,LocalStore 只会被 Lister 的 List/Get 方法访问 。
如果K8s每次想查看资源对象的状态,都要经历一遍List调用,显然对 API Server 也是一个不小的负担,对此,一个容易想到的方法是使用一个cache作保存,需要获取资源状态时直接调cache,当事件来临时除了响应事件外,也对cache进行刷新。
虽然 Informer 和 Kubernetes 之间没有 resync 机制,但 Informer 内部的这两级缓存之间存在 resync 机制。
Resync
Resync 机制会将 Indexer 的本地缓存重新同步到 DeltaFIFO 队列中。一般我们会设置一个时间周期,让 Indexer 周期性地将缓存同步到队列中。直接 list/watch API Server 就已经能拿到集群中资源对象变化的 event 了,这里引入 Resync 的作用是什么呢?去掉会有什么影响呢?
自定义事件处理
ResourceEventHandler 用于处理对象的变更事件,用户可以通过实现 ResourceEventHandler 接口,并调用 sharedIndexInformer.AddEventHandler() 或 sharedIndexInformer.AddEventHandlerWithResyncPeriod() 方法注册到 sharedProcessor 中。这样,当数据发送变化时,就会回调 ResourceEventHandler 中对应的 OnAdd/OnUpdate/OnDelete 方法来实现用户自定义的处理逻辑
核心对象
Informer相关
client-go 中提供了几种不同的 Informer:
通过调用 NewInformer 函数创建一个简单的不带 indexer 的 Informer。
通过调用 NewIndexerInformer 函数创建一个简单的带 indexer 的 Informer。
通过调用 NewSharedIndexInformer 函数创建一个 Shared 的 Informer。
通过调用 NewDynamicSharedInformerFactory 函数创建一个为 Dynamic 客户端的 Shared 的 Informer。
这里带有 Indexer 和不带 Indexer 的大家好理解写,从字面意思来看,就是一个是带有 Indexer 功能一个不带有 Indexer 功能的 Informer。而这里的 Shared 的 Informer 引入,其实是因为随着 K8S 中,相同资源的监听者在不断地增加,从而导致很多调用者通过 Watch API 对 API Server 建立一个长连接去监听事件的变化,这将严重增加了 API Server 的工作负载,及资源的浪费。
比如在 kube-controller-manager 组件中,有很多控制管理都需要监听 Pod 资源的变化,如果都独立的调用 Informer 去维护一个对 APIServer 的长连接,这将导致 kube-controller-manager 中资源的浪费及增加了 APIServer 的负载,而不同控制管理者通过创建 Shared 的 Informer 则实现了这些控制管理者使用同一个 Watch 去和 APIServer 建立长连接,并在收到事件后,分发给下游的调用者。
SharedInformer
我们平时说的 Informer 其实就是 SharedInformer,它是可以共享使用的。如果同一个资源的 Informer 被实例化多次,那么就会运行多个 ListAndWatch 操作,这会加大 APIServer 的压力。而 SharedInformer 通过一个 map 来让同一类资源的 Informer 实现共享一个 Refelctor,这样就不会出现上面这个问题了。
Informer通过Local Store缓存目标资源对象,且仅为自己所用。但是在K8s中,一个Controller可以关心不止一种资源,使得多个Controller所关心的资源彼此会存在交集。如果几个Controller都用自己的Informer来缓存同一个目标资源,显然会导致不小的空间开销,因此K8s引入了SharedInformer来解决这个问题。
SharedInformer拥有为多个Controller提供一个共享cache的能力,从而避免资源缓存的重复、减小空间开销。除此之外,一个SharedInformer对一种资源只建立一个与API Server的Watch监听,且能够将监听得到的事件分发给下游所有感兴趣的Controller,这也显著地减少了API Server的负载压力。实际上,K8s中广泛使用的都是SharedInformer,Informer则出场甚少。
SharedIndexInformer
SharedIndexInformer 扩展了 SharedInformer 接口,提供了构建索引的能力。
SharedIndexInformerFactory
使用sharedInformerFactory可以统一管理控制器中需要的各资源对象的informer实例,避免同一个资源创建多个实例
默认的 Informer 实现
Informer 机制为 K8s 的各种对象提供了默认的 Informer 实现,可以通过以下方式快速创建一个 Informer 对象,并交由 SharedIndexInformerFactory 统一管理。
SharedInformerFactory.Core().V1().Nodes()
.Core().V1().Pods()
.Apps().V1().Deployments()
.Core().V1().Secrets()
.Batch().V1beta1().CronJobs()
List-Watch相关
Reflector
Reflector用来watch特定的k8s API资源。具体的实现是通过ListAndWatch的方法,watch可以是k8s内建的资源或者是自定义的资源。当reflector通过watch API接收到有关新资源实例存在的通知时,它使用相应的列表API获取新创建的对象,并将其放入watchHandler函数内的Delta Fifo队列中。
ListerWatcher
ListerWatcher 是 Informer 机制中的核心对象之一,其功能是通过 List() 方法从 API Server 中获取某一类型的全量数据,再通过 Watch() 方法监听 API Server 中数据的增量更新。
ListerWatcher 继承自 Lister 和 Watcher 接口,从而使其既能获取全量数据,又能监听增量数据更新。
Lister
Lister 接口用于完成全量数据的初始化。
Watcher
Watcher 接口用于监听数据的增量更新。
事件队列相关
Store
Store是一个通用的对象存储接口,其中定义了一系列与对象增删改查相关的方法。Store 要求对象有唯一键,键的计算方式由接口实现类中关联的 KeyFunc 决定的。
Queue
从 Queue 接口的定义可以看出,它继承自 Store 接口,所以其具备基本的数据存取能力。同时,它又具备从队列头部取出数据并调用 PopProcessFunc 处理头部数据并返回处理结果的能力。
DeltaFIFO
DeltaFIFO 是一个生产者-消费者的队列,生产者是 Reflector,消费者是 Pop 函数,从架构图上可以看出 DeltaFIFO 的数据来源为 Reflector,通过 Pop 操作消费数据,消费的数据一方面存储到 Indexer 中,另一方面可以通过 Informer 的 handler 进行处理,Informer 的 handler 处理的数据需要与存储在 Indexer 中的数据匹配。需要注意的是,Pop 的单位是一个 Deltas,而不是 Delta。
Delta
Delta 是 DeltaFIFO 存储的类型,它记录了对象发生了什么变化以及变化后对象的状态。如果变更是删除,它会记录对象删除之前的最终状态。
Deltas
Deltas 保存了对象状态的变更(Add/Delete/Update)信息(如 Pod 的删除添加等),Deltas 缓存了针对相同对象的多个状态变更信息,如 Pod 的 Deltas[0]可能更新了标签,Deltas[1]可能删除了该 Pod。最老的状态变更信息为 Oldest(),最新的状态变更信息为 Newest(),使用中,获取 DeltaFIFO 中对象的 key 以及获取 DeltaFIFO 都以最新状态为准。
最旧的 delta 在索引0位置,最新的 delta 在最后一个索引位置。
DeltaType
const (
Added DeltaType = "Added" // 增加
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 删除
Sync DeltaType = "Sync" // 同步
)</pre>
DeltaFIFO 中数据存储形式
事件处理相关
Controller
Processor
[图片上传失败...(image-25a1a1-1637847474305)]
ResourceEventHandler
当经过List & Watch得到事件时,接下来的实际响应工作就交由ResourceEventHandler来进行,这个Interface定义如下:
type ResourceEventHandler interface {
// 添加对象回调函数
OnAdd(obj interface{})
// 更新对象回调函数
OnUpdate(oldObj, newObj interface{})
// 删除对象回调函数
OnDelete(obj interface{})
}
当事件到来时,Informer根据事件的类型(添加/更新/删除资源对象)进行判断,将事件分发给绑定的EventHandler,即分别调用对应的handle方法(OnAdd/OnUpdate/OnDelete),最后EventHandler将事件发送给Workqueue。
缓存相关
Indexer
Indexer在Store基础上扩展了索引能力,就好比给数据库添加的索引,以便查询更快,那么肯定需要有个结构来保存索引。典型的索引用例是基于对象标签创建索引。 Indexer可以根据多个索引函数维护索引。Indexer使用线程安全的数据存储来存储对象及其键。 在Store中定义了一个名为MetaNamespaceKeyFunc 的默认函数,该函数生成对象的键作为该对象的<namespace> / <name>组合。
Reflector 通过 ListAndWatch 把数据传入 DeltaFIFO 后,经过 DeltaFIFO 的 Pop 函数将资源对象存入到了本地的一个存储 Indexer 中,而这个底层真正的存储其实就是上面的 ThreadSafeStore。
ThreadSafeStore
使用示例
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String(, , "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags(, *kubeconfig)
if err != nil {
panic(err)
}
// 初始化 client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// 启动 informer,list & watch
go factory.Start(stopper)
// 从 apiserver 同步资源,必不可少
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定义 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})