[k8s源码分析][client-go] workqueue

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/util/workqueue
分支: tming-v13.0 (基于v13.0版本)

本文将分析util包中的workqueue. 在各类controller中经常会使用该workqueue中的一些类.

architecture.png

2. queue

2.1 Interface接口

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

这里有几点需要注意一下:
1. 增加了Done方法, 告诉queue对这个item的处理已经结束了.
2. 以往的queue在pop之后就对这个item的状态不管了, 但是在该iterface中明显需要进行管理, 因为需要实现Done方法, 所以需要知道哪些item正在处理.

2.2 实现类Type

type Type struct {
    // queue定义了需要处理的item的顺序
    // 这些element应该出现在dirty中 而不会在processing中.
    queue []t
    // dirty保存着那些需要被处理的item
    dirty set
    // 代表那些正在被处理的元素,
    processing set
    cond *sync.Cond
    shuttingDown bool
    metrics queueMetrics
    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
    _, exists := s[item]
    return exists
}
func (s set) insert(item t) {
    s[item] = empty{}
}
func (s set) delete(item t) {
    delete(s, item)
}

2.3 方法

Get
// 返回头部item 和 queue是否关闭
func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    if len(q.queue) == 0 {
        // We must be shutting down.
        return nil, true
    }

    // 获得队列的头 并且更新queue数组
    item, q.queue = q.queue[0], q.queue[1:]

    q.metrics.get(item)

    // 1. 将该item加入到processing中, 表明该item正在被处理
    // 2. 将该item从dirty中删除
    q.processing.insert(item)
    q.dirty.delete(item)

    return item, false
}
  1. 如果queue没有关闭 但是目前没有元素 一直waiting
  2. 如果queue已经关闭 则返回nil, true
  3. 获得队列的头item并且更新queue数组
  4. 将该item加入到processing中 表明该item正在被处理
  5. dirtyqueue中删除该item
  6. 返回该item, false
Add 和 Done
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // 如果queue已经关闭了
    if q.shuttingDown {
        return
    }
    // 如果已经在queue中了 就直接返回了
    if q.dirty.has(item) {
        return
    }

    q.metrics.add(item)
    // 加入到dirty中
    q.dirty.insert(item)
    // 如果该元素正在被处理, 那直接返回了 不会加入到queue中
    if q.processing.has(item) {
        return
    }

    q.queue = append(q.queue, item)
    q.cond.Signal()
}

// 表明该item已经结束了
func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.metrics.done(item)

    // 如果结束了 就从processing中删除
    q.processing.delete(item)
    // 如果dirty中有 在把item加回去
    // 这里主要是因为在处理item的过程中, 上流程序又调用了Add方法
    // 因为该item正在被处理, 此时如果加入到queue中, 然后又Get拿到该item
    // 此时就会有两个同样的item正在被处理

    // 所以最终设计的目的是为了保证正在被处理的过程中保证每个都不一样 不会出现有两个相同的item正在被处理
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

该方法的实现是为了保证不能同时有两个相同的item正在被处理

1. 如果一个item正在被处理, 此时又add了一个相同的item, 那此时不能加入到queue中(把该item暂时加到了dirty), 此时如果加入到queue中, 然后又Get拿到该item, 那么同一时间就会有两个相同的item在被处理
2. 所以在该item处理结束(Done方法)的时候会检查该item是否有在被处理的过程中再次加入queue, 也就是判断dirty中是否含有该item, 如果有, 则加入到queue

3. delaying_queue

3.1 DelayingInterface接口

type DelayingInterface interface {
    Interface
    // 在此刻过duration时间后再加入到queue中
    AddAfter(item interface{}, duration time.Duration)
}

DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.

该接口涉及的目的就是可以避免某些失败的items陷入热循环. 因为某些item在出队列进行处理有可能失败, 失败了用户就有可能将该失败的item重新进队列, 如果短时间内又出队列有可能还是会失败(因为短时间内整个环境可能没有改变等等), 所以可能又重新进队列等等, 因此陷入了一种hot-loop.

所以就为用户提供了AddAfter方法, 可以允许用户告诉DelayingInterface过多长时间把该item加入队列中.

3.2 实现类delayingType

因为有延迟的时间, 所以那些item到时间了可以进入队列了, 那些没到时间还不能进入队列, 所以waitForwaitForPriorityQueue应运而生.

waitFor: 保存了包括数据data和该item什么时间起(readyAt)就可以进入队列了.
waitForPriorityQueue: 是用于保存waitFor的优先队列, 按readyAt时间从早到晚排序. 先readyitem先出队列.

// delayingType wraps an Interface and provi`des delayed re-enquing
// 提供有延迟的重进队列
type delayingType struct {
    Interface
    // clock tracks time for delayed firing
    clock clock.Clock
    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}
    // stopOnce guarantees we only signal shutdown a single time
    stopOnce sync.Once
    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker
    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor
    metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
    data    t
    // 该data可以加入queue的时间
    readyAt time.Time
    // index in the priority queue (heap)
    index int
}
type waitForPriorityQueue []*waitFor

3.3 方法

newDelayingQueue
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
    ret := &delayingType{
        Interface:       NewNamed(name),
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }

    go ret.waitingLoop()

    return ret
}

这里需要注意的是
1. 启动了goroutine执行waitingLoop方法, 关于waitingLoop在后面介绍.
2. NewNamed(name)是一个Type对象实例(2.2中的Type)

