调度
调度者应该做什么(代码如何写)
- 启动任务会议(协程构建)
- 复杂任务如何分配(线程池分配)
- 如何更快的解决问题(并发,异步)
- 调度者需要收到问题反馈(异常处理)
- 调度这需要最后执行任务的结果进行分析(合并结果)
启动任务会议在前一篇文章中已经提到,这一节主要叙述调度者在面对复杂任务时的高效分配已更好更快的解决问题?至于对问题的反馈也就是异常处理,我们在下一节会单独说。
线程池分配-调度器
kotlin默认有四种调度方式,如下图所示
调度种类 | 含义 |
---|---|
Default | 线程池 |
Main | UI线程 |
Unconfined | 直接执行 |
IO | 线程池(jvm上定义) |
- Default和IO的主要区别是在于平台,IO只定义在JVM平台,而Default在其他不同平台上也有定义比如js,如果都在JVM平台上,这两者背后都是一个线程池
- Main中UI线程的意思是基于jvm的UI平台中可用,比如Android,Swing,直接在Idea中是用不了的
- Unconfined是一种高级机制,现在可以简单理解为在协程体中,在遇到第一个挂起函数前的代码运行在原线程中,在执行挂起函数后,运行在子线程中,这个类似于启动模式中的UNDISPATCHED的模式。
- 上述几个调度方式,是通过协程的上下文进行传递,如下:
launch(start = CoroutineStart.DEFAULT, context = Dispatchers.Default) {
log(1)
delay(500)
log(2)
}
// output
Thread[DefaultDispatcher-worker-1,5,main]1
Thread[DefaultDispatcher-worker-1,5,main]2
Thread[main,5,main]end
- 小结: 其实kotlin的默认的几种调度方式总的来说就一种,就是讲复杂任务交给jvm中的线程池自己去分配。但这种方式不够灵活会带来局限性,比如会导致一些不必要的线程切换。所以kotlin的也提供了自定义调度器。
suspend fun test() {
Executors.newScheduledThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
GlobalScope.launch(dispatcher) {
log(1)
// 这里会默认继承父协程的调度器
val job = launch {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.join()
log("5. $result")
}.join()
log(6)
}
}
// output
Thread[pool-1-thread-1,5,main]1
Thread[pool-1-thread-1,5,main]4
Thread[pool-1-thread-2,5,main]2
Thread[pool-1-thread-1,5,main]3
Thread[pool-1-thread-3,5,main]5. kotlin.Unit
Thread[main,5,main]6
Thread[main,5,main]end
同样也是通过线程池转为调度器实现的。我们通过上述代码的输出可以看到在执行过程中执行了很多次的线程切换,其实这里的线程切换对于此处的需求是资源浪费了,所以我们可以改成用但线程去执行,如下:
suspend fun test() {
Executors.newSingleThreadExecutor().asCoroutineDispatcher()
.use { dispatcher ->
GlobalScope.launch(dispatcher) {
log(1)
val job = launch {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.join()
log("5. $result")
}.join()
log(6)
}
}
// output
Thread[pool-1-thread-1,5,main]1
Thread[pool-1-thread-1,5,main]4
Thread[pool-1-thread-1,5,main]2
Thread[pool-1-thread-1,5,main]3
Thread[pool-1-thread-1,5,main]5. Hello
Thread[main,5,main]6
Thread[main,5,main]end
从输出可以看到起并没有线程切换了。当然kotlin协程给出了更简单的api,如下:
suspend fun test() {
GlobalScope.launch(newSingleThreadContext("Dispather")) {
log(1)
val job = launch {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.join()
log("5. $result")
}.join()
log(6)
}
// output
Thread[pool-1-thread-1,5,main]1
Thread[pool-1-thread-1,5,main]4
Thread[pool-1-thread-1,5,main]2
Thread[pool-1-thread-1,5,main]3
Thread[pool-1-thread-1,5,main]5. Hello
Thread[main,5,main]6
Thread[main,5,main]end
这样就可以分配更少的资源实现相同的功能了。现在能较好解决了分配任务的问题,那么下一步就是如何更快更高效的解决问题了?
kotlin协程的并发与异步
想要更高效的解决复杂的任务,需要根据任务的具体情况而定,但是复杂的任务都有一个相同的特点,任务之间是有相互依赖,比如同步本地数据,需要执行的任务如下
- Task1:检索本地数据
- Task2:请求网络数据
- Task3:更新本地数据
可以很容易看出,Task3是依赖于Task2和Task1的结果的,当然这样的数据流模型之前已经有很多解决方案了比如Future,RxJava。现在我们来来kotlin协程是如何解决这类相对比较简单且常用的异步数据流问题的?
利用async方法实现并发
和launch返回一个Job不同,async返回的是Deferred类型,它的返回带结果值,通过类似future的get函数-Deferred.await()得到结果值,当然Deffered是继承Job的。下面我们分析上面的Task1,Task2,他们之间是没有依赖的,所以可以用async去实现任务并行执行
// 模拟一下检索的耗时任务
fun doTaskOne() {
val file = File("E:\\YueyueProjects\\hindict_android.rar")
val bufferedReader = BufferedReader(FileReader(file))
while (true) {
bufferedReader.readLine() ?: break
}
}
// 模拟请求网络的的耗时任务
fun doTaskTwo() {
val file = File("E:\\YueyueProjects\\hindict_android.rar")
val bufferedReader = BufferedReader(FileReader(file))
while (true) {
bufferedReader.readLine() ?: break
}
}
/**
* 并发等待测试
*/
suspend fun test6() {
// 测量总用时
val totalTime = measureTimeMillis {
// 这里coroutineScope表示协程环境继承自父协程
coroutineScope {
launch(Dispatchers.Default) {
val oneTask = async {
log("start do Task1")
// 测量单个任务的耗时
val res = measureTimeMillis { doTaskOne() }
res
}
val twoTask = async {
log("start do Task2")
val res = measureTimeMillis{ doTaskTwo() }
res
}
// 两个任务的顺序执行的逻辑时间和
log("The answer is ${oneTask.await() + twoTask.await()}")
}
}
}
log("The totalTime is $totalTime")
}
// 启动
fun main() = runBlocking {
test6()
log("end")
}
output:
Thread[DefaultDispatcher-worker-3,5,main]start do Task1
Thread[DefaultDispatcher-worker-2,5,main]start do Task2
Thread[DefaultDispatcher-worker-2,5,main]The answer is 82510
Thread[main,5,main]The totalTime is 41280
Thread[main,5,main]end
从输出可以看到,并发是有效果的,总用时减少了一半。另外从本质上讲协程是用了两个线程实现的并发,这两个线程的分配来自于Default调度器,同理这里也可以自定义,如下:
suspend fun test6() {
val totalTime = measureTimeMillis {
coroutineScope {
val oneTask = async(newSingleThreadContext("dispather-1")) {
log("start do Task1")
val res = measureTimeMillis { doTaskOne() }
res
}
val twoTask = async(newSingleThreadContext("dispather-2")) {
log("start do Task2")
val res = measureTimeMillis { doTaskTwo() }
res
}
log("The answer is ${oneTask.await() + twoTask.await()}")
}
}
log("The totalTime is $totalTime")
}
output:
Thread[dispather-1,5,main]start do Task1
Thread[dispather-2,5,main]start do Task2
Thread[main,5,main]The answer is 79023
Thread[main,5,main]The totalTime is 39523
Thread[main,5,main]end
其实这几种方案的await()
在kotlin中是一个挂起函数,他会等待结果返回才继续执行之后的代码,这种方式比回调的风格要清晰不少。后文我们会着重探究一下这个await的实现。回到之前的问题,当Task1,Task2执行完成之后,就可执行Task3了。如下
suspend fun test10() {
coroutineScope {
val oneTask = async(newSingleThreadContext("dispather-1")) {
log("start do Task1")
val res = doTaskOne()
log("end do Task1")
res
}
val twoTask = async(newSingleThreadContext("dispather-2")) {
log("start do Task2")
val res = doTaskTwo()
log("end do Task2")
res
}
oneTask.await()
twoTask.await()
// Task3
// withContext相当于async{}.await()
(withContext(newSingleThreadContext("dispather-3")) {
delay(1000)
log("end task")
})
}
}
我们可以看到执行任务3的时候又用了一个线程,但其实这里可以复用之前用过的调度器dispather-1和dispather-2,所以比较完善的方案如下:
suspend fun test11() {
coroutineScope {
// 这里是优化的地方
newSingleThreadContext("dispather-1").use { dispather1 ->
newSingleThreadContext("dispather-2").use { dispather2 ->
val oneTask = async(dispather1) {
log("start do Task1")
val res = doTaskOne()
log("end do Task1")
res
}
val twoTask = async(dispather2) {
log("start do Task2")
val res = doTaskTwo()
log("end do Task2")
res
}
oneTask.await()
twoTask.await()
// Task3
(withContext(dispather2) {
delay(1000)
log("end task")
})
}
}
}
}
OK,到这里可以说已经比较完善的解决了之前提出了问题。相信大家对协程用于解决类似数据流的问题也有所了解。下面我们进入原理分析部分了,主要是来探究神奇的await()
方法
原理
术语
先来看几个术语
- 协程上下文
贯穿整个协程(线程)生命周期的对象,其内部简单的说就是一个map,你可以在该协程的任何生命周期节点随时获取你定义的对象。比如调度器,拦截器,协程名字等。
- 挂起函数 ,关键字suspend
这可能是协程中最难理解的地方,它看起非常的神奇,现在可以简单理解为线程的await函数, 即阻塞住了线程
- 状态机
状态机模式有很多应用这里就先不介绍了。在协程中简单理解就是一段协程体的编译结果,他会把一段协程体中的挂起函数编译为一个状态,即一个挂起函数对应一个状态,举例如下
val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)
这里有三个状态,初始化,第一个挂起点,第二个挂起点。其实在IDE中续体看得更明显,如下图
- 续体 Continuation
一段协程体对应一个续体,一个续体对应多个挂起函数。当执行某个挂起函数时,在挂起函数内部能够拿到这样一个称为续体的对象,这个对象和状态机结合,调用resumeWith去恢复每一个挂起点,可简单理解调用notifyAll函数
续体+状态机+线程池实现await
有了上面的基础,我们就可以直接说出结论,其实await就是用续体+状态机+线程池实现的,不过状态机是kotlin通过编译suspend关键字得到的,废话不多说,我们从源码角度分析一段上面test11函数是如何实现的。这里再贴上test11的源码
suspend fun test11() {
coroutineScope { // #0
// 这里是优化的地方
newSingleThreadContext("dispather-1").use { dispather1 ->
newSingleThreadContext("dispather-2").use { dispather2 ->
val oneTask = async(dispather1) { // #1
log("start do Task1")
val res = doTaskOne() // #1.1
log("end do Task1")
res // #1.3
}
val twoTask = async(dispather2) { // #2
log("start do Task2")
val res = doTaskTwo()
log("end do Task2")
res
}
oneTask.await() // #3
twoTask.await() // #4
// Task3
(withContext(dispather2) { // #5
delay(1000)
log("end task")
})
}
}
}
}
-
#0
是构造的协程的运行环境是继承父协程环境,但是并没有继承父协程中的创造的续体,此处会构建一个新的续体这里称作0
号续体 -
#1
是用调度器构造了一个新的协程环境当然也会对应创造一个新的续体以便在#1
下的协程体中执行一个挂起逻辑,读者可以从下面的uml序列图中定位到关键源码:
注意第二个start是一个重载函数(是kotlin中的语法),去找CoroutineStart中的invoke()函数即可。最后定位到的源码如下:
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
这里的createCoroutineUnintercepted函数就是创建续体的函数其接受一个AbstractCoroutine作为参数,里面带有父协程上下文和async传递的协程上下文。其内部的源码是无法查看到的,这里暂且忽略。这里知道它会根据父协程上下文和子协程上下文创建续体即可,然后就会执行resumeCancellableWith()->resumeCancellableWith(result),最后会执行到ContinuationImpl
中的resumeWith
函数,在该函数调用invokeSuspend()
进入协程体执行#1.1
的耗时任务。
-
#2
逻辑和#1
一样都会创造续体 -
#3
代码行会等待#1
中的协程体执行完成后才执行#4
及之后的代码,这里的底层逻辑就会涉及到编译后的状态机了,先亮一副图,然后再叙述其中原理:
在#0
的时候会创建续体,并将续体的初始化状态设为0,然后调用#3
处代码,状态就被置为了挂起态这里为状态1,这在源码中是看不到的,只能在编译的class文件中看到,这个挂起态的flag是在Intrinsics.kt
文件中,是一个枚举实现的单例COROUTINE_SUSPENDED
。这里我贴出整个文件的代码, 当子协程里执行完成后,调用主协程的续体调用状态机(状态机其实是续体的一个实现类)代码中的resumeWith(Object result)
方法,就可以直接转移状态到状态2了,并且附带了子协程处理的结果。
@file:kotlin.jvm.JvmName("IntrinsicsKt")
@file:kotlin.jvm.JvmMultifileClass
package kotlin.coroutines.intrinsics
import kotlin.coroutines.*
import kotlin.internal.InlineOnly
@SinceKotlin("1.3")
@InlineOnly
@Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
// 挂起状态标识
@SinceKotlin("1.3")
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
@SinceKotlin("1.3")
@PublishedApi // This class is Published API via serialized representation of SafeContinuation, don't rename/move
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
至于await
是如何形成挂起状态的,我们可以按着源码一步步走到JobSupport
类中的awaitSuspend
方法,代码如下
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
cont.getResult()
}
此处的suspendCoroutineUninterceptedOrReturn
依然是看不到源码的,就理解为将代码置位挂起状态即可,然后uCont
就是共享的续体,你可以用uCont.resumeWith()
或uCont.resume()
去恢复。这里举一个用该方式代替回调的例子:
/**
* 回调变挂起
*/
suspend fun test8() = suspendCoroutineUninterceptedOrReturn<String> {
it.resume("test8")
COROUTINE_SUSPENDED
}
fun test9(callback: (String) -> Unit) {
callback.invoke("test9")
}
@ExperimentalTime
fun main() = runBlocking {
log(test8())
log("end")
}
// output
Thread[main,5,main]test8
Thread[main,5,main]end
test8
也可以是这样, 这样会更直接一些
suspend fun test8() = suspendCoroutine<String> {
it.resume("test8")
}
可能有些同学说,这里阻塞了呀,回调是不阻塞的,没事,这个你放到主线程消息循环队列里面就好了嘛,比如在android中用如下代码代替回调就完事了呀。
GlobalScope.launch(Dispatchers.Main) {
log(test8())
log("end")
}
总结
- 本解首先通过实际生活中作为一个调度者应该解决的问题类比说明了koltin调度所要解决的几个问题,其中第一个问题前篇已叙述,第2个问题即有效分配通过协程提供的默认线程池调度器和自定义调度器可以高效的解决,第3个问题通过async也可以有效的去处理。但是第4,5个问题准备在下篇和下下篇给出方案
- 在解决了给出的问题后,知其然也知其所以然,先对术语进行较为抽象的描述,然后通过对源码的分析结合模型图的描述阐述了kotlin协程用同步的代码实现异步效果的神奇操作。其实就是通过挂起和恢复机制加线程池中的消息循环队列实现的。而在哪里挂起和在哪里恢复的具体逻辑是由编译器编译后的状态机所控制。