golang - sync.WaitGroup

go 版本基于1.18

结构体

结构体定义如下:

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers only guarantee that 64-bit fields are 32-bit aligned.
    // For this reason on 32 bit architectures we need to check in state()
    // if state1 is aligned or not, and dynamically "swap" the field order if
    // needed.
    state1 uint64
    state2 uint32
}

当我们初始化一个WaitGroup对象时,其counter值、waiter值、semap值均为0

  • noCopy :
    空结构体,它并不会占用内存,编译器也不会对其进行字节填充。它主要是为了通过go vet工具来做静态编译检查,主要作用是防止开发者在使用WaitGroup过程中对其进行了复制,从而导致的安全隐患

  • state1, state2:
    主要代表三部分内容:

    1. 通过Add()设置的子goroutine的计数值counter
    2. 通过Wait()陷入阻塞的waiter数
    3. 信号量semap

    其中在64位 的操作系统中(对齐系数为8), 此时state1 的的高32 位代表计数器counter, 低32位代表waiter 数, state2 代表信号量
    在32 位的操作系统中(对齐系数为4), 此时将state1 和state2 unsafe.Pointer() 转化为[3]uint32的state数组,其中state[0] 代表信号量semap, state[0]作为uint64的高32位,即counter, state[1] 作为uint64的低32位, 即waiter。 具体实现的代码如下

    state方法就是返回对应的计数(counter,waiter)和信号量(semap)

      func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
              if unsafe.Alignof(wg.state1) == 8 ||     uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
          // state1 is 64-bit aligned: nothing to do.
              return &wg.state1, &wg.state2
          } else {
              // state1 is 32-bit aligned but not 64-bit aligned: this   means that
              // (&state1)+4 is 64-bit aligned.
              state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
          return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
        }
      }
    

方法

1. Add

  • 源码实现
func (wg *WaitGroup) Add(delta int) {
        // 获取计数器和信号量
    statep, semap := wg.state()
        // 竞争检测相关,与功能无关,忽略
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
        // 计数值加上 delta: statep 的前四个字节是计数值,因此将 delta 前移 32位
    state := atomic.AddUint64(statep, uint64(delta)<<32)
        // 当前的counter计数值
    v := int32(state >> 32)
        // 当前的waiter 计数值
    w := uint32(state)
        // 竞争检测,忽略
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
        // counter 计数值<0 , 曝panic 异常
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    // delta > 0 && v == int32(delta) : 表示从 0 开始添加计数值
   // w!=0 :表示已经有了等待者
   // 说明在添加counter计数值的时候,同时添加了等待者,非法操作。添加等待者需要在添加计数值之后
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
         // v>0 : 计数值不等于0,不需要唤醒等待者,直接返回
         // w==0: 没有等待者,不需要唤醒,直接返回
    if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
          // 再次检查数据是否一致
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }

     // 到这里说明计数值为0,且等待者大于0,需要唤醒所有的等待者,并把系统置为初始状态(0状态)
  // 将计数值和等待者数量都置为0
    *statep = 0
          // 唤醒等待者
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

2. Done

  • 源码
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

完成一个任务,将计数值减一,当计数值减为0时,需要唤醒所有的等待者

3.Wait

  • 源码
func (wg *WaitGroup) Wait() {
        // 获取计数器和信号量
    statep, semap := wg.state()
        // 竞争检测,忽略
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
        
    for {
                // 原子操作,获取计数器值
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
                // 所有任务都完成了,counter =0,此时直接退出,即不阻塞
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // waiter 计数器加一
                // 这里会有竞争,比如多个 Wait 调用,或者在同时调用 Add 方法,增加不成功会继续 for 循环
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
                        //   // 增加成功后,阻塞在信号量这里,等待被唤醒
            runtime_Semacquire(semap)
                         // 被唤醒的时候,计数器应该是0状态。如果重用 WaitGroup,需要等 Wait 返回
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

注意事项

  • 保证 Add 在 Wait 前调用: 确保在子go 程中不使用Add 方法, 又可能导致和wait 造成竞争冲突,最后导致panic
  • Add 函数不要传入负值,有可能导致panic 或者导致 wait 函数中 信号量P 操作死锁等待
  • 不要复制使用 WaitGroup,函数传递时使用指针传递, WaitGroup 不支持复制操作, 可用go tool vet 检查是否对WaitGroup 复制使用
  • 尽量不复用 WaigGroup,减少出问题的风险, 复用的前提要在wait 函数返回之后

使用示例

package main

import (
    "sync"
)

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }
    for _, url := range urls {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            // Decrement the counter when the goroutine completes.
            defer wg.Done()
            // Fetch the URL.
            http.Get(url)
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容