go 的并发机制

并发与并行的概念

① 多线程程序在单核cpu上运行就是并发;
② 多线程程序在多核cpu上运行就是并行;

goroutine 特点

  • 它是一个协程,是一个轻量级的线程。
  • 非抢占式多任务处理,由协程主动交出控制权。(下面会解释)
  • 编译器/解释器/虚拟机层面的多任务。
  • 多个协程可能运行在多个线程之上,这个是由调度器去决定的。

协程是非抢占式的,由协程主动交出控制权,而线程是抢占式的,也就是由操作系统主动停掉一个线程让给其他线程执行。

非抢占式多任务

非抢占式的含义就是其他任务不会抢我的cpu,而是等待我主动让出去。别的goroutine才可以执行。而io等操作会主动让出去goroutine的执行权。

func main() {
    var a [10]int
    for i := 0;i< 10;i++{
        go func(i int) {
            for {
                a[i]++
            }
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println(a)
}

这段代码可能让人感觉 1s后就会退出阿,因为main函数会退出阿。其实不会的。这个程序会卡住。而其实main函数也是一个goroutine。而程序中对一个变量++ 是不会交出控制权的(io操作可以交出控制权,如fmt.println),所以main也得不到运行了。。可以看到这段程序死机了(cpu使用率400%,我是4核机器)

进程   USER      PR  NI    VIRT    RES    SHR   %CPU %MEM     TIME+ COMMAND                                                                   
20990 kiosk     20   0  102708   1440   1052 S 390.2  0.0   0:36.46 /tmp/___go_build_main_go  

为了手动交出控制权,可以添加一行 runtime.Gosched()
普通函数是协程的一个特例。因为协程中,main和dowork是可以相互交互的。
调度器会在合适的点进行切换,不需要人来介入。


goroutine的可能切换点

  • I/O select
  • channel
  • 函数调用
  • runtime.Gosched()
  • 等待锁
  • 其他
多个协程可能运行于多个线程之上。

还是以刚才的例子。观察goroutine 运行占cpu 362。他开启了6个线程,但实质上只运行了4个(看下图的pidstat输出),因为我的机器是4核。他运行在4核CPU上。go的调度器是很智能的。

pidstat -t -p 21312 1
Linux 4.15.0-50-generic (Prometheus)    2019年06月05日     _x86_64_    (4 CPU)

23时34分45秒   UID      TGID       TID    %usr %system  %guest   %wait    %CPU   CPU  Command
23时34分46秒  1000     21312         -  100.00    1.00    0.00    0.00  100.00     3  ___go_build_mai
23时34分46秒  1000         -     21312    0.00    0.00    0.00    0.00    0.00     3  |_____go_build_mai
23时34分46秒  1000         -     21313    0.00    0.00    0.00    0.00    0.00     2  |_____go_build_mai
23时34分46秒  1000         -     21314   98.00    0.00    0.00    2.00   98.00     2  |_____go_build_mai
23时34分46秒  1000         -     21315  100.00    0.00    0.00    0.00  100.00     1  |_____go_build_mai
23时34分46秒  1000         -     21316   96.00    0.00    0.00    4.00   96.00     3  |_____go_build_mai
23时34分46秒  1000         -     21317   98.00    0.00    0.00    2.00   98.00     0  |_____go_build_mai

M,P,G

Go 的调度器内部有三个十分重要的结构,M,P,G。(M>P 如上个例子的 6 个M,4个P)

  • M 表示真正的内核OS线程,和POSIX里的thread差不多,真正干活的人。
  • P 表示调度等上下文,可以把他看做一个局部的调度器,使go代码在一个线程上跑,它是实现从N:1 (多个用户线程在一个内核线程上跑)到 N:M 映射的关键。
  • G 代表一个 goroutine,它有自己的栈,用于调度。
    M,P,G

    上图表示有两个物理线程M,每个M都拥有一个context(P),每一个P上又拥有一个正在运行的G和很多等待运行的G。
    P 的总数量可以通过 GOMAXPROCS() 设置。它表示真正的并发量,即有多少个goroutine可以同时运行。
    上面等待的(灰色)goroutine处于ready的就绪态。而每个P都维护着一个队列(runqueue)
M,P,G

当一个M(线程)阻塞了,P(调度器)可以转而投奔另一个OS线程,当一个OS线程M0 阻塞,P转而在OS线程M1 运行。调度器保证有足够的线程来运行所有的P。(如之前看到的4个P,6个M)
当M0返回时,它必须尝试取得一个 context P 调度器,一般情况下,会从其他的OS 线程上偷steal一个P过来。
如果没有偷到的话,他就把goroutine放到global runqueue 中,自己睡眠(放回线程池)。P 也会周期性检查global runqueue。


M,P,G

另一种情况就是某一个P所分配的任务G很快被执行完了(分配不均),这就导致一个context P 闲着,如果 global runqueue 上没有 G 了,那么它会偷其他P 的G。一般偷的话会偷一半。确保每个OS线程都能得到充分的使用。

这段参考知乎 Golang 的 goroutine 是如何实现的? Yi Wang 的回答

CSP 模型 channel

看下面的一段代码。这里的使用函数式编程,以一个函数创建一个通道。

func CreateWorker(id int) chan<- int {     // 返回一个只允许往里送数据的chan。
    c := make(chan int)
    go func() {
        for {
            n, ok := <-c    // 自己在goroutine 里收数据 
            if !ok {break}
            fmt.Printf("Worker %d received %d \n",id,n)
        }
    }()
    return c
}

func main() {
    var c [5]chan<- int
    for i := 0;i < 5; i++{
        c[i] = CreateWorker(i)      //创建5个没有buffer的通道 返回值是只允许发数据的chan
    }

    for j := 0;j < 5; j++ {
        c[j] <- j                       // 通道里写值来确保一个任务结束
        close(c[j])             // close 不是说必须的, 但是关闭通道的最好是发送方!!!
    }
}

执行结果:
Worker 0 received 0 
Worker 1 received 1 
Worker 2 received 2 
Worker 3 received 3 

这个函数有个问题啊。为什么打印出的是4个不是5个呢,在最后一个打印时,gorounting结束了,但是最后一个还没来得及打印。
如下改造, 加一个 done,当 done 中的数据被取出来后,打印的动作肯定也就完成了。

type Worker struct {
    In      chan    int
    Done    chan    bool
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        w.Done <- true
    }
}

func CreateWorker(id int) Worker {
    w := Worker{
        In: make(chan int),
        Done: make(chan bool),
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [5]Worker
    for i := 0;i < 5; i++{
        workers[i] = CreateWorker(i)
    }

    for j := 0;j < 5; j++ {
        workers[j].In <- 'a'+j
        <- workers[j].Done   // 将数据送进去之后,等待work打印完成(等待down)
    }
}

执行结果
Worker 0 received a 
Worker 1 received b 
Worker 2 received c 
Worker 3 received d 
Worker 4 received e 

close 了的channel还可以接受数据吗?

通道被关闭,是还可以接着收数据的。如下面的代码 (呼应了通道的关闭最好是发送方!!)

close 了的channel 关闭了就不能再发送数据了,这里就不做解释了。

func worker(c chan int) {
    for {
        fmt.Printf("Worker recived %d\n",<-c)
    }
}

func main() {
    c := make(chan int)
    go worker(c)
    c <- 'a'
    c <- 'b'
    c <- 'd'
    close(c)

    time.Sleep(50 * time.Microsecond)
}

执行结果
Worker recived 97
Worker recived 98
Worker recived 100
Worker recived 0
Worker recived 0
Worker recived 0
Worker recived 0
....

当一个channel 被关闭了,就会一直收到 0。怎么避免呢?

  • 方法一: ok-parten 模式
    所有的 channel 接收者都会在 channel 关闭时,立立刻从阻塞等待中返回且 ok 值为 false。
func worker( c chan int) {
    for {
        if n,ok := <-c ;ok {
            fmt.Printf("Worker recived %d\n",n)
        }
    }
}
  • 方法二 :range 模式
func worker( c chan int) {
    for n := range c{
        fmt.Printf("Worker recived %d\n",n)
    }
}
打印怎么是顺序的?

这里 还有一个问题,打印的数据是按顺序的,这和直接按顺序打印没有区别了。(因为这里每打印一次就要等待一个down,down了才能开始下一次)
将done 和 打印分开就可以了

type Worker struct {
    In      chan    int
    Done    chan    bool
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        go func(w Worker) {
            w.Done <- true
        }(w)

    }
}

func CreateWorker(id int) Worker {
    w := Worker{
        In: make(chan int),
        Done: make(chan bool),
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [20]Worker
    for i := 0;i < 20; i++{
        workers[i] = CreateWorker(i)
    }

    for i,worker := range workers {
        worker.In <- 'a'+i   // 专心往里送数据,然后打印
    }

    for _,worker := range workers {
        <- worker.Done    // 等待完成这件事放到最后面
    }
}

执行结果:
Worker 0 received a 
Worker 3 received d 
Worker 5 received f 
Worker 2 received c 
Worker 1 received b 
Worker 4 received e 
Worker 6 received g 
...

上述的方法还是不够优雅,看下面的

WaitGroup 去并发任务(确保任务都执行)

引入WaitGroup,当所有的任务都完成才退出。还有waitgroup确保一个任务被执行。

type Worker struct {
    In      chan    int
    Done    func()         // 函数式编程,Done 去调用 wg.Done
}

func doWorker(id int,w Worker) {
    for n := range w.In {
        fmt.Printf("Worker %d received %c \n",id,n)  // 先打印再down
        w.Done()
    }
}

func CreateWorker(id int,wg *sync.WaitGroup) Worker {   // 这里的wg必须是指针
    w := Worker{
        In: make(chan int),
        Done: func() {
            wg.Done()                      // 函数式编程,将wg.Done() 放在函数中
        },
    }
    go doWorker(id,w)
    return w
}

func main() {
    var workers [20]Worker
    var wg sync.WaitGroup
    for i := 0;i < 20; i++{
        wg.Add(1)
        workers[i] = CreateWorker(i,&wg)
    }

    for i,worker := range workers {
        worker.In <- 'a'+i
    }

    wg.Wait()
}

执行结果:
乱序打印

锁争抢

看以下代码。下面的代码是一段有问题的代码。

func main() {
    cnt := 0
    var wg  sync.WaitGroup
    for i := 0; i < 500 ; i++{
        wg.Add(1)
        go func(wg *sync.WaitGroup) {
            cnt++
            wg.Done()
        }(&wg)
    }
    wg.Wait()
    fmt.Println(cnt)
}

执行结果:
498

使用 go run -race 去检测一下。

$ go run -race goroutine.go 
==================
WARNING: DATA RACE
Read at 0x00c0000a6010 by goroutine 7:
  main.main.func1()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x38

Previous write at 0x00c0000a6010 by goroutine 6:
  main.main.func1()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x4e

Goroutine 7 (running) created at:
  main.main()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4

Goroutine 6 (finished) created at:
  main.main()
      /home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4
==================
496
Found 1 data race(s)

上面显示 cnt++ 即在读又在写

为了解决这种线程不安全,就需要加锁。经过改造得。这下的结果就正确了,结果为500。

func main() {
    cnt := 0
    var wg  sync.WaitGroup
    var lock sync.Mutex
    for i := 0; i < 500 ; i++{
        wg.Add(1)
        go func(wg *sync.WaitGroup) {
            lock.Lock()
            defer lock.Unlock()
            cnt++
            wg.Done()
        }(&wg)
    }
    wg.Wait()
    fmt.Println(cnt)
}

执行结果:
500
$ go run -race goroutine.go 
500

本例子是对一个变量进行++ 操作。这是线程不安全的。所以这种操作还是加锁比较安全

select 多路选择和超时

当select 中没有default的话,哪个个case收到值了,就执行并返回,否则一直阻塞等待。

func createWorker() <-chan int {
    ch := make(chan int)
    go func() {
        randInt :=  rand.New(rand.NewSource(time.Now().Unix())).Intn(10)   //产生随机数
        time.Sleep(time.Duration(randInt)*time.Second)  // 睡眠随机秒
        ch <- randInt     // 扔进去一个channel
    }()
    return ch
}

func main() {
    ch1 := createWorker()            
    ch2 := createWorker()

    select {                       // 2个都不返回的话就阻塞,一个返回就彻底运行结束
    case n := <- ch1:
        fmt.Println("ch1 get ",n)
    case n := <- ch2:
        fmt.Println("ch2 get ",n)
    }
}

执行结果:
ch2 get 6

那么select 有什么用呢?select 和 select,poll,epoll 类似。就是监听 IO 操作,当IO 操作发生时,就触发相应的动作,否则就阻塞。

  • 用处一 超时等待
    加上一个 time.After 。如果5s 还没有数据返回,就不阻塞了。
   ch1 := createWorker()
   ch2 := createWorker()

   select {
   case n := <- ch1:
       fmt.Println("ch1 get ",n)
   case n := <- ch2:
       fmt.Println("ch2 get ",n)
   case <- time.After(5*time.Second):
       fmt.Println("get value timeout ")
   }
  • 用处二 判断channel是否满或空
    因为ch1 和 ch2 都是空,所以就执行到 default,那么就可以判断所有的通道是否为空了。
 ch1 := make (chan int, 1)
ch2 := make (chan int, 1)

select {
case <-ch1:
    fmt.Println("ch1 pop one element")
case <-ch2:
    fmt.Println("ch2 pop one element")
default:
    fmt.Println("default")
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • 轻量级线程:协程 在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-c...
    Tenderness4阅读 6,351评论 2 10
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,525评论 0 5
  • Go语言从诞生到普及已经三年了,先行者大都是Web开发的背景,也有了一些普及型的书籍,可系统开发背景的人在学习这些...
    空即是色即是色即是空阅读 530评论 0 6
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,084评论 1 32
  • 宫外孕———作为妇科最为常见的一个妇科急症,我觉得应该给大家做些科普。 一、什么是宫外孕? 正常情况下,受精...
    沃筱蝶阅读 1,571评论 0 0