目录
一、 Go的并发机制:线程模型
二、 Go并发编程初探:goroutine和channel
Go有两种并发编程的选择,一种是本篇介绍的goroutine,它是基于通信顺序进程(CSP)的编发模式,另一种是传统的通过共享内存访问多线程的模式。
goroutine
goroutine是Go程序中最基本的组织单位,一个程序最少有一个goroutine,通常只需main()
函数的称为主goroutine。这好像有点像传统的线程,但是由上一篇我们已经知道,两者是完全不同的。
在这里,可以直接将goroutine理解为上一篇GMP中的G,原理都已经介绍过,所以下面主要是记录一些它的特性以及使用中比较容易忽略的地方。
先来一段最简单的并发操作
func main() {
go func() {
fmt.Println("say hi")
}()
}
这是一个匿名函数,在函数前面加一个go就能开启并发,上面的代码正常来说会创建两个goroutine,一个是主goroutine,另个一个是用于执行go后面的func代码的goroutine,这个过程需要找到一个可用的空闲G并初始化,然后加入到P队列,再找可用的M关联(详细流程翻阅上一篇),但是这一通操作还没来得及,主goroutine已经结束了,所以上面代码并不会输出任何信息。
如果需要等待新的goroutine执行完打印语句在结束可以通过下面两种方法:
func main() {
//方法1
c := make(chan struct{})
go func() {
fmt.Println("say hi 1")
c <- struct{}{}
}()
<-c
//方法2
var n sync.WaitGroup
n.Add(1)
go func() {
defer n.Done()
fmt.Printf("say hi 2")
}()
n.Wait()
}
输出:
say hi 1
say hi 2
第一种用到通道(channel),在下面通过<-c
读取通道的值造成阻塞,等到另一个goroutin执行完打印操作,通过c<-
往通道传递值,才能继续。通道相关部分下面会介绍。第二种使用传统共享内存访问的方式,相信好多人看到n.wait()
也能猜出来,在Go语言中,sync包提供了传统共享内存访问的操作,waiGroup能使我们可以收集多个goroutine的结果,Add
类似一个计数器,而Done
会进行递减,wait
会一直阻塞知道计数器为零。sync包还包含互斥锁、读写锁、条件变量等,这些都是共享内存访问的并发方式,它们的含义和使用上其他语言这里不多,这里不打算介绍。
通道(channel)
通道是可以让一个goroutine发送特定的值到另一个goroutine的通信机制。通道是可以指定传递某种类型的,例如一个int类型的元素通道:chain int
,创建一个通道可以和map一样:ch := make(chan int)
。
通道是引用类型的,未初始化的时候值为零值nil,使用==
对通道进行比较时,如果都是同一通道的数据引用时值为true
。通道有三个最主要的操作:
ch <- x //发送值,写
x := <- ch //接收值,读
close(ch) //关闭通道
下面通过一个简单的例子:一个goroutine通知另一个goroutine打印指定信息,来了解下通道的使用,同时对比一下其他语言使用多线程实现的区别:
func main() {
ch := make(chan string)
done := make(chan struct{})
//发送方
go func() {
ch <- "Golang"
}()
//接收方
go func() {
x := <-ch
fmt.Printf("Print: %s\n", x)
done <- struct{}{}
}()
<-done
}
首先创建了两个通道,第一个ch
是字符串类型的元素通道,一个goroutine将需要打印的信息通过此通道,传递给另一个goroutine,第二个done
是一个struct{}
空结构体的通道,这种通道一般表示不用来传递任何实质的值,在这里的用途主要是告知主goroutine打印已经执行完成,这种不关心传通讯递值只是关心通讯本身的时间点的通道我们一般称为事件(event)。
接着发送方将“Golang”字符串放入通道ch
,虽然两个goroutine是并发,但是在发送方塞入值之前,接收方会一直阻塞在x := <-ch
这里,知道接收到通道ch
的值后,将其读取,并打印出来。接着把一个空结构体放入done
通道,这里同理,主goroutine会一直阻塞在<-done
这里,等待接收方的通知来临,最后整个程序结束。
通道是可以有方向的,上面两种通道在使用上都是双向的,即既可以读(<-ch)也可以写(ch<-),通道也是可以限制为单向的,但一般不会在创建的时候就指定是单向的,因为创建一个只进不出的通道显示是没有意义的,所以单向通道一般是通过双向通道转换而来,把上面的例子改造为单向通道:
func main() {
//创建的时候依然是双向通道
ch := make(chan string)
done := make(chan struct{})
//发送方
//约束只能是单向的写通道
go func(w chan<- string) {
w <- "Golang"
// <-w 尝试读取值,会报编译错误
}(ch)
//接收方
//约束稚只能是单向的读通道
go func(r <-chan string) {
x := <-r
fmt.Printf("Print: %s\n", x)
done <- struct{}{}
}(ch)
<-done
}
两个goroutine函数的入参除声明了单向的通道,如果尝试在单向写通道读取值,编译就不会通过,还有一点主意,close
关闭通道的操作也只能在写通道处执行。既然双向通道也可以正常使用,为什么要弄一个单向通道呢?这个问题我觉得和规范使用通道有关,也是在真正的项目使用上需要特别注意的,这部分放在下面再详细介绍。
无缓冲通道
上面例子的代码使用的都是无缓冲通道,前面也介绍过,如果一个goroutine在通道读取值的时候,通道内没有任何值,这时候goroutine就会一直阻塞。反过来,如果一个goroutine往通道写入一个值,但是没有其他goroutine去读取,这时候如果继续往通道写值也会一直阻塞。无缓冲通道使通讯的双方变得同步化,所以有时候也叫做同步通道。
缓存通道
缓冲通道内部维护一个队列,队列的最大长度在创建的时候通过make
的容量来设置
//创建一个容量为5的缓冲通道
ch := make(chan string,5)
make(chan string,0) 等价于 make(chan string)
既然内部是一个队列,所以通道的值无论是写入还是读取,都是有序的,遵循FIFO的规则。除此之外,对通道进行写入和读取的操作也是有序的,下面再看一个例子:
func main() {
ch := make(chan string, 3)
var n sync.WaitGroup
n.Add(3)
go func() {
time.Sleep(1 * time.Second)
ch <- "Golang"
ch <- "Golang"
ch <- "Golang"
}()
go func() {
time.Sleep(3 * time.Second)
x := <-ch
fmt.Printf("A sleep 2sec and print: %s\n", x)
n.Done()
}()
go func() {
time.Sleep(3 * time.Second)
x := <-ch
fmt.Printf("B sleep 3sec and print: %s\n", x)
n.Done()
}()
go func() {
time.Sleep(1 * time.Second)
x := <-ch
fmt.Printf("C sleep 1sec and print: %s\n", x)
n.Done()
}()
n.Wait()
}
输出:
C sleep 1sec and print: Golang
A sleep 2sec and print: Golang
B sleep 3sec and print: Golang
创建一个容量为3的缓冲通道,并一次写入3个值,下面新开3个goroutine分别读取通道的值,注意看每个函数里面都有睡眠时间,最后的输出结果会按照睡眠时间的升序排序。
无缓冲通道的值是一次性写入的,也就是说它不用关系有没有goroutine正在读取,当写入的值数量达到了最大值,如果继续尝试写入,这时候就会阻塞。
刚开始接触通道的时候,可能大家都会和我一样觉得它和队列(queue)十分相识,但是官方提示千万不要把它作为队列来使用,因为它还和goroutine有深度的关联,这样使用反而会变得复杂。
关闭通道(close)
通道的第三种操作。只有那些已创建并正在打开的通道才能关闭,尝试再未初始化、已关闭、只读的通道进行关闭都是不允许的。还有一点,当一个通道关闭的时候,会通知(实际是唤醒)到所有读取通道的goroutine:
func main() {
ch := make(chan string, 3)
var n sync.WaitGroup
n.Add(3)
go func() {
time.Sleep(1 * time.Second)
close(ch)
}()
go func() {
x := <-ch
fmt.Printf("A print: %s\n", x)
n.Done()
}()
go func() {
x := <-ch
fmt.Printf("B print: %s\n", x)
n.Done()
}()
go func() {
x := <-ch
fmt.Printf("C print: %s\n", x)
n.Done()
}()
n.Wait()
}
输出:
B print:
C print:
A print:
上面代码有3个goroutine在读取通道的值,第一个goroutine在睡眠1秒后关闭通道,这时候控制台即刻输出结果并结束程序。我们并没有往通道写入任何值,但是读取值的goroutine仍然被唤醒,最后读取的是个空字符串,因为string
是一个值类型,默认值就是空字符串。
循环读取通道的值
说到循环,岔开一下,在Go语言中,循环是通过for
来完成的,没有其他语言的while和foreach,任何你想要循环的逻辑都可以通过一个for
来完成,事实上简单易用也是Go追求的。从循环只有for
,没有++i
这种表达也可以看出,开发者们的确是想砍掉那些可有可无的,留下的都是必须要的。我觉得这种想法是很好的,特别是对于一个初学者,进来一下用for一下用while,会感到迷茫。
说回来,如果想要对一个无缓冲通道依次读取值,也可以使用for
,通过range
关键字作为参数遍历,它会在通道关闭时自动中断循环:
func main() {
ch := make(chan string, 5)
go func() {
defer close(ch)
for i := 0; i < 5; i++ {
ch <- strconv.Itoa(i)
}
}()
for i := range ch {
fmt.Println(i)
}
}
输出:
0
1
2
3
4
这里在主goroutine循环读取通道的值,for依次读取了通道的值。在写入通道的goroutine写入5个值,最后关闭通道defer close(ch)
,这一句是必须的,如果不关闭通道,for会一直尝试循环读取值而导致阻塞,最终结果就是死锁。
select多路复用
select语句是一种只能用于通道接收和发送操作的专用语句,他的用法与switch有点相似
select {
case <-ch:
//读取通道值
case x := <-ch:
//读取通道值,并赋值给x
case ch <- "s":
//通道写入值:s
default:
//所有case语句都不符合条件
}
上面展示了select语句一般的使用场景,和switch一样它有多个条件和一个默认的分支(可选)。上面3个case分别代表了3中情况,当执行select语句的时候,运行时并不急于值判断,而是先求所有case语句的值,例如先求得case getTopNum()
,这时候也知道哪个case是符合条条件的。接着判断:如果有一个case符合条件,执行case分支下的代码,抛弃其他case;如果有多个case符合条件,通过伪随机算法选择一个case执行,并抛弃其他case;如果没有任何符合条件的case,执行default的代码,如果这种情况下没有设置default分支,则会阻塞而不是终止;
for-select循环
select语句只会执行一轮,无论是有符合条件还是没有都是,就算是阻塞,也只是延长一轮的结束时间,由于select是用在通道这种特殊的场景,所有一般会结合for
循环来使用,达到一种类似监听的效果:
func main() {
ch := make(chan string, 5)
go func() {
defer close(ch)
for i := 0; i < 5; i++ {
ch <- strconv.Itoa(i)
}
}()
loop:
for {
select {
case v, ok := <-ch:
if v == "2" {
fmt.Printf("value:%s ok:%v \n", v, ok)
}
if !ok {
fmt.Printf("channel closed %v \n", ok)
break loop
}
}
}
}
上面展示了for和select结合使用的例子,一开始新启动的goroutine会往通道写入5个值,select的case分支读取通道的值,因为通道是有序的,第一个读取的是“0”,这时候由于有v=="2"
的判断,所以不会进入代码块,继续往下走,到了判断ok
值的分支,当通道的值已被全部取出并关闭时,此值为false。所以第一轮ok的值肯定是true的,这时候对于select语句来说,已经结束了,但是我们再select外面包了一层for循环,这时候会让select再执行一次。注意这里使用了带标签(loop)的break语句,当前最后执行到break loop
的时候,会直接终止for循环。
在前面对于select语句有个关于多个case分支符号条件的执行结果,可能大家会有疑问,为什么会伪随机执行而不是顺序去执行?我认为是这样:Go预约的select语句是特殊的,只能和通道一起使用,而通道是并发中goroutine通讯的管道,多个通道频繁传递值肯定是常有的,想象一下,如果有多个case分支,每一个都正在监控着不同的通道,每个通道都正在不停的进行通讯(写->读),这时候如果是顺序执行,可能永远都只执行第一个case,其他case被抛弃。这种情况下,最好的方法只能是通过伪随机,是每一个case分支都有运行的机会。
通道的使用规范
前面还留下一个问题:为什么还需要有单向通道的存在?这里也说下我的理解,在这之前,先列一个通道操作以及对应的状态:通道有三种操作且有多种状态,在每某些状态下对通道进行一些操作,可能会导致发生一些不可预测的问题,例如panic、死锁,非期望的阻塞的呢过。而通道本身与普通的变量并无不同,可以函数间进行传递,想象如果在一个只是需要读取值的函数,不小心对通道进行了关闭操作。
上面列表出现了好几种死锁和panic的操作,应该尽量避免这种情况:
- 正确的分配通道的所有权,这里所有权定义为实例化、写入和关闭通道。要弄清楚哪个goroutine拥有通道。在这里,单向通道的作用就提现出来了,它将允许我们区分通道的拥有者和使用者。拥有者有一个(chan或<-chan)权限,使用者只有(chan<-)权限。一旦我们将拥有者和使用者区分,上面表的结果就会很清晰。
- 尽量保持通道所有权的范围很小,避免将一个通道作为结构体的公开成员变量。
总结
总得来说,Go语言提供goroutine和sync同步包支持并发,Go的设计者们更愿意推荐使用goroutine和通道的方式,这是基于CSP(communicating sequential process)的设计,相比使用sync优点是更方便优雅的编写并发程序,但传统的共享内存是最接近硬件的通信方式,性能也更好。当然,在真正的项目上,肯定也不会只局限与一种,可能也需要两者结合使用。