Informer
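In Kubernetes, every resource type, whether built in (such as Pod) or custom, gets an informer implementation generated by the client-go code generator informer-gen. The reason to use informers is performance: instead of querying the apiserver on every read, a controller reads from a local cache that the informer keeps up to date through list and watch.
The official sample-controller demo shows a controller built on client-go. The scaffolding is always the same: create an informer factory, select the resource types you care about, and register event handler callbacks. The real complexity lives in the business logic.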
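An informer's performance rests on three components:
- Reflector: talks to the apiserver, mainly through the ListerWatcher. It lists and watches the resource type the controller cares about and, whenever an object is added, updated, or deleted, pushes the change into the DeltaFIFO. The informer's controller loop then pops items from the DeltaFIFO, updates the Indexer, and invokes the callbacks we registered, which typically pass the object's key on to the business layer.
- DeltaFIFO: for now it is enough to know that this is a queue.
- Indexer: an efficient local cache with indexes.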
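To make the division of labour concrete, here is a minimal sketch of the three components using only the standard library. The types Delta, DeltaFIFO, Indexer and the function Process are hypothetical simplifications written for this post; they are not the client-go types of the same names, which do considerably more (for example, grouping deltas per key and maintaining secondary indexes).

```go
package main

import "fmt"

// Delta is a hypothetical, simplified change record:
// one Added/Updated/Deleted event for one object.
type Delta struct {
	Type string // "Added", "Updated" or "Deleted"
	Key  string // namespace/name
	Obj  string // object payload, reduced to a string for this sketch
}

// DeltaFIFO is a plain slice-backed queue in this sketch.
type DeltaFIFO struct{ items []Delta }

// Push appends a delta; in the real flow this is what the Reflector does.
func (q *DeltaFIFO) Push(d Delta) { q.items = append(q.items, d) }

// Pop removes and returns the oldest delta, or false if the queue is empty.
func (q *DeltaFIFO) Pop() (Delta, bool) {
	if len(q.items) == 0 {
		return Delta{}, false
	}
	d := q.items[0]
	q.items = q.items[1:]
	return d, true
}

// Indexer is the local cache, reduced to a map keyed by namespace/name.
type Indexer map[string]string

// Process drains the queue, applies each delta to the cache,
// and then notifies the handler (the "business layer" callback).
func Process(q *DeltaFIFO, idx Indexer, handler func(Delta)) {
	for {
		d, ok := q.Pop()
		if !ok {
			return
		}
		if d.Type == "Deleted" {
			delete(idx, d.Key)
		} else {
			idx[d.Key] = d.Obj
		}
		handler(d)
	}
}

func main() {
	q := &DeltaFIFO{}
	q.Push(Delta{"Added", "default/a", "v1"})
	q.Push(Delta{"Updated", "default/a", "v2"})
	q.Push(Delta{"Added", "default/b", "v1"})
	q.Push(Delta{"Deleted", "default/b", ""})

	idx := Indexer{}
	Process(q, idx, func(d Delta) { fmt.Println(d.Type, d.Key) })
	fmt.Println(len(idx), idx["default/a"]) // prints "1 v2"
}
```

The point of the sketch is the ordering: the cache is updated before the handler runs, so a handler that looks up the key in the cache sees the state that matches the event.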
Let's start with how the Reflector is implemented.
Reflector
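The core of the Reflector is two functions, ListFunc and WatchFunc. Let's walk through the whole flow:
- The list and watch functions are defined together with the informer. Here is where the informer is wired into the controller (using the prometheus operator as an example):
controller := &Controller{
    kubeclientset:       clientset,
    prometheusClientset: prometheusClientset,
    // Lister() and Informer() on the next line both reach the informer constructor,
    // but an informer that has already been created is reused, not built again.
    prometheusLister: prometheusInformer.Lister(),
    prometheusSynced: prometheusInformer.Informer().HasSynced,
    ......
}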
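The "created once, then reused" behaviour mentioned in the comment above can be sketched as a small memoizing factory. This is a stdlib-only illustration under stated assumptions: the informer and factory types, the built counter, and the local Prometheus struct are hypothetical stand-ins, and the real shared informer factory carries more state (resync periods, started flags, and so on).

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical placeholder for a shared informer.
type informer struct{ name string }

// factory keeps at most one informer per object type.
type factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
	built     int // how many times a constructor actually ran
}

func newFactory() *factory {
	return &factory{informers: map[reflect.Type]*informer{}}
}

// InformerFor returns the cached informer for obj's type,
// calling newFunc only the first time that type is requested.
func (f *factory) InformerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.built++
	f.informers[t] = inf
	return inf
}

// Prometheus stands in for the custom resource type.
type Prometheus struct{}

func main() {
	f := newFactory()
	build := func() *informer { return &informer{name: "prometheus"} }
	a := f.InformerFor(&Prometheus{}, build) // e.g. reached through Lister()
	b := f.InformerFor(&Prometheus{}, build) // e.g. reached through Informer()
	fmt.Println(a == b, f.built)             // prints "true 1"
}
```

Keying the map by reflect.Type is what lets several controllers in one process share a single cache and a single watch connection per resource type.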
- Following the call chain, this is the function that finally creates the informer. It contains the list and watch implementations for Prometheuses.
// NewFilteredPrometheusInformer constructs a new informer for Prometheus type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPrometheusInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.MonitoringV1().Prometheuses(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.MonitoringV1().Prometheuses(namespace).Watch(options)
            },
        },
        &monitoringv1.Prometheus{},
        resyncPeriod,
        indexers,
    )
}
- ListFunc is a plain GET request that carries the namespace, the resource name (here prometheuses), and the list options.
// List takes label and field selectors, and returns the list of Prometheuses that match those selectors.
func (c *prometheuses) List(opts metav1.ListOptions) (result *v1.PrometheusList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.PrometheusList{}
    err = c.client.Get().
        Namespace(c.ns).
        Resource("prometheuses").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do().
        Into(result)
    return
}
- WatchFunc opens a long-lived connection to the apiserver. Whenever a watched resource changes in etcd, the apiserver pushes the event to the client-go client over that connection, relying on HTTP chunked transfer encoding.
// Watch returns a watch.Interface that watches the requested prometheuses.
func (c *prometheuses) Watch(opts metav1.ListOptions) (watch.Interface, error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    opts.Watch = true
    return c.client.Get().
        Namespace(c.ns).
        Resource("prometheuses").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Watch()
}
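The long-lived, chunked connection can be demonstrated without a cluster. The sketch below is an assumption-laden illustration, not the apiserver protocol: the event struct, serveWatch, readWatch and runDemo are hypothetical names, and the server simply streams three JSON objects over one response, flushing after each so the client sees every event as soon as it is written.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// event is a hypothetical, minimal watch event.
type event struct {
	Type string `json:"type"`
	Name string `json:"name"`
}

// serveWatch streams one JSON event per flush over a single response.
// No Content-Length is set, so an HTTP/1.1 server sends the body chunked.
func serveWatch(w http.ResponseWriter, _ *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	enc := json.NewEncoder(w)
	for _, e := range []event{{"ADDED", "a"}, {"MODIFIED", "a"}, {"DELETED", "a"}} {
		if err := enc.Encode(e); err != nil {
			return
		}
		flusher.Flush() // push this event to the client right away
	}
}

// readWatch decodes events one by one until the server closes the stream.
// Decode blocks while no complete event is available.
func readWatch(url string) ([]event, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	dec := json.NewDecoder(resp.Body)
	var got []event
	for {
		var e event
		if err := dec.Decode(&e); err == io.EOF {
			return got, nil
		} else if err != nil {
			return got, err
		}
		got = append(got, e)
	}
}

// runDemo starts a throwaway local server and reads the whole stream.
func runDemo() ([]event, error) {
	srv := httptest.NewServer(http.HandlerFunc(serveWatch))
	defer srv.Close()
	return readWatch(srv.URL)
}

func main() {
	events, err := runDemo()
	fmt.Println(len(events), err) // prints "3 <nil>"
}
```

In a real watch the server keeps the response open until the timeout or a disconnect, which is why the client side is written as a loop over a decoder rather than a single read of the body.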
- A Prometheus informer is instantiated from the ListWatch and &monitoringv1.Prometheus{}.
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}
- The informer factory's Start method runs the Prometheus informer, and Run assigns the ListerWatcher and the object type defined above to the informer's controller.
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    ......
    cfg := &Config{
        Queue:         fifo,
        ListerWatcher: s.listerWatcher, // the ListerWatcher and object type are handed to the informer's controller here
        ObjectType:    s.objectType,
        ......
    }
    ......
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh) // follow this call next
}
- The main logic in the controller is the Reflector, which keeps watching the resource objects in Kubernetes until our program exits, whether normally or abnormally.
func (c *controller) Run(stopCh <-chan struct{}) {
    ......
    // Create the reflector with the ListerWatcher and ObjectType carried all the way down
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }
    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
    wg.StartWithChannel(stopCh, r.Run) // the part to follow: the Reflector's Run method
    wait.Until(c.processLoop, time.Second, stopCh)
}
- The Reflector wraps both the list and the watch logic in ListAndWatch. If the watch connection breaks, ListAndWatch returns and is run again; for example, once the network recovers from a failure, ListAndWatch is re-executed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
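The restart behaviour of Run can be pictured as a retry loop with a growing pause. The following is a simplified sketch written for this post: retryUntil and countRuns are hypothetical helpers, and the doubling-with-cap policy is an assumption chosen for illustration rather than a copy of the backoff manager the Reflector actually uses.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryUntil calls fn again and again until stopCh is closed.
// The pause doubles (up to max) after a failure and resets after a success.
func retryUntil(fn func() error, initial, max time.Duration, stopCh <-chan struct{}) {
	delay := initial
	for {
		// Check for a stop request before starting another attempt.
		select {
		case <-stopCh:
			return
		default:
		}
		if err := fn(); err != nil {
			delay *= 2
			if delay > max {
				delay = max
			}
		} else {
			delay = initial
		}
		// Wait before the next attempt, but wake up early on stop.
		select {
		case <-stopCh:
			return
		case <-time.After(delay):
		}
	}
}

// countRuns simulates a watch that fails until the n-th attempt,
// then requests a stop, and reports how many attempts were made.
func countRuns(n int) int {
	stopCh := make(chan struct{})
	runs := 0
	retryUntil(func() error {
		runs++
		if runs == n {
			close(stopCh)
			return nil
		}
		return errors.New("watch connection lost")
	}, time.Millisecond, 4*time.Millisecond, stopCh)
	return runs
}

func main() {
	fmt.Println(countRuns(3)) // prints "3"
}
```

The pause between attempts matters in practice: without it, every controller in the cluster would hammer the apiserver with list requests the moment it becomes unreachable.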
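- The resource version (the resourceVersion field you see in kubectl get pod -o yaml) is a key concept in the Reflector: both list and watch depend on it. In the code below, the list call sets ResourceVersion to "0", which tells the apiserver that any version is acceptable, so the response may be served from its cache. The list returns the current state of all objects, not their history. The Reflector then records the resource version of the list result. Within one ListAndWatch call the list runs only once; it runs a second time only if ListAndWatch is restarted. Finally, a for loop runs Watch starting from the recorded version: when events arrive, watchHandler adds them to the DeltaFIFO, and when there are none it blocks.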
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ......
    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    options := metav1.ListOptions{ResourceVersion: "0"}
    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            // The list call is wrapped in a pager so that data is fetched page by page
            // (e.g. 1000 objects in 10 requests), avoiding a sudden load spike on the apiserver.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        ......
        // Get the resource version
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // Store the data in the DeltaFIFO
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        // Update the resource version
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }
    ......
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            ......
        }
        // Open the watch; watchHandler below blocks on its result channel while no events arrive
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return nil
        }
        // Events received from the watch are pushed to the DeltaFIFO by watchHandler
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                }
            }
            return nil
        }
    }
}
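The paged list mentioned in the comment inside ListAndWatch (1000 objects fetched in 10 requests) follows a simple loop: ask for a page, append it, and continue from the returned position until the server says there is nothing left. Here is a minimal stdlib sketch of that loop. The page struct, listPage and listAll are hypothetical, and the integer continue position is an assumption for readability; the real API uses an opaque continue token string.

```go
package main

import "fmt"

// page is one hypothetical response of a paged list call.
type page struct {
	Items    []string
	Continue int  // position of the next item; only meaningful when More is true
	More     bool // whether another page is available
}

// listPage returns up to limit items starting at start.
func listPage(all []string, limit, start int) page {
	end := start + limit
	if end >= len(all) {
		return page{Items: all[start:]}
	}
	return page{Items: all[start:end], Continue: end, More: true}
}

// listAll requests pages until the server reports no more,
// and also counts how many requests were needed.
func listAll(all []string, limit int) (items []string, requests int) {
	start := 0
	for {
		p := listPage(all, limit, start)
		requests++
		items = append(items, p.Items...)
		if !p.More {
			return items, requests
		}
		start = p.Continue
	}
}

func main() {
	objs := make([]string, 1000)
	for i := range objs {
		objs[i] = fmt.Sprintf("pod-%d", i)
	}
	items, requests := listAll(objs, 100)
	fmt.Println(len(items), requests) // prints "1000 10"
}
```

Each request stays small and bounded, which is the whole point: the server never has to serialize the full collection in one response.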
- watchHandler updates the DeltaFIFO (r.store) and the resource version from the events delivered by the watch.
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    ......
loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }
    ......
}
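Why watchHandler bothers to keep the resource version current becomes clearer with a toy model of list followed by watch. In the sketch below, the server type, its Apply, List and WatchSince methods, and syncCache are all hypothetical. Resource versions are modelled as increasing integers purely for readability; in Kubernetes they are opaque strings that clients should not compare numerically.

```go
package main

import "fmt"

// change is one hypothetical watch event stamped with the version it produced.
type change struct {
	Version int
	Key     string
	Value   string // empty means the object was deleted
}

// server is a toy apiserver: current state plus an ordered change log.
type server struct {
	version int
	state   map[string]string
	log     []change
}

func newServer() *server { return &server{state: map[string]string{}} }

// Apply records a write (or a delete when value is empty) and bumps the version.
func (s *server) Apply(key, value string) {
	s.version++
	if value == "" {
		delete(s.state, key)
	} else {
		s.state[key] = value
	}
	s.log = append(s.log, change{s.version, key, value})
}

// List returns a snapshot of the current state and the version it reflects.
func (s *server) List() (map[string]string, int) {
	snap := make(map[string]string, len(s.state))
	for k, v := range s.state {
		snap[k] = v
	}
	return snap, s.version
}

// WatchSince returns only the changes made after the given version.
func (s *server) WatchSince(version int) []change {
	var out []change
	for _, c := range s.log {
		if c.Version > version {
			out = append(out, c)
		}
	}
	return out
}

// syncCache lists once, lets some writes happen, then replays only the
// changes newer than the listed version, tracking the version as it goes.
func syncCache(s *server, between func()) (map[string]string, int) {
	cache, rv := s.List()
	between() // writes that land after the list
	for _, c := range s.WatchSince(rv) {
		if c.Value == "" {
			delete(cache, c.Key)
		} else {
			cache[c.Key] = c.Value
		}
		rv = c.Version // remember where the next watch should resume
	}
	return cache, rv
}

func main() {
	s := newServer()
	s.Apply("default/a", "v1")
	s.Apply("default/b", "v1")
	cache, rv := syncCache(s, func() {
		s.Apply("default/a", "v2")
		s.Apply("default/b", "")
	})
	fmt.Println(cache, rv) // prints "map[default/a:v2] 4"
}
```

Because the version is advanced after every event, a watch that is reopened after a disconnect can resume from the last event seen instead of listing everything again.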
- The watch decoder implementation: Decode blocks while there is no event to read.
// Decode blocks until it can return the next object in the reader. Returns an error
// if the reader is closed or an object can't be decoded.
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
    var got metav1.WatchEvent
    res, _, err := d.decoder.Decode(nil, &got)
    if err != nil {
        return "", nil, err
    }
    if res != &got {
        return "", nil, fmt.Errorf("unable to decode to metav1.Event")
    }
    switch got.Type {
    case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark):
    default:
        return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
    }
    obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw)
    if err != nil {
        return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
    }
    return watch.EventType(got.Type), obj, nil
}
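Wrapping up
This post has walked through the Reflector mechanism in some detail. If you want to dig deeper, I hope it gives you a few pointers and makes the source code easier to read.
References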
https://github.com/kubernetes/sample-controller/blob/master/main.go
https://weread.qq.com/web/reader/f1e3207071eeeefaf1e138a