1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/util/workqueue
分支: tming-v13.0 (基于v13.0版本)
本文将分析
util
包中的workqueue
. 在各类controller
中经常会使用该workqueue
中的一些类.
2. queue
2.1 Interface接口
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
这里有几点需要注意一下:
1. 增加了Done
方法, 告诉queue
对这个item
的处理已经结束了.
2. 以往的queue在pop之后就对这个item的状态不管了, 但是在该iterface
中明显需要进行管理, 因为需要实现Done
方法, 所以需要知道哪些item
正在处理.
2.2 实现类Type
type Type struct {
// queue定义了需要处理的item的顺序
// 这些element应该出现在dirty中 而不会在processing中.
queue []t
// dirty保存着那些需要被处理的item
dirty set
// 代表那些正在被处理的元素,
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
2.3 方法
Get
// 返回头部item 和 queue是否关闭
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
// 获得队列的头 并且更新queue数组
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 1. 将该item加入到processing中, 表明该item正在被处理
// 2. 将该item从dirty中删除
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
- 如果
queue
没有关闭 但是目前没有元素 一直waiting
- 如果
queue
已经关闭 则返回nil
,true
- 获得队列的头
item
并且更新queue
数组- 将该
item
加入到processing
中 表明该item
正在被处理- 从
dirty
和queue
中删除该item
- 返回该
item
,false
Add 和 Done
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果queue已经关闭了
if q.shuttingDown {
return
}
// 如果已经在queue中了 就直接返回了
if q.dirty.has(item) {
return
}
q.metrics.add(item)
// 加入到dirty中
q.dirty.insert(item)
// 如果该元素正在被处理, 那直接返回了 不会加入到queue中
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// 表明该item已经结束了
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
// 如果结束了 就从processing中删除
q.processing.delete(item)
// 如果dirty中有 在把item加回去
// 这里主要是因为在处理item的过程中, 上流程序又调用了Add方法
// 因为该item正在被处理, 此时如果加入到queue中, 然后又Get拿到该item
// 此时就会有两个同样的item正在被处理
// 所以最终设计的目的是为了保证正在被处理的过程中保证每个都不一样 不会出现有两个相同的item正在被处理
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
该方法的实现是为了保证不能同时有两个相同的
item
正在被处理
1. 如果一个
item
正在被处理, 此时又add
了一个相同的item
, 那此时不能加入到queue
中(把该item暂时加到了dirty
), 此时如果加入到queue
中, 然后又Get
拿到该item
, 那么同一时间就会有两个相同的item
在被处理
2. 所以在该item
处理结束(Done
方法)的时候会检查该item
是否有在被处理的过程中再次加入queue
, 也就是判断dirty
中是否含有该item
, 如果有, 则加入到queue
中
3. delaying_queue
3.1 DelayingInterface接口
type DelayingInterface interface {
Interface
// 在此刻过duration时间后再加入到queue中
AddAfter(item interface{}, duration time.Duration)
}
DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
该接口涉及的目的就是可以避免某些失败的
items
陷入热循环. 因为某些item
在出队列进行处理有可能失败, 失败了用户就有可能将该失败的item
重新进队列, 如果短时间内又出队列有可能还是会失败(因为短时间内整个环境可能没有改变等等), 所以可能又重新进队列等等, 因此陷入了一种hot-loop
.
所以就为用户提供了
AddAfter
方法, 可以允许用户告诉DelayingInterface
过多长时间把该item
加入队列中.
3.2 实现类delayingType
因为有延迟的时间, 所以那些
item
到时间了可以进入队列了, 那些没到时间还不能进入队列, 所以waitFor
和waitForPriorityQueue
应运而生.
waitFor: 保存了包括数据
data
和该item
什么时间起(readyAt
)就可以进入队列了.
waitForPriorityQueue: 是用于保存waitFor
的优先队列, 按readyAt
时间从早到晚排序. 先ready
的item
先出队列.
// delayingType wraps an Interface and provi`des delayed re-enquing
// 提供有延迟的重进队列
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor
metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
// 该data可以加入queue的时间
readyAt time.Time
// index in the priority queue (heap)
index int
}
type waitForPriorityQueue []*waitFor
3.3 方法
newDelayingQueue
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
这里需要注意的是
1. 启动了goroutine执行waitingLoop方法, 关于waitingLoop
在后面介绍.
2. NewNamed(name)是一个Type对象实例(2.2中的Type)
AddAfter
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
// 如果队列已经关闭 直接返回
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
// 如果不需要延迟 直接调用iterface(Type)中的Add方法
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
// 队列关闭的时候才会进入这里
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
// 构造一个waitFor放到waitingForAddCh(一个channel, 缓冲1000)
}
}
1. 构造一个
waitFor
放到channel(waitingForAddCh)
中,waitingLoop
会在另外一端接收.
waitingLoop
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
// queue关闭返回
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// 将那些已经ready好了的item可以加入到queue中
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// nextReadyAt启一个channel用于存着下次可以加入queue的时间
nextReadyAt := never
if waitingForQueue.Len() > 0 {
// 优先队列中的第一个肯定是最早ready的
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
select {
case <-q.stopCh:
return
// ticker的操作 每隔10s会调用
// 有可能去增加已经ready items
case <-q.heartbeat.C():
// continue the loop, which will add ready items
// 下次ready的时间到了
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
// 从AddAfter过来的数据
if waitEntry.readyAt.After(q.clock.Now()) {
// 如果时间没到 就加入waitingForQueue中
// waitingEntryByData用于保存waitEntry.data与waitEntry的关系
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
// 直接加入到queue中
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner
// 如果已经在waitingQueue中了 取readyAt最小的那个 为了可以让它早点出queue
// 就是说增加的item重复了 ready的时间取最早的一个
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}
return
}
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
这里启动一个
for
循环:
1. 只有队列关闭的时候才会退出.
2. 利用channel(q.waitingForAddCh)
一直接受从AddAfter
过来的waitFor
.
3. 一直将已经ready
的item
加入到队列中.
注意: 如果有两个相同的
data
加入时,waitFor
中取ready
时间最早的那一个.
4. default_rate_limit
4.1 RateLimiter接口
type RateLimiter interface {
// 返回该item应该要等待多长时间
When(item interface{}) time.Duration
// 停止跟踪该item
Forget(item interface{})
// 返回该item失败的次数
NumRequeues(item interface{}) int
}
一个控制速率的接口. 在
3.1 DelayingInterface接口
中提到hot-loop
, 那关于等待多长时间可以根据失败的次数有关, 比如最简单的失败一次等待时间就增加一倍, 第一次失败等待1s
, 第二次失败等待2s
, 第三次失败等待4s
, 以此类推. 那RateLimiter
接口就是这样一个控制速率的抽象定义, 接下来看一个它的实现类.
4.2 ItemExponentialFailureRateLimiter
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
// 存的失败的item
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}
4.3 方法
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 该item失败的次数
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
直接通过一个例子更好说明:
5. rate_limiting_queue
5.1 RateLimitingInterface接口
type RateLimitingInterface interface {
DelayingInterface
AddRateLimited(item interface{})
Forget(item interface{})
NumRequeues(item interface{}) int
}
因为
DelayingInterface
已经有了AddAfter
方法, 但是到底是after
多长时间并没有进行控制, 最简单的就是一个常量时间, 对所有的item
都一样. 但是这样的话也有可能会造成一些问题, 有些item
因为失败来回来回进入队列十几次, 而有些item
才失败一次, 就有可能排在这个失败十几次的item
后面, 如果还是不成功, 这样就会造成资源的浪费.
所以
RateLimitingInterface
就在DelayingInterface
加入对item
的速率进行控制, 并且与失败的次数进行相关.
5.2 实现类以及方法
很常规.
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
// 删除该item
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}