kotlin channel 入门
前言
最近项目中对 kotlin 的使用比较多。不得不说 kotlin 确实可以极大的提高 android 的开发效率,有许多之前得用 java 写非常多、非常啰嗦的样板代码的 case,用 kotlin 却可以几行搞定,四两拨千斤,同时逻辑表达也更加清晰。而 kotlin 对于 java 而言,最大的不同莫过于协程了。习惯了 kotlin 的协程,可能再也不想使用 java 的 handler + postDelay 了。因此,在这里,本人准备对 kotlin 协程中一些比较难以上手的点,进行说明和分析。这篇文章,将会带大家一起学习一下 kotlin 协程中 channel 的使用。
channel 概述
kotlin 中,我们常用 defer 来进行协助之间单个值的传递。比如,我们可能会写如下代码:
val deferred = GlobalScope.async {
// do something,
"this is a result"
}
deferred.await()
用来等待一个异步协程的结果。在结果返回之前,当前协程挂起。那么,如果我们想获取一系列的结果,应该怎么办呢?注意,这里的一系列的结果,不是说我们需要一个 list,而是说,我们想第一次 await()
的时候,得到一个值,然后再次 await()
的时候,还能获取到值。就像从一个队列里面不断的取出新的元素一样。
这个时候我们就可以使用 channel
了。channel
非常类似于一个 java 中非常常见的概念 BlockingQueue
。只不过,BlockingQueue
使用可以阻塞的 put
方法,而 channel
使用可以挂起的 send
方法;BlockingQueue
使用可以阻塞的 take
方法,而 channel
使用可以挂起的 receive
方法。所以,如果什么时候我们对于 channel
的理解产生了困惑,可以简单的把相关的内容类比到 BlockingQueue
中,来帮助我们进行理解。
channel 的用法
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")
简单说明一下上面的代码:我们有一个 channel ,我们会从这个 channel 中 receive 5 次。这五次一次获取到从 1 到 5 一共五个数字。
这个简单的代码片段,其实蕴含了非常重要的程序执行流程:
我们假设,根据代码的书写顺序,先执行到了 channel.send(1)
。根据上面阐述的内容,send
因为是一个挂起的方法,第一次只会执行 1,并把 1 放入到 channel
中。然后,receive
方法获取到 1。这个时候在 repeat(5) 的循环中,再次执行到 receive
的时候,因为 channel 中已经没有数了,所以 receive
会挂起。之后,协程会通过调度算法,让 channel.send(2 * 2) 执行,并让 channel.send(3 * 3)
挂起。再之后,channel.receive()
在经过调度之后,得到执行,获取到刚才 channel.send(2 * 2)
的结果,也就是 4 。以此类推。
- channel.send(1)
- 发送方挂起
- channel.receive(1)
- 接收方挂起
- channel.send(4)
- 发送方挂起
- channel.receive(4)
- 接收方挂起
。。。
channel 的关闭和遍历
channel
跟 queue
的一个不同的点就是,channel
是可以关闭的。close
这个动作,底层其实是给 channel
发送了一个消息。官方管这个东西叫 close token
。因为 channel 在接收到 close
消息的时候,会立刻停止在这个 channel
上的遍历的工作,所以 kotlin
会保证在 close
被调用之前已经在 channel
中的消息被 received
。
kotlin 为我们提供了一个简单的 channel 的遍历方法,也就是 for 循环,使用方法如下:
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
channel 的流水线模式
流水线模式的使用场景如下:一个协程不断的生产新的消息,其他协程不断的处理这些消息,并且在这个过程中可能返回新的结果。跟我们说的函数式编程中的 map(映射) 非常类似。
这个模式可以让我们很轻松的写出一些简洁而逻辑清晰的代码,比如,下面代码展示了如何生成素数的逻辑:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
fun main() {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
这样,执行 main()
方法之后,就会输出前 10 个素数。这里的原理也很简单。如果我们不考虑 kotlin
的语法问题,但从计算机的角度解决生成素数的问题,一种解法是,我们需要用一个 list 来存储已经找到的素数,然后,在对 n
进行自增的过程中,遍历所有已经找到的素数 list
,如果所有的素数都不能整除 n
,那么这个 n
就是新的素数。
用 pipeline
模式写出的代码,原理跟上面阐述的一样。只是上文所说的 list
被封装在了一层一层的 filter
中,最终执行的过程中,对于一个 n ,需要通过所有的 filter
,这跟上文说的遍历所有已经找到的素数列表的效果是一致的。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
扇入和扇出
扇入:多对一,多个 channel 作为生产者,一个 channel 作为消费者。
扇出:一对多,一个 channel 作为生产者,多个 channel 作为消费者。
虽然概念有不同,但是,写法上,跟一对一的 channel 是一样的。
缓冲 channel
channel 默认的 capacity 是 1。这也就是我们上文说的,send 方法在第二次会挂起,因为中间没有 receive 来消费这个消息。直到有 receive 消费了上一个消息之后,刚才挂起的 send 才能恢复执行。当然,我们可以通过设置参数让这个 capacity 的值不为 1,比如4。那么,跟上面的分析是一样的,send 会执行四次,然后在第五次的时候挂起,直到有 receive 把消息给消费掉了之后,之前挂起的 send 才能继续恢复执行。
channel 的公平性
channel 是公平的。所以,他会严格的按照 first-in first-out 的顺序来执行。一个比较好的例子,就是模拟打乒乓球:
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
// prints
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
这里的公平性就体现在,ping
协程是先启动的,所以理应获得到 ball
。但是 ping
在通过调用 send
把 ball
送还给 channel
之后,又在循环的下一轮立刻请求获取 ball
。但是,因为 pong
的 receive
是比 ping
的下一个 receive
先调用的,(是在上一个 ping
的后面调用的),所以是 pong
得到 ball
而不是 ping
。
Ticker channels
channel
还有一种比较常用的用法,就是用来实现令牌系统。比如,我们现在的需求,是每 100 ms 产生一个令牌,那么我在 51ms 来取,肯定是获取不到的。但是我在 101ms 的时候来取,是可以获取到的。考虑一种情况,令牌没有得到及时的消费,比如,就是前 150ms 都没有消费,那么第 151ms 来的消费者是可以立刻获取到令牌的。但是,第 152ms 来的消费者是不能获取到令牌的。但是,第 201ms 过来的消费者是可以获取到令牌的。