AddAfter
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // don't add if we're already shutting down
    // 如果队列已经关闭 直接返回
    if q.ShuttingDown() {
        return
    }

    q.metrics.retry()

    // immediately add things with no delay
    // 如果不需要延迟 直接调用iterface(Type)中的Add方法
    if duration <= 0 {
        q.Add(item)
        return
    }

    select {
    case <-q.stopCh:
        // unblock if ShutDown() is called
        // 队列关闭的时候才会进入这里
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
        // 构造一个waitFor放到waitingForAddCh(一个channel, 缓冲1000)
    }
}

1. 构造一个waitFor放到channel(waitingForAddCh)中, waitingLoop会在另外一端接收.

waitingLoop
func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()
    // Make a placeholder channel to use when there are no items in our list
    never := make(<-chan time.Time)
    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)
    waitingEntryByData := map[t]*waitFor{}
    for {
        // queue关闭返回
        if q.Interface.ShuttingDown() {
            return
        }
        now := q.clock.Now()
        // 将那些已经ready好了的item可以加入到queue中
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            if entry.readyAt.After(now) {
                break
            }
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }
        // nextReadyAt启一个channel用于存着下次可以加入queue的时间
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            // 优先队列中的第一个肯定是最早ready的
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
        }
        select {
        case <-q.stopCh:
            return
        // ticker的操作 每隔10s会调用
        // 有可能去增加已经ready items
        case <-q.heartbeat.C():
            // continue the loop, which will add ready items
        // 下次ready的时间到了
        case <-nextReadyAt:
            // continue the loop, which will add ready items
        case waitEntry := <-q.waitingForAddCh:
            // 从AddAfter过来的数据
            if waitEntry.readyAt.After(q.clock.Now()) {
                // 如果时间没到 就加入waitingForQueue中
                // waitingEntryByData用于保存waitEntry.data与waitEntry的关系
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                // 直接加入到queue中
                q.Add(waitEntry.data)
            }
            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
    // if the entry already exists, update the time only if it would cause the item to be queued sooner
    // 如果已经在waitingQueue中了 取readyAt最小的那个 为了可以让它早点出queue
    // 就是说增加的item重复了 ready的时间取最早的一个
    existing, exists := knownEntries[entry.data]
    if exists {
        if existing.readyAt.After(entry.readyAt) {
            existing.readyAt = entry.readyAt
            heap.Fix(q, existing.index)
        }

        return
    }

    heap.Push(q, entry)
    knownEntries[entry.data] = entry
}

这里启动一个for循环:
1. 只有队列关闭的时候才会退出.
2. 利用channel(q.waitingForAddCh)一直接受从AddAfter过来的waitFor.
3. 一直将已经readyitem加入到队列中.

注意: 如果有两个相同的data加入时, waitFor中取ready时间最早的那一个.

4. default_rate_limit

4.1 RateLimiter接口

type RateLimiter interface {
    // 返回该item应该要等待多长时间
    When(item interface{}) time.Duration
    // 停止跟踪该item
    Forget(item interface{})
    // 返回该item失败的次数
    NumRequeues(item interface{}) int
}

一个控制速率的接口. 在3.1 DelayingInterface接口中提到hot-loop, 那关于等待多长时间可以根据失败的次数有关, 比如最简单的失败一次等待时间就增加一倍, 第一次失败等待1s, 第二次失败等待2s, 第三次失败等待4s, 以此类推. 那RateLimiter接口就是这样一个控制速率的抽象定义, 接下来看一个它的实现类.

4.2 ItemExponentialFailureRateLimiter

type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
        // 存的失败的item
    failures     map[interface{}]int

    baseDelay time.Duration
    maxDelay  time.Duration
}
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
    return &ItemExponentialFailureRateLimiter{
        failures:  map[interface{}]int{},
        baseDelay: baseDelay,
        maxDelay:  maxDelay,
    }
}
func DefaultItemBasedRateLimiter() RateLimiter {
    return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}

4.3 方法

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
        // 该item失败的次数
    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1
       
    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }
    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }
    return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    delete(r.failures, item)
}

直接通过一个例子更好说明:

example.png

5. rate_limiting_queue

5.1 RateLimitingInterface接口

type RateLimitingInterface interface {
    DelayingInterface
    AddRateLimited(item interface{})
    Forget(item interface{})
    NumRequeues(item interface{}) int
}

因为DelayingInterface已经有了AddAfter方法, 但是到底是after多长时间并没有进行控制, 最简单的就是一个常量时间, 对所有的item都一样. 但是这样的话也有可能会造成一些问题, 有些item因为失败来回来回进入队列十几次, 而有些item才失败一次, 就有可能排在这个失败十几次的item后面, 如果还是不成功, 这样就会造成资源的浪费.

所以RateLimitingInterface就在DelayingInterface加入对item的速率进行控制, 并且与失败的次数进行相关.

5.2 实现类以及方法

很常规.

type rateLimitingType struct {
    DelayingInterface
    rateLimiter RateLimiter
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
    return q.rateLimiter.NumRequeues(item)
}

// 删除该item
func (q *rateLimitingType) Forget(item interface{}) {
    q.rateLimiter.Forget(item)
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,082评论 1 32
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,860评论 2 11
  • 一:base.h 二:block.h 1. dispatch_block_flags:DISPATCH_BLOCK...
    小暖风阅读 2,406评论 0 0
  • 下午闲无聊事约上好友去图书馆看书,本想看上一本书回来方便写书评。书看到一半我们就开始聊起来,起初是宣泄悲伤学,后来...
    钟无迭笙阅读 130评论 0 0
  • 身体调养之辐射 辐射,在科学上,分为电离辐射和非电离辐射,医院的照片、CT等,都是基于能使分子发生电离效应的电离辐...
    蓬头小龙虾阅读 371评论 0 0