Go 通道
1.什么是通道?
我们知道多个并发单元在对同一资源进行访问时会涉及资源的占用问题,在其他语言的方案中,都是通过共享内存的方式去访问资源,即互斥锁。当一个并发单元对一个资源进行访问时,如该资源有可能也被其他并发单元访问,我们会在对该资源操作时加互斥锁,以保证在同一时间单位里该资源是确定的。这种方式解决了资源访问的安全问题,但也同时带来并发效率问题,如锁定资源的范围和时间过大,则其他相关并发单元会处于等待状态,带来效率的损失。
为此Go在并发单元对资源的访问里设计另一种可能性,即channel,中文可叫通道或管道,它提倡要通过通信共享内存,而不要通过共享内存进行通信。
通道最早由CSP模型提出,以点对点通道代替内存共享实现并发单元间的数据交互,相比内存共享数据交互,它在程序并发原语上更加直观,也可能在某种程度上会有相较的效率提升。关于CSP模型在《Go并发编程初探》篇章已提到,这里不再赘述。
在Go中,通道是一种特殊的程序原语,它为CSP并发模型而设计,用chan关键字表示。它的主要作用是在协程之间实现通信,可以说是go协程间的高速公路。
通道声明:
var {变量名} chan {数据类型}
//声明一个整型通道chA
var chA chan int
通道创建:
{接收变量名} := make(chan {数据类型},{通道容量})
//使用make函数创建一个容量为5的整型通道
chB := make(chan int,5)
通道有几个特点:
1.通道是有类型的,基于go的基本数据类型
2.通道是有方向的,可从通道读出数据,可向通道写入数据。当通道被传递时可设置其读写方向
3.通道是有缓存的,通过缓存容量可控制协程间的阻塞
4.通道是可关闭的,且只能被关闭一次,通道也是资源,如果不使用应关闭
通道使用常识:
1.通道数据读写,通道写入数据后必须关闭;
2.从一个已经关闭的通道中读取数据,读完了之后,继续读会读到其通道类型的零值;
3.没有初始化的通道被关闭会报panic;
4.读取通道数据时应校验其有效性;
5.关闭通道时会产出一个广播,所有从通道读取数据的协程都会收到消息;
6.被遍历的通道如果没有收到通道被关闭的广播,遍历会一直被阻塞;
7.通道一般在写入协程处调用关闭,写只有一个写入协程的情况,通道被关闭后不能写入数据,但其他协程可以读出,注意重复关闭的情况;
以下演示通道在协程间的通信作用:
func BaseChannel01() {
//声明一个通道:声明后没有初始化的通道是空指针nil,证明其为引用类型
//PS:go的引用类型有五种:slice、map、chan、指针、接口,前三种都是直接可以用make()函数创建
//var chA chan int
//fmt.Println(chA)
//初始化一个通道
chB0 := make(chan int, 0) //无缓存能力的整型通道
chB1 := make(chan string, 5) //缓存能力为5的字符串通道
chB2 := make(chan<- int, 3) //缓存能力为3的整型只写通道
chB3 := make(<-chan int, 3) //缓存能力为3的整型只读通道
fmt.Printf("chB0:类型%T,值%v\n", chB0, chB0)
fmt.Printf("chB1:类型%T,值%v\n", chB1, chB1)
fmt.Printf("chB2:类型%T,值%v\n", chB2, chB2)
fmt.Printf("chB3:类型%T,值%v\n", chB3, chB3)
//协程从通道不断的写入读出数据
//开辟一个协程往通道里写数据
go func(ch chan int) {
//循环不断往通道里写入当前秒数
for {
now := time.Now()
second := now.Second()
ch <- second
}
}(chB0)
//另一个协程不断从通道读出数据
go func(ch chan int) {
for {
nowSecond := <-ch
fmt.Println("子协程读出数据:", nowSecond)
time.Sleep(time.Second)
}
}(chB0)
//父协程不断从通道读出数据
for {
nowSecond := <-chB0
fmt.Println("父协程读出数据:", nowSecond)
time.Sleep(time.Second)
}
//读取通道数据时应校验其有效性
fmt.Println("读取通道数据时校验其有效性:")
chCheck := make(chan int, 3)
chCheck <- 123
chCheck <- 456
chCheck <- 789
close(chCheck)
go func(ch chan int) {
chc1 := <-chCheck
chc2 := <-chCheck
chc3 := <-chCheck
chc4 := <-chCheck
chc5 := <-chCheck
fmt.Println(chc1, "-", chc2, "-", chc3, "-", chc4, "-", chc5)
chc6, ok := <-chCheck
fmt.Printf("chc5:值%v,有效性:%v\n", chc6, ok)
}(chCheck)
time.Sleep(time.Second * 3)
//遍历通道,通道关闭以后遍历读取会自动被通知退出
for v := range chCheck{
time.Sleep(time.Second)
fmt.Println(v)
}
}
2.通道的读写和异常
关闭通道的几个注意事项:
- 不能关闭一个没有初始化的通道
- 通道不能重复关闭
- 不能往已经关闭的通道中写入数据,但是可以从中读数据
func BaseChanner02() {
var ch1 chan int
close(ch1) // panic: close of nil channel
ints1 := make(chan int, 1)
ints1 <- 111
close(ints1)
close(ints1) //panic: close of closed channel]
ints := make(chan int, 1)
ints <- 111
close(ints)
ints <- 111 //panic: send on closed channel
}
3.无缓存的通道
使用一个无缓存的通道时应该注意,它是阻塞的:
- 没人读就永远写不进(阻塞)
- 没人写就永远读不出(阻塞)
func BaseChanner03() {
chInt := make(chan int)
go func(ch chan int) {
fmt.Println("启动协程1")
ch <- 111
close(ch)
fmt.Println("结束协程1")
}(chInt)
go func(ch chan int) {
fmt.Println("启动协程2")
fmt.Println("通道数据:", <-ch)
fmt.Println("结束协程2")
}(chInt)
for {
time.Sleep(time.Second)
}
}
4.有缓存能力的通道
可利用通道缓存能力进行协程调度,通道的元素个数或称缓存能力,决定协程是否产生阻塞,若通道数据已满则阻塞,写入阻塞读出也阻塞,这是相互的。
func BaseChanner04() {
//创建一个缓存能力为3的整型通道
chInt := make(chan int, 3)
go func(ch chan int) {
fmt.Println("启动协程1")
for i := 1; i <= 5; i++ {
time.Sleep(time.Second * 2)
fmt.Println("协程1写入数据:", i)
ch <- i
}
fmt.Println("结束协程1")
}(chInt)
go func(ch chan int) {
fmt.Println("启动协程2")
for i := 1; i <= 5; i++ {
num := <-ch
fmt.Println("协程2读出数据:", num)
}
fmt.Println("结束协程2")
}(chInt)
for {
time.Sleep(time.Second * 3)
}
}
5.select选择通道,协程多路复用
在讲到Go外壳:分支专题是提到select,select关键字是go特有的,其主要用于配合通道实现多路复用。
func BaseChanner05() {
//创建三个通道
ch1 := make(chan int, 3)
ch2 := make(chan int, 4)
ch3 := make(chan int, 5)
//创建3条协程
go func(c chan int) {
ticker := time.NewTicker(time.Second * 1)
for {
<-ticker.C
c <- 1
}
}(ch1)
go func(c chan int) {
ticker := time.NewTicker(time.Second * 2)
for {
<-ticker.C
c <- 2
}
}(ch2)
go func(c chan int) {
ticker := time.NewTicker(time.Second * 3)
for {
<-ticker.C
c <- 3
}
}(ch3)
//time.Sleep(time.Second)
//主协程select多路复用,for不断获取不同通道的数据,随先来则优先处理谁。
for {
select {
case chV1, ok := <-ch1:
fmt.Printf("通道ch1输出:%v,有效性%v\n", chV1, ok)
case chV2, ok := <-ch2:
fmt.Printf("通道ch2输出:%v,有效性%v\n", chV2, ok)
case chV3, ok := <-ch3:
fmt.Printf("通道ch3输出:%v,有效性%v\n", chV3, ok)
}
}
6.通过容量控制并发数
利用有容量通道的阻塞能力——地铁闸机模型
func BaseChanner06() {
//创建一个容量为5的通道,无论协程开多少,控制每次5条并发
semaphore := make(chan int, 5)
for i := 1; i <= 100; i++ {
go func(c chan int, n int) {
for {
c <- i //抢通道写入,抢不到则阻塞
fmt.Println("协程", n, "抢到通道")
time.Sleep(time.Second)
<-c //做完操作后自己读出,空出容量
}
}(semaphore, i)
}
for {
time.Sleep(time.Second)
}
}
7.定时器
固定时间和周期时长定时器,其与time.sleep()区别是可以终止和重置定时器。下面演示一下timer和ticker,以及在子协程中终止ticker
func BaseChannel07() {
//使用固定时间定时
timer := time.NewTimer(time.Second * 3)
<-timer.C
fmt.Println("父协程定时3秒输出!!!")
//简单的使用变量标识ticker状态
var tickerStopped = false
//使用周期定时器
ticker := time.NewTicker(time.Second * 1)
go func(t *time.Ticker) {
//5秒后终止周期器
//使用固定时间定时
timer := time.NewTimer(time.Second * 5)
<-timer.C
fmt.Println("子协程定时5秒关闭周期器!!!")
t.Stop()
tickerStopped = true
runtime.Goexit()
}(ticker)
for {
if !tickerStopped {
s := <-ticker.C
fmt.Println("父协程每隔1秒输出!!!", s)
} else {
fmt.Println("子协程已关闭周期定时器!!!")
os.Exit(0)
}
}
//不可撤销的time.Sleep()
}
8.如何优雅的关闭通道
比较优雅的方式一般建议在发送方关闭。
协程对通道的操作分几种情况:
- 一个发送者,一个接收者;
在确认只有一个发送者的情况下,通道关闭时比较简单的,写入完成后记得立即或延迟关闭通道即可,接收者在读完所有数据后会校验读取数据的有效性。
- 一个发送者,多个接收者;
这种情况与上一种类似,如果其他多个协程的接受者在遍历读取,它们都会收到通道被关闭的广播,并退出退出遍历阻塞状态。
- 多个发送者,一个接收者;
这种情况比较复杂,各个发送者都不能粗暴的关闭通道,虽然可以通过sync.Once控制只关闭一次,但其他发送者仍有可能向一个已被关闭的通道发送数据。这里可能需要其他状态量标志各个发送者的完成状态,接收者监控该状态量确认各发送者写入完成后,由接收者关闭通道。可以使用等待组、状态变量、或其他通道等等。
- 多个发送者,多个接收者;
一般这种状况较少,多个协程一起对同一个通道进行读写,一般都需要一个管理者监控该通道的读写完成情况,如单独一个协程检测各个操作该通道的协程的完成状态,当全部读写完成后再进行通道关闭。