这篇文章大部分内容来自:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
这篇教程基于一系列的例子来讲解
kotlinx.coroutines
的核心特性
笔者使用的kotlin版本为1.2.51
,协程核心库的版本为0.23.4
注意:协程库还处于实验阶段,API是不稳定的,谨慎用于生产环境
简介&安装
作为一个语言,kotlin仅在标准库里提供最少的底层API,从而让其他库能利用协程。不像其他有相似能力的语言,async
和await
不是kotlin的关键字,甚至不是标准库的一部分。
kotlinx.coroutines
是一个非常丰富的库,包含若干高层协程启动机制(launch
,async
等)。你需要添加kotlinx-coroutines-core
模块的依赖才能在你的项目中使用这些机制。
<!-- 笔者写这篇文章时,最新的kotlin版本为1.2.51 -->
<properties>
<kotlin.version>1.2.51</kotlin.version>
</properties>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>0.23.4</version>
</dependency>
基本概念
这个章节覆盖了协程的基本概念。
你的第一个协程
运行下面的代码:
fun main(args: Array<String>) {
launch { // 在后台启动一个新的协程,然后继续执行
delay(1000L) // 不阻塞的延迟1s
println("World!") // 延迟后打印
}
println("Hello,") // 当协程延迟时,主线程还在跑
Thread.sleep(2000L) // 阻塞主线程2s,为了让jvm不挂掉
}
运行结果:
Hello,
World!
本质上,协程是轻量级的线程。可以使用launch
协程建造器启动。你可以将launch { ... }
替换为thread { ... }
,delay(...)
替换为Thread.sleep(...)
以达到相同的效果。试试看。
如果你只把launch
替换为thread
,编译器会产生如下错误:
Suspend functions are only allowed to be called from a coroutine or another suspend function
这是因为delay
是一个特殊的函数,这里暂且称之为挂起函数,它不会阻塞线程,但是会挂起协程,而且它只能在协程中使用。
连接阻塞和非阻塞世界
第一个例子在同一块代码中混合了非阻塞的delay(...)
和阻塞的Thread.sleep(...)
,很容易就搞晕了哪个是阻塞的,哪个是非阻塞的。下面,我们使用runBlocking
协程建造器,明确指明阻塞:
fun main(args: Array<String>) {
launch { // 在后台启动一个新的协程,然后继续执行
delay(1000L)
println("World!")
}
println("Hello,") // 主线程立即继续跑
runBlocking { // 这块阻塞了主线程
delay(2000L) // 延迟2s,让jvm不挂掉
}
}
结果还是一样的,但是这代码只用了非阻塞的dalay
。主线程调用了runBlocking
,然后一直被阻塞,一直到runBlocking
执行完成。
这个例子可以改得更符合语言习惯些,用runBlocking
包装主函数的执行:
fun main(args: Array<String>) = runBlocking<Unit> { // 开始主协程
launch { // 在后台启动一个新的协程,然后继续执行
delay(1000L)
println("World!")
}
println("Hello,") // 主协程立即继续跑
delay(2000L) // 延迟2s,让jvm不挂掉
}
这里runBlocking<Unit> { ... }
的作用像一个适配器,用来启动顶层的主协程。明确指定是Unit
返回类型,是因为一个格式良好的kotlin主函数必须返回Unit
。
下面是为挂起函数写单元测试的方法:
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 这里我们可以通过任何我们喜欢的断言风格使用挂起函数
}
}
等待任务(job)
当另一个协程在运行时,延迟一段时间并不是一个好办法。让我们明确的等待(非阻塞的方式),直到我们启动的后台任务完成:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // 启动一个新协程,并创建一个对其任务的引用
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // 等到子协程完成
}
结果还是一样的,但是主协程和后台任务没有用后台任务的执行时间联系在一起。好多了。
提取函数重构
让我们来提取出launch { ... }
块中的代码到另一个函数中。当你用“提取函数”重构这块代码时,你会得到一个用suspend
修饰的新函数。这是你一个挂起函数。挂起函数可用于协程中,就像使用普通函数一样,但是它们有额外的特性——可以调用其他的挂起函数去挂起协程的执行,像这个例子中的delay
。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// 这是你第一个挂起函数
suspend fun doWorld() {
delay(1000L)
println("World!")
}
协程是轻量级的
运行下面的代码:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // 启动大量的协程,并返回它们的任务
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // 等待其他全部的任务完成
}
这里启动了十万个协程,一秒之后,每个协程打印了一个点。你用线程试试?(很有可能就OOM了)
协程像守护线程
下面的代码启动了一个长时间运行的协程,一秒打印两次"I'm sleeping",然后延迟一段后,从主函数返回:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟后就退出
}
你运行看看,打印了三行,然后就结束了:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
活跃的协程并不会保活进程,所以它更像守护线程。
取消和超时
这个章节包含了协程的取消和超时。
取消协程执行
在小应用中,从主函数返回看起来是个结束所有协程的好办法。在更大的、长时间运行的应用中,需要更细粒度的控制。launch
函数返回了一个可以取消协程执行的Job
:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一小会
println("main: I'm tired of waiting!")
job.cancel() // 取消任务
job.join() // 等待任务结束
println("main: Now I can quit.")
}
运行,产生如下输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
在调用job.cancel
不久后,因为协程被取消掉了,所以看不到任何输出了。Job
的扩展函数cancelAndJoin
结合了cancel
和join
的作用。
取消是需要配合的
协程的取消是需要配合的,协程的代码必须可配合取消。所有kotlinx.coroutines
中的挂起函数都是可取消的,这些挂起函数会检查协程的取消状态,若已取消则抛出CancellationException
。然而,如果协程正处于运算中,没有检查取消状态,那么其不可被取消,如下所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 浪费CPU的循环运算
// 2秒打印一个消息
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 延迟一会
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任务,并等待其结束
println("main: Now I can quit.")
}
运行看看。结果是,在取消之后,其持续打印"I'm sleeping",直到循环5次之后,任务自己结束。
使运算代码可取消
两种方式使运算代码可取消。
- 周期执行挂起函数,检查取消状态。
yield
函数是达到这个目的的好办法。 - 显式的检查取消状态。
我们来尝试下第二种方式:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // 可取消的运算
// 一秒打印两次消息
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 延迟一会
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任务,并等待其结束
println("main: Now I can quit.")
}
现在,循环就是可取消的了。isActive
是协程内CoroutineScope
对象的的一个属性。
用finally释放资源
可取消的挂起函数在取消时会抛出CancellationException
,通常的方式就可以处理了。例如,try {...} finally {...}
表达式或Kotlinuse
(use api)函数,会在协程取消时,执行结束动作。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // 延迟一会
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任务,并等待其结束
println("main: Now I can quit.")
}
join
和cancelAndJoin
都会等所有结束动作完成,因此以上代码的输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
运行不可取消的代码块
任何尝试在finally
块中使用挂起函数均会产生CancellationException
,因为运行代码的协程已经被取消了。通常,这不是个问题,因为所有有良好实现的关闭操作(关闭文件,取消任务,或关闭任何种类的沟通通道)通常是非阻塞的,并不需要挂起函数参与。但是,在很少的情况下,你需要在取消的协程中进行挂起操作,那么你可以将相应代码的使用withContext(NonCancellable) {...}
包装,这里使用了withContext
函数和NonCancellable
上下文,如下所示:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延迟一会
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消任务,并等待其结束
println("main: Now I can quit.")
}
超时
超时,是实际应用中取消协程执行最显而易见的原因,因为其执行时间超时了。你还在用手动记录相应任务的引用,然后启动另一个协程在延迟一段时间后取消记录的那个协程?不用那么麻烦啦,这里有个`withTimeout``函数,帮你做了这些工作。看看吧:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
at kotlinx.coroutines.experimental.ScheduledKt.TimeoutCancellationException(Scheduled.kt:202)
at kotlinx.coroutines.experimental.TimeoutCoroutine.run(Scheduled.kt:100)
at kotlinx.coroutines.experimental.EventLoopBase$DelayedRunnableTask.run(EventLoop.kt:322)
at kotlinx.coroutines.experimental.EventLoopBase.processNextEvent(EventLoop.kt:148)
at kotlinx.coroutines.experimental.BlockingCoroutine.joinBlocking(Builders.kt:82)
at kotlinx.coroutines.experimental.BuildersKt__BuildersKt.runBlocking(Builders.kt:58)
...
TimeoutCancellationException
是由withTimeout
抛出的CancellationException
的子类。之前,我们没有在控制台看到过异常堆栈信息,因为在一个取消了的协程中,CancellationException
通常是一个结束协程的正常原因。然而,这个例子中,我们正好在main
函数中使用了withTimeout
。
因为取消是一个异常,因此所有的资源将要被正常的关闭。如果你需要针对超时做一些额外的处理,可以将代码用try {...} catch (e: TimeoutCancellationException) {...}
包装,或者使用与withTimeout
类似的withTimeoutOrNull
,后者返回null
而不是抛出异常:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
这次就没有异常了:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
组合挂起函数
这个章节覆盖了组合挂起函数的多种方式。
默认是顺序的
假设我们有俩定义好有用的挂起函数,例如远程服务调用,或者计算。这里,我们先假设这俩有用,实际上就是延迟一小会:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 假装有一波骚操作
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 假装有一波骚操作
return 29
}
如果我们要顺序执行他们,先执行doSomethingUsefulOne
,再执行doSomethingUsefulTwo
,然后其计算结果之和,怎么搞?实际使用中,需要用第一个函数的返回值来判断是否需要调用第二个函数或如何去调,才会这么做。
我们用顺序调用就可以了,因为协程中的代码和普通的代码一样,默认是顺序执行的。下面的例子通过测量俩挂起函数总的执行时间来演示:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
结果近似如下:
The answer is 42
Completed in 2017 ms
用async来并发
如果doSomethingUsefulOne
和doSomethingUsefulTwo
的执行没有依赖关系,我们想通过并发来更快的获取到结果,那该怎么做呢?async
就是干这茬的。
概念上来讲,async
就跟launch
类似。其启动了一个与其他协程并发运行单独协程(轻量级线程)。区别是,launch
返回了一个不携带任何结果的Job
,但是async
返回了一个Deferred
,一个轻量级非阻塞的future,表示一会就会返回结果的承诺。你可以在一个延期的值(deferred value)使用.await()
来获取最终的结果,但Deferred
也是个Job
,因此,需要的话,你也可以取消掉。
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
产生如下输出:
The answer is 42
Completed in 1017 ms
快了两倍,因为用了两个协程并发执行。注意,协程的并发性总是明确的(多个协程同时运行,那么肯定是并发的)。
懒启动async
async
有个懒加载选项,配置其可选参数start
,值设置为CoroutineStart.LAZY
。只在值被await
需要时,或start
函数被调用时才启动协程。运行下面的例子,跟前面的例子就多了个选项:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
产生如下输出:
The answer is 42
Completed in 2017 ms
好吧,又回到了顺序执行,首先我们启动并等待one
,然后启动并等待two
。 这并不是懒执行的预期场景。这个设计是用来替换标准的lazy
函数,如果其计算涉及了挂起函数。
异步风格的函数
我们可以用async
协程建造器定义异步风格的函数,异步的调用doSomethingUsefulOne
和 doSomethingUsefulTwo
。给这些函数加上Async
后缀是一个很好的风格,强调了他们是异步计算的,需要用延期的值来获取结果。
// somethingUsefulOneAsync 的结果是 Deferred<Int> 类型
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// somethingUsefulTwoAsync 的结果是 Deferred<Int> 类型
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
注意,这些xxxAsync
函数不是挂起函数,它们随处均可使用。但是它们的使用总是意味着其行为是异步(也相当于并发)执行的。
下面的例子展示了在协程之外的使用:
// 注意,这里没有用runBlocking
fun main(args: Array<String>) {
val time = measureTimeMillis {
// 我们可以在协程外部初始化异步操作
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// 但是等待结果必须涉及挂起或阻塞
// 这里,我们用`runBlocking { ... }`阻塞主线程来获取结果
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
结果如下:
The answer is 42
Completed in 1128 ms
协程的上下文(context)和调度器(dispatchers)
协程总是在上下文中执行,上下文代表的值是CoroutineContext
,定义在Kotlin标准库中。
协程上下文是一系列的元素,主要的元素包括我们之前看到过的协程的Job
,还有调度器,这个章节会介绍。
调度器和线程
协程上下文包括了一个决定相应协程在哪个或哪些线程执行的协程调度器(参见 CoroutineDispatcher)。协程调度器可以限制协程在具体的线程中执行,或调度到一个线程池,或者无限制运行。
所有像launch
或async
一样的协程建造器都接受一个可选的CoroutineContext
参数,这个参数可以用来显式指定调度器和其他上下文元素。
尝试下面的例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // 没有限制 - 将在主线程执行
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // 父协程的上下文,runBlocking 协程
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // 将会调度到ForkJoinPool.commonPool(或等价的地方)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // 新线程
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
产生如下输出(可能顺序不同):
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
之前章节中使用的默认调度器是DefaultDispatcher
,当前的实现中等同于CommonPool
。因此,launch { ... }
==launch(DefaultDispatcher) { ... }
==launch(CommonPool) { ... }
。
父coroutineContext
和Unconfined
上下文的区别一会看。
注意,newSingleThreadContext
创建了一个新的线程,这是非常昂贵的资源。在实际应用中,要么用完之后就用close
函数回收,要么就存储在顶层变量中,在应用中到处复用。
非限制(Unconfined) VS 限制(confined) 调度器
Unconfined
协程调度器在调用线程启动协程,但直到第一个挂起点之前。在挂起之后在什么线程恢复全权由之前调用的挂起函数决定。Unconfined
调度器适合在协程不消耗CPU时间或不更新任何限制于特定线程共享数据(类似UI)的场景。
再说coroutineContext
属性,它在任何协程中均可用,引用当前协程的上下文。通过这种方式,父上下文可以被继承。特别的,runBlocking
创建的协程默认调度器限定到调用者线程,因此,继承runBlocking
的上下文就有了使用可预测的先进先出调度限制在这个线程内执行的作用。
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // 没有限制 -- 在主线程运行
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // 父(runBlocking协程)上下文,
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
输出结果:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
因此,继承了runBlocking {...}
的coroutineContext
的协程继续在main
线程执行,而没有限制的协程在delay
函数使用的默认线程池线程中恢复。
调试协程和线程
协程用Unconfined
或默认的多线程调度器可以从一个线程挂起,从另一个线程恢复。即使是用单线程的调度器,也很难知道协程在什么地方,什么时候在干什么。在多线程应用中,在日志中打印出线程的名字是一个通常的做法。一般的日志框架也是支持这个特性的。但当使用协程时,仅线程名称对上下文的描述不够充分,因此,kotlinx.coroutines
包含的设施让调试更容易。
给JVM参数加上-Dkotlinx.coroutines.debug
,然后运行下面的代码:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async(coroutineContext) {
log("I'm computing a piece of the answer")
6
}
val b = async(coroutineContext) {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
三个协程:
主协程(#1) - runBlocking
创建的协程
a
(#2)、b
(#3) - 两个计算延迟返回值的协程
都在runBlocking
的上下文限定在主线程中执行,输出如下:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
log
函数在方括号中打印出线程名称和当前执行的协程标识,调试模式开启的时候,这个标识会连续的赋值给创建的协程。
线程间切换
给JVM参数加上-Dkotlinx.coroutines.debug,然后运行下面的代码:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
这个例子演示了几种新技术。一是使用runBlocking
时,指定了特定的上下文;二是使用withContext
函数切换协程的上下文,但依然是在相同的协程中执行。输出如下:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
注意:这里使用了kotlin标准库里的use
函数,用于当newSingleThreadContext
创建的线程不再被需要时,将其释放。
上下文中的任务(Job)
协程的任务是上下文的一部分。协程可以取出自己上下文的任务,用coroutineContext[Job]
表达式:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
在调试模式下,输出如下:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
因此,CoroutineScope
中的isActive
是coroutineContext[Job]?.isActive == true
的便捷写法。
协程的父子关系
当协程的coroutineContext
用来启动另一个协程,那么新协程的Job
就成了父协程Job
的儿子。想父协程取消的时候,所有的子协程也会递归取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// 启动一个协程来处理请求
val request = launch {
// 生成两个任务,一个有自己的上下文
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// 另一个继承父上下文
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// 当子任务完成,请求才算完成
job1.join()
job2.join()
}
delay(500)
request.cancel() // 取消请求
delay(1000) // 延迟1s,看看会发生什么
println("main: Who has survived request cancellation?")
}
输出如下:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
结合上下文
协程上下文可以用+
操作符结合。右手边的上下文替换掉左手边上下文相关的条目。例如,协程的Job
可以被继承,但调度器会被替换。
fun main(args: Array<String>) = runBlocking<Unit> {
// 启动一个协程处理请求
val request = launch(coroutineContext) { // 使用 `runBlocking` 的上下文
// 在CommonPool中创建CPU密集型的任务
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // 子任务完成时,请求完成
}
delay(500)
request.cancel() // 取消请求的处理
delay(1000) // 延迟1s看看有啥发生
println("main: Who has survived request cancellation?")
}
预期结果如下:
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
当爹的责任
父协程总是会等所有的子协程执行完成。父协程不必显式的记录所有其启动的子协程,也不必使用Job.join
等待其子协程执行完成。
fun main(args: Array<String>) = runBlocking<Unit> {
// 启动一个协程处理请求
val request = launch {
repeat(3) { i -> // 启动几个子协程
launch(coroutineContext) {
delay((i + 1) * 200L) // 可变延迟 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // 等待请求完成,也包括其子协程
println("Now processing of the request is complete")
}
结果如下:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
给协程命名方便调试
当协程日志很频繁或者你只想关联相同协程产生的日志记录时,自动生成id是挺好的。然而,当协程固定的处理一个特别的请求,或者处理特定的后台任务,为了调试,还是命名比较好。CoroutineName
上下文元素与线程名称的功能一致,在调试默认打开的时,执行协程的线程名称将会展示为CoroutineName
。
下面的例子展示了这个理念:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// 启动两个后台计算
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
当有JVM参数-Dkotlinx.coroutines.debug
时,产生如下结果:
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
通过指定任务取消执行
现在,我们已经了解了上下文,父子关系和任务,让我们把这些玩意儿放一块耍耍。假设我们的应用有一个有生命周期的对象,这个对象不是协程。例如,我们写一个Android应用时,在Android Activity上下文中启动了各种各样的协程用于异步获取数据和动画计算。当activity销毁时,所有的协程都得取消掉,避免内存泄漏。
我们一个创建一个跟activity绑定的Job
实例,用于管理我们的协程。Job
实例由Job()
工厂创建,等会例子会演示。为了方便理解,我们可以launch(coroutineContext, parent = job)
这样写,说明用了父job
,而不是用launch(coroutineContext + job)
表达式。
现在,一个Job.cancel
调用,将会所有我们启动的所有协程。此外,Job.join
等待所有子协程完成,因此在下面的例子中我们也可以用cancelAndJoin
:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // 创建一个Job来专利我们自己协程的生命周期
// 为了演示,启动10个协程,每个运行不同的时间
val coroutines = List(10) { i ->
// 都是我们job对象的儿子
launch(coroutineContext, parent = job) { // 使用runBlocking的上下文,但是用我们自己的job
delay((i + 1) * 200L) // 花样等待
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // 延迟500ms
println("Cancelling the job!")
job.cancelAndJoin() // 取消所有的任务,并等待其完成
}
输出如下:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
如你所见,只有前两个协程打印了消息,其他都被一单个job.cancelAndJoin()
给取消掉了。所以,在我们假想的Android应用中,需要做的,就是在activity创建的时候创建一个父job,然后在子协程创建的时候使用这个job,最后,在activity销毁时取消掉这个job即可。在Android生命周期中,我们不能join
它们,因为是同步的。在建造后端服务时,join
用于保证有限的资源访问是很有用的。
通道(Channels)
延期值(Deferred values)提供了一个在协程中转移单个值的便捷方式。通道提供了一个方式来转移数据流。
通道基础
Channel
在概念上与BlockingQueue
非常相似。不同之处是前者用可挂起的send
替代后者阻塞的put
操作,前者用可挂起的receive替代后者是阻塞的take
操作。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// 这里可能是重度消耗CPU的计算,或是异步逻辑,这里我们就发送几个平方数
for (x in 1..5) channel.send(x * x)
}
// 这里打印5个收到的整数
repeat(5) { println(channel.receive()) }
println("Done!")
}
结果如下:
1
4
9
16
25
Done!
关闭和遍历通道
不像队列,通道可以关闭用于表明没有更多的元素会过来了。在接收端,用for
循环可以很方便的接受通道中传来的元素。
概念上来说,close
就像给通道传递一个特殊的关闭令牌。在接受到关闭令牌之时,迭代就会停止,因此,这里保证了关闭之前发送的元素都被接收到了:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 发完了
}
// 用for循环打印接收值 (在通过关闭之前)
for (y in channel) println(y)
println("Done!")
}
建造通道生产者
协程生产一序列元素的模式是比较常见的。这是在并发代码中经常可以发现的生产者-消费者模式的一部分。你可以将生产者抽象为一个以通道为参数的函数,但与常识相背的是,结果需要被函数返回。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
流水线
流水线,一种协程可产生无限数据流的模式。
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 从1开始的无限整数流
}
另一个协程或多个协程会消费这个流,做一些处理,然后产出结果。下面的例子中,这些数会被平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
下面的代码启动然后连接整个流水线:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // 产生从1开始的整数流
val squares = square(numbers) // 平方整数
for (i in 1..5) println(squares.receive()) // 打印前五个
println("Done!")
squares.cancel() // 在大型应用中,需要关闭这些协程
numbers.cancel()
}
在上面这个例子中,我们不用取消掉这些协程,因为协程就像守护线程一样。但是在大点的应用中,如果我们不需要了,那就要停止掉流水线。或者,我们可以将流水线协程作为主协程的儿子,接下来会演示。
流水线获取素数
下面举例将流水线应用到极致——用流水线协程生成素数。首先,生成一个无限的整数序列。这次我们传如一个context
参数,并将这个参数传递给produce
建造器,因此,调用方可以控制协程在哪跑:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // 从start开始的无限整数流
}
下面的流水线过滤掉了所有不能被给定素数除尽的数:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
现在,我们建立一个从2开始的整数流,从当前的通道获取素数,然后为找到的素数开启新的通道:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
下面的例子打印了前十个素数,在主线程的上下文中运行了整个流水线。因为所有的协程是作为runBlocking
协程上下文的儿子启动的,所以我们不必将所有我们启动的协程列下来,我们用cancelChildren
扩展函数取消所有的子协程。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的子协程,从而让主线程退出
}
输出如下:
2
3
5
7
11
13
17
19
23
29
注意,你可以用标准库中的buildIterator
协程建造器建造相同的流水线。将produce
用buildIterator
替换,send
用yield
替换,receive
用next
替换
,ReceiveChannel
用Iterator
替换,并去掉上下文。你也不用runBlocking
了。然而,上面展示的流水线使用通道的好处,就是能够充分利用多核CPU(如果在CommonPool
上下文运行)。
扇出
多个协程可以从同一个通道接收,任务散布于多个协程之间。让我们从一个周期产生整数(1秒10个数)的生产者协程开始:
fun produceNumbers() = produce<Int> {
var x = 1 // 从1开始
while (true) {
send(x++) // 生产下一个
delay(100) // 等1s
}
}
我们可以有多个处理者协程,在这个例子中,就只打印他们的id和接收到的数字:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
让我们启动5个处理者,让它们运行将近1秒,看看会发生什么:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消掉生产者协程,这样就能将所有的协程取消
}
输出与下面的结果类似,尽管接收每个特定整数的处理器ID可能不同。
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消生产者协程关闭其通道,最终会结束处理者协程遍历通道。
同样的,注意我们在launchProcessor
代码中如何用for
循环遍历通道实现扇出的。不像consumeEach
,for
循环风格在多协程使用时是妥妥安全的。如果,其中的一个协程挂掉了,其他的协程还会继续处理通道。而当处理者用consumeEach
遍历时,正常或非正常结束都会将通道给取消掉。
扇入
多个协程可以向相同通道发送。例如,我们有一个字符串通道,一个有特定延迟周期发送特定字符串到通道的挂起函数。
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
现在,来看看如果启动两个协程发送字符串会怎么样(这个例子中,我们在主线程上下文中启动它们):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收头6个
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有的子协程,让主线程结束
}
输出是:
foo
foo
BAR!
foo
foo
BAR!
带缓冲区的通道
前面展示的通道都是不带缓冲的。没有缓冲区的通道只有在发送者和接收者见到了彼此才传递元素。如果发送先调用了,那么它将挂起直到接收被调用;如果接收先调用了,那么它将挂起直到发送被调用。
Channel()
工厂函数和produce
建造器接收一个可选的用来指定缓冲区大小capacity
参数,缓冲区可以让发送者在挂起之前发送多个元素,跟指定容量的BlockingQueue
类似,在缓冲区满了之后阻塞。
看看下面的代码会有啥效果:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // 创建带缓冲区的通道
val sender = launch(coroutineContext) { // 启动发送者协程
repeat(10) {
println("Sending $it") // 在发送之前先打印
channel.send(it) // 将会在缓冲区满的时候挂起
}
}
// 什么都不做,等着
delay(1000)
sender.cancel() // 取消发送者协程
}
用缓冲区大小为4的通道,打印了5次。
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
头4个元素加入到了缓冲区,当试图加入第五个的时候,发送者被挂起了。
时钟通道
时钟通道是一种特别的单缓冲区通道,每次自上次从此通道消费后,在给定时间后会生产一个Unit
。单独使用看起来像没什么用,但是在构建复杂的基于时间的生产流水线,然后操作者做一些窗口和其他基于时间的处理时特别有用。
用ticker
工厂方法创建时钟通道,后续元素不再需要时,用ReceiveChannel.cancel
取消掉。
看看实际中如何应用:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // 创建时钟通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // 最初的延迟还没结束
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 后续的元素都有100ms延迟
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模拟长时间消费延迟
println("Consumer pauses for 150ms")
delay(150)
// 下个元素立即可用
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// receive方法调用间的暂停也算进去了,下一个元素会更快收到
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // 后面的不要了
}
会打印下面几行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker
知道消费者停顿,如果有停顿,默认调整下次产生产生元素的延迟,尝试维护产生元素的固定速率。
一个可选的参数mode
,如果指定为TickerMode.FIXED_PERIOD
,那么ticker
会维护一个元素间固定延迟。默认是TickerMode.FIXED_DELAY
。
这里多讲讲两个模式的区别,后面再举个例子说明区别。
TickerMode.FIXED_PERIOD
: 为了保持产生元素的速率,会调整下一个元素产生的延迟。
TickerMode.FIXED_DELAY
: 固定的延迟产生元素。
区分两个模式的例子。
fun log(msg: String) {
println("[${Date()}] $msg")
}
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 5000, initialDelay = 0, mode = TickerMode.FIXED_DELAY)
var i = 0
for (item in tickerChannel) {
log("receive $item")
val time = if (i++ % 2 == 0) 4000 else 6000 // 切换使用4s/6s延迟
delay(time)
}
}
如果用TickerMode.FIXED_DELAY
模式:
[Sun Jul 22 16:36:17 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:22 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:28 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:33 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:36:39 CST 2018] receive kotlin.Unit
如果用TickerMode.FIXED_PERIOD
模式:
[Sun Jul 22 16:43:52 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:43:57 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:03 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:07 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:13 CST 2018] receive kotlin.Unit
[Sun Jul 22 16:44:17 CST 2018] receive kotlin.Unit
第一次延迟都是5s,后面的区别是FIXED_DELAY
延迟在5/6s间切换;FIXED_PERIOD
的延迟在4/6s间切换。相信大家已经能够区分了。
通道是公平的
对于从多个协程调用通道的顺序,向通道发送和接收操作是公平的。按照先进先出的顺序进出通道,例如,第一个调用receive
的协程获得元素。下面的例子中,两个协程("ping"和"pong")从同一个通道"table"接收"ball"对象。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // 公用一张桌子
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // 发球
delay(1000) // 延迟一秒
coroutineContext.cancelChildren() // 游戏结束,取消它们
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循环中接球
ball.hits++
println("$name $ball")
delay(300) // 等一会
table.send(ball) // 将球击回
}
}
"ping"协程先开始的,所以,它最先收到球。即使"ping"协程在将球击回桌面后立即再次开始接球,但球还是给"pong"协程接到了,因为"pong"早等着在了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
有时候,通道可能会产生看起不公平的执行,是因协程使用到的线程池所致。
共享可变状态和并发
协程可以用多线程的调度器(例如默认的CommonPool
)并发执行。那么并发问题也接踵而至。主要问题是同步访问共享可变状态。在协程领域里,这个问题的解决方案有些与多线程领域中类似,但是有些则截然不同。
问题
让我们启动1000个协程,做同样的事情1000次(一共一百万次执行)。为了一会做比较,我们记录下执行时间:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // 启动协程的数量
val k = 1000 // 每个协程执行动作的次数
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我们以一个非常简单的动作,在多线程的CommonPool
上下文下,累加一个共享的变量。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
最终会打印出个啥?应该不会是"Counter = 1000000",因为1000个协程在多个线程同步的累加counter
而没有进行同步。
注意:如果你的计算机只有2核或更少,那么你会一直得到1000000,因为
CommonPool
在这种情况下是单线程的。为了“造成”这个问题,需要把代码改改:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // 定义一个2线程的上下文
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // 替代上面例子的CommonPool
counter++
}
println("Counter = $counter")
}
volatile也爱莫能助
有个常见的误解,以为volatile
可以解决并发问题。试试看:
@Volatile // kotlin 中, volatile 是个注解
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
代码跑得更慢了,但最终也没能得到"Counter = 1000000",因为volatile
保证了顺序的读和写,但是对大的操作(这里是累加)不保证原子性。
线程安全的数据结构
对协程和线程都通用的解决方案是利用一个线程安全的数据结构,这个数据结构提供对共享状态必要的同步操作。在上面的例子中,我们可以用AtomicInteger
类,它有个incrementAndGet
方法:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
这是针对这个问题最快的解决方法。这种解决方法适用于普通计数器,集合,队列和其他标准数据结构以及它们的基本操作。但是不容易扩展到复杂状态或没有现成的线程安全实现的复杂操作。
细粒度线程限制
线程限制一种解决共享可变状态的方式,它将共享变量的访问限制到单个线程上。在UI应用中很适用,因为UI状态一般都限制于事件派发或应用线程。在协程中用单线程上下文很容易实现:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // 在CommonPool中运行每个协程
withContext(counterContext) { // 但是在单个线程中累加
counter++
}
}
println("Counter = $counter")
}
这个代码跑的很慢,因为做到了细粒度的线程限制。每个独立的累加都利用withContext
块从多线程CommonPool
上下文切换到单线程的上下文。
粗粒度线程限制
实际应用中,线程限制是在大块中执行的。例如,一大块更新状态的业务逻辑限制于单个线程。例如下面这个例子,在单线程的上下文中运行每个协程:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // 在单线程的上下文中运行每个协程
counter++
}
println("Counter = $counter")
}
这就快多了,而且结果是对的。
互斥操作
互斥,利用一个用不会并发执行的临界区,保护对共享状态的修改。在阻塞的世界里,通常用synchronized
或ReentrantLock
达到互斥。协程中的替代品叫做Mutex
。它用lock
和unlock
方法界定临界区。关键不同之处是,Mutex.lock()
是挂起函数,不会阻塞线程。
有一个扩展函数withLock
,方便的帮你做了mutex.lock(); try { ... } finally { mutex.unlock() }
这事:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
这个例子中的锁是细粒度的,因此,是有代价的。然而,在有些你必须周期修改共享状态的情况下,锁是个好选择。同时,这个状态没有限制到某个线程上。
Actors
actor
是由协程、限制并包含于此协程的状态、与其他协程交流的通道组成。一个简单的actor
可以写成一个函数,但是复杂状态的actor
更适合写成一个类。
有个actor
协程建造器,它可以方便地将actor
的邮箱通道组合到其范围内,以便从发送通道接收消息并将发送通道组合到生成的job
对象中。因此,单个actor
的引用就携带了上面所有的东西。
使用actor
的第一步是定义一个actor
要处理的消息类。Kotlin的密封类(sealed class)非常适合这个目的。首先定一个CounterMsg
密封类,子类IncCounter
作为增加计数器的消息,GetCounter
作为获取计数器值的消,后者需要发送回复。CompletableDeferred
表示将来已知的单个值,此处用于发送回复。
// counterActor的消息类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 增加计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 带回复的请求
然后我们定义一个使用actor
协程建造器启动actor
的函数:
// 此函数启动一个新的计数器actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 遍历进来的消息
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主要代码很简单:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // 创建actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// 发送一个消息,用于从actor中获取计算器的值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 关闭actor
}
actor
自身执行的上下文无关紧要。一个actor
是一个协程,协程是顺序执行的,因此,将状态限制在特定协程中可以解决共享可变状态的问题。实际上,actor
可以修改自己的私有状态,但只能通过消息相互影响(避免任何锁定)。
actor
比在负载下锁定更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。
注意,
actor
协程构建器是produce
协程构建器的双重构件。actor
与它接收消息的通道相关联,produce
与它发送元素的通道相关联。
Select表达式
Select表达式可同时让多个挂起函数等待,并选择第一个使其激活。
从通道中选择
让我们先创建两个生产字符串的生产者:fizz
和buzz
。fizz
每300ms产生一个"Fizz"字符串:
fun fizz(context: CoroutineContext) = produce<String>(context) {
while (true) { // 每300ms发送一个"Fizz"
delay(300)
send("Fizz")
}
}
buzz
每500ms产生一个"Buzz"字符串:
fun buzz(context: CoroutineContext) = produce<String>(context) {
while (true) { // 每500ms发送一个"Buzz!"
delay(500)
send("Buzz!")
}
}
使用receive
挂起函数,我们可以从一个通道或另一个通道接收。但select
表达式允许我们使用其onReceive
子句同时从两者接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> 意味着select表达式没有返回
fizz.onReceive { value -> // 第一个子句
println("fizz -> '$value'")
}
buzz.onReceive { value -> // 第二个子句
println("buzz -> '$value'")
}
}
}
跑七次这个函数:
fun main(args: Array<String>) = runBlocking<Unit> {
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // 取消俩协程
}
结果如下:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
选择关闭
当通道关闭时,select
中的onReceive
子句报错,导致相应的select
抛出异常。我们可以使用onReceiveOrNull
子句在关闭通道时执行特定操作。以下示例还显示select
是一个返回其选择好的子句结果的表达式:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
让我们来使用这个函数,传入产生"Hello"字符串四次的通道a
和产生"World"四次的频道b
:
fun main(args: Array<String>) = runBlocking<Unit> {
// 为了可预测结果,我们使用主线程上下文
val a = produce<String>(coroutineContext) {
repeat(4) { send("Hello $it") }
}
val b = produce<String>(coroutineContext) {
repeat(4) { send("World $it") }
}
repeat(8) { // 打印头8个结果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
观察得处下列结论:
首先,select
倾向于第一个子句。当多个子句同时可选时,它们中的第一个被选择。这里,两个通道都不断的产生字符串,a
作为第一个,胜。然后,因为我们用的是不带缓冲池的通道,a
在调用send
时不时会挂起,就给了机会让b
也来一发。
选择发送
select
表达式具有onSend
子句,可以与选择的偏见性结合使用。
让我们写一个整数生产者的例子,当主通道上的消费者无法跟上它时,它会将其值发送到side
通道。
fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // 产生10个数字
delay(100) // 每个延迟 100 ms
select<Unit> {
onSend(num) {} // 发送给主通道
side.onSend(num) {} // or to the side channel
}
}
}
消费者将会非常缓慢,需要250毫秒才能处理每个数字:
fun main(args: Array<String>) = runBlocking<Unit> {
val side = Channel<Int>() // 分配side通道
launch(coroutineContext) { // 给side通道一个快速的消费者
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // 慢慢消费,不着急
}
println("Done consuming")
coroutineContext.cancelChildren()
}
看看发生什么:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
选择延期值
可以使用onAwait
子句选择延迟值。让我们从一个异步函数开始,该函数在随机延迟后返回一个延迟字符串值:
fun asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
让我们随机延迟开始十几个。
fun asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
现在,主函数等待第一个函数完成并计算仍处于活动状态的延迟值的数量。注意,我们在这里使用了select
表达式是Kotlin DSL的事实,因此我们可以使用任意代码为它提供子句。在这个例子中,我们遍历一个延迟值列表,为每个延迟值提供onAwait
子句。
fun main(args: Array<String>) = runBlocking<Unit> {
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
}
输出如下:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
切换延迟值的通道
让我们编写一个消费延迟字符串值通道的通道建造器函数,直到下一个延迟值结束或通道关闭之前,等待每个接收到的延迟值。此示例将同一select
中的onReceiveOrNull
和onAwait
子句放在一起:
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // 从第一个收到的延迟值开始
while (isActive) { // 当通道没取消时循环
val next = select<Deferred<String>?> { // 从此select返回下一个延迟值或null
input.onReceiveOrNull { update ->
update // 换下一个值等
}
current.onAwait { value ->
send(value) // 发送当前延迟值产生的数据
input.receiveOrNull() // 使用输入通道的下一个延迟值
}
}
if (next == null) {
println("Channel was closed")
break // 退出循环
} else {
current = next
}
}
}
为了测试它,我们将使用一个简单的异步函数,它在指定的时间后返回指定的字符串:
fun asyncString(str: String, time: Long) = async {
delay(time)
str
}
main
函数只是启动一个协同程序来打印switchMapDeferreds
的结果并向它发送一些测试数据:
fun main(args: Array<String>) = runBlocking<Unit> {
val chan = Channel<Deferred<String>>() // 测试通道
launch(coroutineContext) { // 开启打印协程
for (s in switchMapDeferreds(chan))
println(s) // 打印收到的字符串
}
chan.send(asyncString("BEGIN", 100))
delay(200) // 够"BEGIN"生产出来了
chan.send(asyncString("Slow", 500))
delay(100) // 不够生产"Slow"的时间
chan.send(asyncString("Replace", 100))
delay(500) // 发送最后的字符串之前给点时间
chan.send(asyncString("END", 500))
delay(1000) // 给时间运行
chan.close() // 关闭通道
delay(500) // 等运行完
}
结果是:
BEGIN
Replace
END
Channel was closed