1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
本文将分析
tools/cache
包中的store
. 主要会涉及到store.go
,thread_safe_store.go
和index.go
. 这部分是整个informer
其中的一环, 功能是提供本地缓存.
2. 整体接口与实现类
Store
: 接口定义了基本的方法.
Indexer
: 在Store
的基础上添加了几个关于index
的方法.
ThreadSafeStore
: 定义了一系列方法, 与Indexer
中所有方法(会包括Store
中的方法)的最大区别是它有key
.
threadSafeMap
: 是ThreadSafeStore
的一个实现类.
cache
: 是Indexer
或Store
的一个实现类, 它会根据keyFunc
生成该obj
对应的一个key
, 然后调用ThreadSafeStore
的方法.
2.1 Store
// tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
// 1. 会删除store里面的内容
// 2. 用传进来的list代替以前的内容
Replace([]interface{}, string) error
// 同步
Resync() error
}
可以看到该
Store
接口中有两个方法ListKeys
和GetByKey
方法, 是与key
有关的, 也就是存储一个obj
的时候是根据key
来存储的, 每一个obj
都有一个唯一的key
. 等到回头看该key
的实现类的时候在仔细说一下.
2.2 Indexer
// tools/cache/thread_safe_store.go
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
说实话, 这块不太好理解, 等到它的实现类的时候可以看到这个
Indexer
的功能, 并且会用一个例子进行说明.
2.3 ThreadSafeStore
// tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
type threadSafeMap struct {
lock sync.RWMutex
// 存储着key与obj的对应关系
items map[string]interface{}
// indexers maps a name to an IndexFunc
// 存着indexer的名字与它对应的生成index的方法
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
indexers: indexers,
indices: indices,
}
}
这里与
Store
有点区别的是ThreadSafeStore
与index
无关的全部都是针对key
的操作, 而index
方面的操作都是与Indexer
方法意义.
另外
threadSafeMap
是ThreadSafeStore
接口的实现类, 也是真正实现逻辑的核心实体.
2.4 cache
// tools/cache/store.go
type KeyFunc func(obj interface{}) (string, error)
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
// 一个Store实例 没有index相关方法
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// 一个带有indexer实例 有index相关方法
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
type ExplicitKey string
// 一般情况下都是<namespace>/<name> unless <namespace> is empty, then it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
cache
是Indexer
接口的实现类, 那么自然也是Store
接口的实现类, 可以看到cacheStorage
是一个ThreadSafeStore
的对象, 而ThreadSafeStore
是一个根据key
来操作的类, 所以cache
中有一个为obj
生成唯一key
的keyFunc
方法(比如MetaNamespaceKeyFunc
), 然后就可以调用ThreadSafeStore
的对应方法.
3. 方法
此部分将会以一个例子来贯穿整个方法的使用, 与上流调用程序打交道的是
Store
或者Indexer
, 真正的核心实体类是threadSafeMap
, 所以接下来会从上流程序的调用的角度来看其如何实现.
3.1 生成一个Indexer
实例
func testUsersIndexFunc(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
usersString := pod.Annotations["users"]
return strings.Split(usersString, ","), nil
}
func TestMultiIndexKeys(t *testing.T) {
index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
}
注意
1.keyFunc
是MetaNamespaceKeyFunc
方法.
2. 一个indexer
的名字byUser
, 以及该byUser
生成index
方法.
3.2 Add
上流程序调用
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
index.Add(pod1)
index.Add(pod2)
index.Add(pod3)
接下来看一下它的逻辑, 分析以
pod1
为例.
// tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
// 根据它的keyFunc生成该obj的key
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
// 会调用threadSafeMap的Add方法
c.cacheStorage.Add(key, obj)
return nil
}
1. 根据
MetaNamespaceKeyFunc
生成key
. (pod1
生成的key
是one
).
2. 调用threadSafeMap
的Add
方法. (c.cacheStorage.Add("one", pod1)
)
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
1. 因为以前该
key
可能存在, 取出oldObject
, 不存在则为nil
. (oldObject=nil
)
2. 将对应的key
和obj
存储到一个map
结构(item
)中.(c.item["one"] = pod1
)
2. 调用updateIndices
方法.
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
// 如果以前存在 则删除
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 遍历所有的indexers
for name, indexFunc := range c.indexers {
// 根据indexFunc生成该对象newObj的键
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 取出当前indexer的结构 是一个map对象
index := c.indices[name]
// 如果不存在 则创建一个新的
if index == nil {
index = Index{}
c.indices[name] = index
}
// 遍历刚刚生成的键
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
不多说, 直接用
pod1
来说明吧.
c.indexers = {"byUser": testUsersIndexFunc}
// 所以只有一次循环
name = "byUser"
indexFunc = testUsersIndexFunc
===> indexValues = ["ernie", "bert"]
===> indices["byUser"] = {}
======> indices["byUser"]["ernie"] = [{"one": Empty}]
======> indices["byUser"]["bert"] = [{"one": Empty}]
最终加入
pod1
,pod2
和pod3
的结果如下:
3.3 查询方法
理解了
Add
方法, 接下来看一下几个查询方法, 有了上面的基础, 查询的话基本上对照着图看就差不多可以得到答案了.
3.3.1 ByIndex
// 上流程序调用
index.ByIndex("byUser", "ernie")
// tools/cache/store.go
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
index := c.indices[indexName]
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
可以看到其实就是取
indices["byUser"]["ernie"]
, 所以返回值就是["one", "tre"]
ListIndexFuncValues
// 上流程序调用
index.ListIndexFuncValues("byUser")
// tools/cache/store.go
func (c *cache) ListIndexFuncValues(indexName string) []string {
return c.cacheStorage.ListIndexFuncValues(indexName)
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()
defer c.lock.RUnlock()
index := c.indices[indexName]
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}
返回某个
indexName
生成的所有键. 相当于indices["byUser"].keySet()
, 所以返回值将会是["ernie", "bert", "elmo", "oscar"]
.
List() 和 Get(obj interface{})
// 上流程序调用
index.List()
index.Get("pod1")
// tools/cache/store.go
func (c *cache) List() []interface{} {
return c.cacheStorage.List()
}
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := c.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return c.GetByKey(key)
}
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
item, exists = c.cacheStorage.Get(key)
return item, exists, nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}
很明显
List()
和Get
方法是对items
的操作.
所以List()
返回[pod1, pod2, pod3]
,Get
方法返回pod1
.
Delete
有了上面的基础, 这些操作无非都是在维护
indices
和items
这两个数据结构, 所以可想而知, 删除操作就是从这两个数据结构中删除某个obj
带来的数据.
// 上流程序调用
index.Delete(pod3)
// tools/cache/store.go
func (c *cache) Delete(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key)
delete(c.items, key)
}
}
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set != nil {
set.Delete(key)
}
}
}
}
其实也没有什么, 就是怎么增加的就怎么删除就行了.
删除完的结果如下:
update
// tools/cache/store.go
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
可以看到
update
和Add
方法是一模一样的, 因为Add
方法是先删除旧的, 然后再添加新的.
resync
// tools/cache/store.go
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Resync() error {
// Nothing to do
return nil
}
该方法在这里没有任何实现, 在一些子类中会有具体的实现. 比如
FIFO
,DeltaFIFO
等等.
Replace
// tools/cache/store.go
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
return nil
}
// tools/cache/thread_safe_store.go
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
// 更新items
c.items = items
// rebuild any index
// 重新构建indices
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
简单一点理解就是把之前
items
,indices
存的数据全部删除, 然后将list
里面的内容一个个添加进去.
informer整体
整个
informer
体系在k8s
代码中占有重要一环, 理解informer
可以更好理解k8s
的工作机制.
1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory