1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
在 [k8s源码分析][client-go] informer之store和index 和 [k8s源码分析][client-go] informer之delta_fifo 的基础上进行分析, 因为在
informer
体系中reflector
属于一个反射器, 上面对接从k8s api
获得信息的ListWatcher
, 下面对接DeltaFIFO
, 也就是把k8s api
获得的信息通过reflector
存储到DeltaFIFO
中.
2. 类
// Reflector watches a specified resource via its ListerWatcher and
// mirrors all observed changes into the given Store; in the informer
// machinery that Store is a DeltaFIFO.
type Reflector struct {
// name identifies this reflector (used in log and error messages)
name string
metrics *reflectorMetrics
// expectedType is the type of object this reflector expects to receive
expectedType reflect.Type
// store is where listed/watched objects are placed; a DeltaFIFO in the informer machinery
store Store
// listerWatcher performs lists and watches against the api-server
listerWatcher ListerWatcher
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
clock clock.Clock
// lastSyncResourceVersion is the resource version of the most recent sync
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
WatchListPageSize int64
}
// NewReflector creates a Reflector whose name is derived automatically
// from the call site; see NewNamedReflector for the remaining semantics.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	name := naming.GetNameFromCallsite(internalPackages...)
	return NewNamedReflector(name, lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector is like NewReflector but lets the caller supply an
// explicit name. The reflector will mirror objects of expectedType from
// lw into store, resyncing every resyncPeriod.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
}
var internalPackages = []string{"client-go/tools/cache/"}
可以看到
Reflector
中有一个listerWatcher ListerWatcher
, 该对象是从api-server
中获得元素和监控. 也有一个store Store
对象, 这个在informers
体系中是DeltaFIFO
的一个对象.
3. 方法
直接从
Run
方法, 从Reflector
的功能看, 它也得是个循环操作, 需要一直从api-server
中的数据接到DeltaFIFO
中.
Run
// Run repeatedly invokes ListAndWatch, waiting r.period between rounds,
// until stopCh is closed. Errors from ListAndWatch are passed to the
// runtime error handler rather than terminating the loop.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
	round := func() {
		err := r.ListAndWatch(stopCh)
		if err != nil {
			utilruntime.HandleError(err)
		}
	}
	wait.Until(round, r.period, stopCh)
}
wait.Until
方法就是每隔r.period
时间去执行一下Until
里面的方法, 当然下一次的执行要等到上一次执行完才会开始, 当stopCh
中有元素进入或关闭时整个wait.Until
才会退出.
所以可以简单理解为一直在执行
ListAndWatch
方法, 除非发信息给stopCh
通知关闭. 整个Run
才会结束.
ListAndWatch
在这里需要说明
k8s
的并发操作是通过ResourceVersion
来实现的, 在api-server
该对象有一次改动,ResourceVersion
就会加1
.
// syncWith replaces the store's contents with the given list of objects,
// recording resourceVersion as the version of that snapshot.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, len(items))
	for i := range items {
		found[i] = items[i]
	}
	return r.store.Replace(found, resourceVersion)
}
// ListAndWatch first lists all items (replacing the store's contents with
// the result), then opens a watch from the listed resource version and
// feeds subsequent events into the store. It returns when stopCh is
// closed or an error occurs.
// NOTE(review): this listing is abridged by the article author — the
// "..." lines mark elided code from the original source.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
// Listing from ResourceVersion "0" obtains the full current state held by the api-server
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
close(listCh)
}()
// Wait for the list gathered above (or a stop/panic)
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// Replace the DeltaFIFO's contents with the listed items (calls store.Replace)
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
// Start an async goroutine that calls the DeltaFIFO's Resync
// every r.resyncPeriod
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// Open a watch at the current ResourceVersion to observe all subsequent
// changes and feed the corresponding deltas into the DeltaFIFO
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
...
}
// Start watching from the current resourceVersion onwards
w, err := r.listerWatcher.Watch(options)
...
// Delegate event handling to watchHandler
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
}
return nil
}
}
}
主要分三步:
1. 获得所有对象list
并调用DeltaFIFO.Replace(syncWith)
用list
替代之前的元素. 关于Replace
方法在 [k8s源码分析][client-go] informer之delta_fifo 已经有详细分析. 并获得了最新的ResourceVersion
.
2. 启动一个异步goroutine
每隔r.resyncPeriod
时间调用DeltaFIFO
的Resync
. 在 [k8s源码分析][client-go] informer之delta_fifo 已经有分析.
3. 根据当前最新的ResourceVersion
生成一个watch
, 开始一直监控后面的变化.
watchHandler
// LastSyncResourceVersion returns the resource version recorded by the
// most recent sync. Safe for concurrent use.
func (r *Reflector) LastSyncResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	version := r.lastSyncResourceVersion
	r.lastSyncResourceVersionMutex.RUnlock()
	return version
}
// setLastSyncResourceVersion records v as the most recently synced
// resource version. Safe for concurrent use.
func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	r.lastSyncResourceVersion = v
	r.lastSyncResourceVersionMutex.Unlock()
}
// watchHandler consumes events from w, applying each one to the store
// (Add/Update/Delete) and advancing *resourceVersion as events arrive.
// It returns a non-nil error when stopped, when the resync goroutine
// reports an error via errc, on a watch.Error event, or when the watch
// closes suspiciously fast with no events; it returns nil when the
// watch channel closes normally (the caller then opens a new watch).
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
// stopCh was closed: leave watchHandler
return errorStopRequested
case err := <-errc:
// the DeltaFIFO Resync goroutine hit an error: leave watchHandler
return err
case event, ok := <-w.ResultChan():
if !ok {
// the watch channel has been closed: break out of the loop
break loop
}
if event.Type == watch.Error {
// error event: leave watchHandler
return apierrs.FromObject(event.Object)
}
// The received object's type differs from what this reflector watches,
// e.g. a reflector responsible for Pods received a Service object
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
// Read the event object's ResourceVersion
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
// Add the object to the DeltaFIFO
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
// Update the object in the DeltaFIFO
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
// Delete the object from the DeltaFIFO
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
// Record the new ResourceVersion and
// bump the count of processed events
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
// Reached only when the watch channel was closed
watchDuration := r.clock.Since(start)
// If this watch processed no events and lasted under a second, something
// is likely wrong, so return an error; watchHandler then exits, ListAndWatch
// returns, and wait.Until in Run invokes ListAndWatch again.
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
// Returning nil does not end ListAndWatch: its for-loop creates a new
// watch and calls watchHandler again
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
}
1. 这里可以看到
watchHandler
会把对应的状态加入到DeltaFIFO
中.
2.watchHandler
返回nil
不会导致ListAndWatch
返回. 如果watchHandler
返回错误会导致ListAndWatch
返回, 进而回到Run
中通过wait.Until
再次调用ListAndWatch
.
4. 总结
分析完了整个方法后可以知道. 整个
reflector
所做的工作就是从list
中获得所有对象, 然后根据当时拿到的resouceVersion
开始进行监控后面的一系列操作, 然后加入到deltaFIFO
中.
然后负责具体监控工作的是
watchHandler
, 当该方法返回不是nil
的错误时, ListAndWatch
会退出, 进而由Run
中的wait.Until
重新调用ListAndWatch
, 重新获得list
并replace
对接的deltaFIFO
. 如果返回的错误是nil
, 这种情况是因为watch
被关闭了, 这个时候会回到ListAndWatch
的for循环中重新生成一个watch
对象再次调用watchHandler
进行监控.
informer整体
整个
informer
体系在k8s
代码中占有重要一环, 理解informer
可以更好理解k8s
的工作机制.
1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory