Client-go客户端源码解析--WorkQueue

在我们上面提供的示例代码中(我们可以称它为一个比较简陋的自定义控制器),我们将接受到的事件(资源对像)直接打印出来了,并没有经过任何处理。但是在正常的业务需求中,我们需要根据接收到的事件类型,最资源对象做各种各样的负责的计算和处理动作。所以在生产用的自定义控制器中,我们的EventHandler方法中接收到事件之后往往不会马上处理(或者只是简单的处理下数据),而是将事件资源对象的Key先放保存至一个队列,然后由自定义控制器提前启动好的多个gorouine并发的消费这个队列的数据,这样以来不仅可以提高自定义控制器的吞吐量,还可以利用队列的特性来达到限速事件消费的目的,最终使得自定义控制既稳定又高性能。

这个队列,我们一般会使用Kubernetes官方提高的WorkQueue,如官方github的sample-controller里面就使用了WorkQueue(https://github.com/kubernetes/sample-controller/blob/master/controller.go)。我们接下来对WorkQueue的核心代码做个分析。

WorkQueue支持3种队列,并提供了3种接口。

最基本的FIFO队列,支持提供了队列的基本操作方法,如:

//基本的FIFO队列接口定义
type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool) //获取队列头部的元素
    Done(item interface{}) //标记该元素已被处理
    ShutDown() //关闭队列
    ShuttingDown() bool //队列是否正在关闭
} 

//数据结构定义,实现了Interface的方法
type Type struct {
    // queue defines the order in which we will work on items. Every
    // element of queue should be in the dirty set and not in the
    // processing set.
    queue []t

    // dirty defines all of the items that need to be processed.
    dirty set

    // Things that are currently being processed are in the processing set.
    // These things may be simultaneously in the dirty set. When we finish
    // processing something and remove it from this set, we'll check if
    // it's in the dirty set, and if so, add it to the queue.
    processing set

    cond *sync.Cond

    shuttingDown bool

    metrics queueMetrics //监控指标相关的,用于Prometheus监控

    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}
  1. queue 实际存储元素的地方
  2. dirty 类型是set(使用Map的key来实现的,确保唯一),能保证去重;同时也保证了再并发情况下,一个元素在被处理之前买,哪怕被添加了多次,也只会被处理一次
  3. processing 用于标记一个元素正在被处理
FIFO

基础FIFO队列核心源码

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if q.dirty.has(item) { //在dirty中如果存在该元素,就直接返回;确保了元素在队列中的唯一性
        return
    }

    q.metrics.add(item)

    q.dirty.insert(item) //先将元素插入dirty中
    if q.processing.has(item) { //如果该元素正在被处理,则返回
        return
    }

    q.queue = append(q.queue, item) //将元素加入队列尾部,等待消费
    q.cond.Signal() //唤醒一个消费者goroutine去队列中消费元素(调用Get方法)
}
......
func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait() //如果队列为空并且没有处于关闭中,则阻塞,等待被唤醒(调用了Add方法和ShutDown方法Done方法都会唤醒该阻塞)
    }
    if len(q.queue) == 0 {//如果是上面的阻塞被唤醒了,但是队列长度还是0,则表示该队列被关闭
        // We must be shutting down.
        return nil, true 
    }

    item, q.queue = q.queue[0], q.queue[1:] //从队列头部取一个元素

    q.metrics.get(item)

    q.processing.insert(item)//标记该元素正在被处理
    q.dirty.delete(item) //正在处理的元素从dirty中删除

    return item, false
}

// 表示某个元素被处理完成
func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.metrics.done(item)

    q.processing.delete(item) //从processing中移除该元素
    if q.dirty.has(item) { //如果dirty中还有一个相同的元素存在,说明在该元素被处理的时候,又加入了相同的元素进来
        q.queue = append(q.queue, item)//此时需要将元素添加至Queue中,等待被消费
    q.cond.Signal() //唤醒消费goroutine(那些调用了Get方法而阻塞的goroutine,只会被唤醒一个)
    }
}
//关闭WorkQueue
func (q *Type) ShutDown() {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.shuttingDown = true
  q.cond.Broadcast() //唤醒所有的消费者goroutine,让它们安全退出(哪些调用了Get方法而阻塞的goroutine)
}

