Golang 之 WaitGroup 源码解析

前言

如果我们有一个大的任务要做,我们会尝试将这个任务分解,分解完成之后并发交由 goroutine 去做,并且我需要当全部的任务完成之后再进行下面的步骤,在 sync 包下,就有这样一个东西适合上述情况,WaitGroup,今天我们来看看具体它是怎么实现的。

PS:在下面我统一用 wg 来简称 WaitGroup

使用

它的使用非常简单,如下:

func main () {
    wg := sync.WaitGroup {}
    for i := 0; i < 10; i++ {
        wg.Add (1)
        go func (job int) {
            defer wg.Done ()
            //do something
            fmt.Printf ("job % d done\n", job)
        }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

输出:

job 9 done
job 1 done
job 0 done
job 8 done
job 7 done
job 3 done
job 6 done
job 2 done
job 4 done
job 5 done
all done

我们可以看到,使用非常简单,每次有一个任务就使用 Add 方法加一个,每次做完任务就使用 Done 方法告诉它已经完成了,而 Wait 就是等着所有的任务完成。

思考问题

在看 wg 的实现之前,首先来问几个问题,来考考自己。

  1. Wait 方法能否被多次调用,比如再开一个 goroutine 去 wait
  2. Wait 方法调用后是否还能再继续调用 Add 添加任务
  3. 每次只能 Done 一个任务,能否一次性 Done 多个任务呢
  4. wg 能否被拷贝或作为参数传递
  5. 如果让你自己实现一个,你会如何实现

前几个问题,如果你都能很清楚的回答,那么你对 wg 的了解可以说已经非常熟悉了。首选我来说一下对于最后的一个问题的回答,因为在看源码之前我都会想想如果是我,我会如何去实现,那么我想的也很简单。

  • 使用一个变量进行计数
  • 每次任务数量变更时使用 atom 原子操作 + 1 或者 - 1
  • -1 时判断任务数量是否已经为 0
  • 如果为 0 向一个 channel 里面发送消息
  • 所有 wait 的地方监听 channel 的消息,收到消息则证明任务全部完成

源码分析

结构

type WaitGroup struct {
    noCopy noCopy
    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    //compilers do not ensure it. So we allocate 12 bytes and then use
    //the aligned 8 bytes in them as state, and the other 4 as storage
    //for the sema.
    state1 [3] uint32
}

结构非常简单,就只有两个熟悉,一个 noCopy 还有一个 state1(我也很好奇为什么要用 1 来结尾命名,大佬的想法总是很奇妙)

noCopy: sync 包下的一个特殊标记吧,vet 检查,如果有拷贝的变量则会报错

func main () {
    wg := sync.WaitGroup {}
    w := wg
    fmt.Println (w, wg)
}

你 run 肯定没问题的,但是如果你使用 go vet 做个检查就有警告了

➜  go vet main.go
# command-line-arguments
./main.go:10:10: assignment copies lock value to w: sync.WaitGroup contains sync.noCopy
./main.go:11:17: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
./main.go:11:20: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy

state1:是用来存放任务计数器和等待者计数器的(我一看到这个结构就明白肯定后面又是位操作这样的高端操作了)

state [0] state [1] state [2]
64 位 waiter counter sema
32 位 sema waiter counter

其中 waiter 是等待者计数,counter 是任务计数,sema 是信号量

奇怪的是在 64 位还 32 位操作系统上是不一样的,具体原因以及对于它操作请继续看下去

state

//state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state () (statep *uint64, semap *uint32) {
    if uintptr (unsafe.Pointer (&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer (&wg.state1)), &wg.state1 [2]
    } else {
        return (*uint64)(unsafe.Pointer (&wg.state1 [1])), &wg.state1 [0]
    }
}

这个方法是一个内部方法,就是将 state1 中存储的状态取出来,返回值 statep 就是计数器的状态,semap 是信号量

Done

func (wg *WaitGroup) Done () {
    wg.Add (-1)
}

没想到吧~居然 Done 就是调用 Add 并传递一个 - 1

所以其实我们完全可以再外部调用 Add 传递一个 - 3 一次性结束 3 个任务

Add

func (wg *WaitGroup) Add (delta int) {
    // 首先获取状态值
    statep, semap := wg.state ()
    // 对于 statep 中 counter + delta
    state := atomic.AddUint64 (statep, uint64 (delta)<<32)
    // 获取任务计数器的值
    v := int32 (state >> 32)
    // 获取等待者计数器的值
    w := uint32 (state)
    
    // 任务计数器不能为负数
    if v < 0 {
        panic ("sync: negative WaitGroup counter")
    }
    // 已经有人在等待,但是还在添加任务
    if w != 0 && delta > 0 && v == int32 (delta) {
        panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 没有等待者或者任务还有没做完的
    if v > 0 || w == 0 {
        return
    }
    // 有等待者,但是在这个过程中数据还在变动
    if *statep != state {
        panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
    }

    // Reset waiters count to 0.
    // 重置状态,并用发出等同于等待者数量的信号量,告诉所有等待者任务已经完成
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease (semap, false, 0)
    }
}

这里有几个要点我们其实已经看到了:

  • Wait 的 ** 过程中 ** 是不能 Add 的,不然就会 panic,要注意
  • 虽然我们可以借助 Add 一个负数来一次性结束多个任务,但是如果任务数量控制的不好,变成负数也会 panic,Done 次数多了也一样
  • wg 是通过信号量来通知的,当然可以有很多人在等,wg 它都会一一通知到位的

Wait

func (wg *WaitGroup) Wait () {
    // 先获取状态
    statep, semap := wg.state ()
  
    for {
        // 这里注意要用 atomic 的 Load 来保证一下写操作已经完成
        state := atomic.LoadUint64 (statep)
        // 同样的,这里是任务计数
        v := int32 (state >> 32)
        // 这里是等待者计数
        w := uint32 (state)
        // 如果没有任务,那么直接结束,不用等待了
        if v == 0 {
            return
        }
        // 使用 cas 操作,如果不相等,证明中间已经被其他人修改了状态,重新走 for 循环
        // 注意这里 if 进去之后等待者的数量就 +1 了
        if atomic.CompareAndSwapUint64 (statep, state, state+1) {
            // 等待信号量
            runtime_Semacquire (semap)
            // 如果信号量来了,但是状态还不是 0,则证明 wait 之后还是在人在 add,证明有人想充分利用 wg 但是时机不对
            if *statep != 0 {
                panic ("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

其实 wait 虽然简单,也有要点

  • 通过 load 和 cas 操作 + 循环来避免了锁,其实这个操作可以学一下
  • 其实这里也说明明白了,wg 可以重用,但是你必须等到 wait 全部完成之后再说

其他注意点

func main () {
    wg := sync.WaitGroup {}
    for i := 0; i < 10; i++ {
        wg.Add (1)
        go func (job int) {
            doJob (job, wg)
        }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

func doJob (job int, wg sync.WaitGroup) {
    fmt.Printf ("job % d done\n", job)
    wg.Done ()
}

上面的代码有问题吗?问题在哪呢?

其实很简单,wg 作为一个参数传递的时候,wg 还是一个普通的结构体,我们在函数中操作的时候还是操作的一个拷贝的变量而已,对于原来的 wg 是不会改变的,所以这里需要传递指针才是正确的

func main () {
    wg := &sync.WaitGroup {}
        for i := 0; i < 10; i++ {
            wg.Add (1)
            go func (job int) {
                doJob (job, wg)
            }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

func doJob (job int, wg *sync.WaitGroup) {
    fmt.Printf ("job % d done\n", job)
    wg.Done ()
}

但是其实并不推荐这样去传递 wg,因为这样很容易出现问题,一个不好就出问题了,个人还是建议直接在使用 goroutine 之后马上接一个 defer wg.Done () 来的更加靠谱一些

总结

回过头来看看,之前的问题也都有了答案:

  1. Wait 可以被调用多次,并且每个都会收到完成的通知
  2. Wait 之后,如果再 Wait 的过程中不能在 Add,否则会 panic,但是 Wait 结束之后可以继续使用 Add 进行重用
  3. 可以使用 Add 传递负数的方式一次性结束多个任务,但是需要保证任务计数器非负,否则会 panic
  4. wg作为参数传递的时候需要注意传递指针,或者尽量避免传递
  5. 官方利用位操作节约了空间,存在在同一个地方;利用信号量来实现任务结束的通知....

总的来说 wg 的实现还是非常简单的,需要注意的就是几个使用上的点不要出现意外即可。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343