延迟队列

延迟队列,基于FIFO队列接口封装,延迟一段时间后再将元素插入FIFO队列,主要在原有的功能上增加了AddAfter方法。

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

type delayingType struct {
    Interface

    // clock tracks time for delayed firing
    clock clock.Clock

    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}

    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker

    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor //初始化延迟队列的时候,channel长度为1000

    // metrics counts the number of retries
    metrics           retryMetrics
    deprecatedMetrics retryMetrics
}

延迟队列运行原理

延迟队列核心原理

核心代码

// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // don't add if we're already shutting down
    if q.ShuttingDown() {
        return
    }

    q.metrics.retry()
    q.deprecatedMetrics.retry()

    // immediately add things with no delay
    if duration <= 0 {
        q.Add(item)
        return
    }

    select {
    case <-q.stopCh:
        // unblock if ShutDown() is called
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}
...
/ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()

    // Make a placeholder channel to use when there are no items in our list
    never := make(<-chan time.Time)

    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            if entry.readyAt.After(now) {
                break
            }

            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
        }

        select {
        case <-q.stopCh:
            return

        case <-q.heartbeat.C():
            // continue the loop, which will add ready items

        case <-nextReadyAt:
            // continue the loop, which will add ready items

        case waitEntry := <-q.waitingForAddCh:
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}

限速队列

限速队列是基于延迟队列和FIFO队列接口封装的,限速队列的接口:

type RateLimitingInterface interface {
    DelayingInterface //延迟队列

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{}) //想队列对添加元素

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})//当某个元素处理完成之后,调用Forget,清空元素的排队数,调用具体限速算法的同名方法

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int //获取指定元素的排队数,调用具体限速算法的同名方法
}

限速队列的数据结构:

// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
   DelayingInterface

   rateLimiter RateLimiter //限速队列的接口定义是比较简单的,限速队列的重点就在于其提供的几种不同限速算法
}

限速队列的原理,就是利用了延迟队列的特性,延迟某个元素的插入时间,达到限速的目的。RateLimiter接口定义如下:

type RateLimiter interface {
   // When gets an item and gets to decide how long that item should wait
   When(item interface{}) time.Duration //获取指定元素插入队列前应该等待的时间
   // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
   // or for success, we'll stop tracking it
   Forget(item interface{})//释放指定元素,清空该元素的排队数
   // NumRequeues returns back how many failures the item has had
   NumRequeues(item interface{}) int//返回指定元素的排队数
}

Workqueue提供了4种限速算法:

  1. 令牌桶算法(BucketRateLimiter): 以固定的速率往桶里填充Token,直到填满为止(多余的会被丢弃);每个元素都会从桶里获取一个Token,只有拿到Token的才允许通过,否则该元素处理等待Token的状态;以此达到限速的目的。
  2. 排队指数算法(ItemExponentialFailureRateLimiter): 将相同元素排队数作为指数,排队数增大,速率限制呈指数增长,但最大不回超过maxDelay。
  3. 计数器算法(ItemFastFlowRateLimiter):限制一段时间内允许通过的元素数量。
  4. 混合模式: 将多种限速算法混合使用。

我们主要看下排队指数算法下,如何添加元素到队列中。

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
//添加元素到队列中
//首先要先获取某个元素需要等待的时间,q.rateLimiter.When(item) 
//然后调用延迟队列的Addfter添加元素到队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
   q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
//排队指数算法的 When方法
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item] //获取指定元素的排队数
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指数计算需要等待的时间
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay //确保等待时间不会大于maxDelay,maxDelay为1000s
    }

    return calculated
}

调用q.rateLimiter.When(item)拿到所需等待的时间后,就会执行延迟队列的AddAfter方法将元素添加进队列,后续具体的添加动作就是延迟队列的相关操作。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容