Kotlin 协程源码阅读笔记 —— 协程工作原理
Kotlin
协程在网上有很多的人把它吹得神乎其神,什么性能多么多么好,效率比线程高多少多少,balabala 一堆优点。首先在我看来协程和线程压根儿就没有可比性,就好像说姚明和刘翔谁更厉害一样,线程是操作系统的调度的基本单位,线程也是 CPU
执行的一个基本任务;而协程只是在编程语言上定义的一种优化多线程通信、调度的一种编程方式(至少 Kotlin
中是这样),而操作系统可不认识什么是协程,而协程中的任务最终也是在线程上执行。
在 Kotlin
协程上来说它的最大的优点只有一个它能够以同步的方式来写异步代码,能够干掉编程中的地狱回调(通过类似于 RxJava
流的编程方式也能够干掉地狱回调,不过不是本篇文章中的讨论内容),而它的其他优点也都是这一个优点的发散,不要小瞧这个优点,如果消除了各种异步 Callback
,能够在很大的程度上提高代码的可阅读性,减少 BUG
的产生,也更容易能够定位到 BUG
。
简单了协程的优点,后续就要看看它是怎么工作的了,那么准备就绪后就开启今天的内容。
用 Callback 写异步任务
假如我有以下的异步任务:
val delayExecutor: ScheduledThreadPoolExecutor by lazy {
ScheduledThreadPoolExecutor(1)
}
fun delay(time: Long, callback: () -> Unit) {
delayExecutor.schedule({ callback() }, time.coerceAtLeast(0), TimeUnit.MILLISECONDS)
}
我用 delay()
来实现一个异步耗时任务,实现就是通过 delayExecutor
添加一个定时任务,任务执行时就调用 callback
。
我有以下代码要调用异步任务:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
helloWorld {
println(it)
}
}
fun helloWorld(callback: (s: String) -> Unit) {
hello { hello ->
world { world ->
callback("${hello}${world}")
}
}
}
fun hello(callback: (s: String) -> Unit) {
delay(500) {
callback("Hello, ")
}
}
fun world(callback: (s: String) -> Unit) {
delay(500) {
callback("World!!")
}
}
}
我在 onCreate()
函数中调用了异步任务 helloWorld()
,成功后会在 callback
中打印最后的结果。 helloWorld()
又由 hello()
与 world()
两个任务组成,hello()
任务成功后在调用 world()
任务,最后结合 hello()
与 world()
两个任务的结果的结果回调 helloWorld()
的 callback
。
用协程写异步任务
同样是上面的异步任务,我用 Kotlin
协程改造一下:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val coroutineScope = CoroutineScope(Dispatchers.Default)
coroutineScope.launch {
println(helloWorld())
}
}
suspend fun helloWorld(): String {
val hello = hello()
val world = world()
return hello + world
}
suspend fun hello(): String {
return delaSuspend(500, "Hello, ")
}
suspend fun world(): String {
return delaSuspend(500, "World!!")
}
suspend fun <T> delaSuspend(time: Long, data: T): T {
return suspendCancellableCoroutine { cont ->
delay(time) {
cont.resumeWith(Result.success(data))
}
}
}
}
首先我把上面的 delay()
耗时任务的回调改造成了协程 suspend
方法,具体的改造参考 delaSuspend()
方法的实现。然后 hello()
与 world()
也都是 suspend()
方法,他们都是通过调用 delaSuspend()
来模拟异步任务,在 helloWrold()
中分别调用 hello()
和 world()
方法,这里都是以同步的方式调用的哦,然后组合他们的结果然后返回。在 onCreate()
新建一个协程,然后也是直接以同步的方式调用 helloWorld()
。
和改造前的代码有一个非常显著的特点就是消灭的所有的 callback
,所有的异步任务都是以同步的方式调用的,你可能也会吐槽也没感觉比之前优化了多少,上面 demo
中的任务比较简单,总共才 3 个 callback
,而且也没有处理异常的回调,callback
的层级最大也才 2。越复杂的任务协程的优势就会越大。
Kotlin 协程工作原理
虽然在源码中我们看到的协程代码是同步的,其实虚拟机执行的时候它还是一个不折不扣的 callback
,这主要归功于 Kotlin
编译器的处理,我们就以上面的 demo
来一步一步分析它的工作方式。
CoroutineScope
如果要开启一个协程就需要先通过 CoroutineScope()
方法创建一个 CoroutineSope
,在这个方法中可以指定我们的默认 CoroutineContext
。
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
CoroutineScope
的接口非常简单,只需要实现一个 CoroutineContext
。
我们来看看 CoroutineScope()
方法的实现:
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
// CoroutineScope is used intentionally for user-friendly representation
override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
上面的代码非常简单就只是把我们添加的 CoroutineContext
设置到 CoroutineScope
,这里注意添加了一个 Job
的 CoroutineContext
,每个协程启动时都会创建一个 Job
对象,这些由 CoroutineScope
启动的协程的 Job
都是 CoroutineScope
中的 Job
的子任务,而协程里面还可以再启动子协程,这个子协程的 Job
的父 Job
就是启动他的协程的 Job
。所以通过 Job
就构成了一个任务的继承链。当父 Job
取消后他的子 Job
也会被取消。所以如果是 CoroutineScope
中的顶级父 Job
取消了,那么用他启动的所有的协程或者孙协程等等也都会被取消。CoroutineScope#cancel()
的方法实现就是通过调用 CoroutineContext
中的 Job
的 cancel()
方法实现的:
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
通常我们为了防止内存泄漏,在 Activity
或者 Fragment
或者一些什么别的组件退出后都会调用他们所对应的 CoroutineScope#cancel()
方法,避免内存泄漏。
启动一个协程
我们启动协程是通过 CoroutineScepe#launch()
扩展函数来完成的,其中的 Lambda
对象就是协程执行开始的第一个方法,我们来看看它的源码实现:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
这里会对传入的 CoroutineContext
进行修改,处理的方法是 newCoroutineContext()
;然后根据启动类型创建一个 coroutine
对象,默认的实现是 StandaloneCoroutine
类,然后调用 Coroutine#start()
方法,最后返回 Coroutine
。这里要非常注意 StandaloneCoroutine
是一个 Job
,Continuation
(很多人中文翻译它为续体,后续会重点讲),CoroutineScope
(也就是他也能够启动协程,也就是当前协程的子协程)。后面我们会再看这些对象所处理的逻辑,现在有点懵也没关系。
我们继续看看 newCoroutineContext()
方法对传入的 CoroutineContext
的处理:
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
这里会添加一个 CoroutineId
的 CoroutineContext
用来记录 Coroutine
的名字,其实就是修改协程运行时的线程的名字,添加上协程编号的信息;这里还会判断是否有 CoroutineInterceptor
的 CoroutineContext
,如果没有,使用 Dispatcher.Default
作为默认的 CoroutineInterceptor
。
然后简单看看 StandaloneCoroutine
的实现:
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
它是继承与 AbstractCoroutine
,它重写了 handleJobException()
方法来处理协程的异常,处理调用的方法是 handleCoroutineException()
,我们来看看它的实现:
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
// Invoke an exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
return
}
} catch (t: Throwable) {
handleUncaughtCoroutineException(context, handlerException(exception, t))
return
}
// If a handler is not present in the context or an exception was thrown, fallback to the global handler
handleUncaughtCoroutineException(context, exception)
}
这里会查找 CoroutineContext
中是否有 CoroutineExceptionHandler
,如果有异常就交给它来处理,如果没有就由 Global
的 Handler
来处理,默认就是崩溃啦。
然后就是调用 Coroutine#start()
方法来启动一个协程了,看看它的代码:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
上面的 block
就是我们启动协程时传递过来的 Lambda
对象,然后 receiver
和 completion
都是我们上面创建的 StandaloneCoroutine
对象。继续看看 startCoroutineCancellable()
方法的实现:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
这里首先通过 createCoroutineUnintercepted()
方法将我们的 Lambda
对象构建成一个 Continuation
对象;然后调用 intercepted()
方法对原来的 Continuation
对象进行拦截器的处理;然后调用 resumeCancellableWith()
方法来触发协程的开始。上面的三个方法都非常重要。
这里在开始之前非常有必要解释一下 Continuation
,很多人中文翻译成续体,我就不用中文的名字了,还是继续用英文来表示。前面我们说到协程的本质其实就是一个 Callback
,而用来控制 Callback
回调成功/失败就是 Continuation
很重要的一个职责,同时它还记录了当前方法对应的执行的位置(像程序计数器一样),上次执行后的中间结果等等。Continuation
还会涉及到两个非常重要的概念那就是 suspend
和 resume
,中文翻译成挂起和恢复,我还是使用英文名词来表示他们。所谓的 suspend
其实就相当于我们调用了一个异步方法后等待 callback
时的协程状态,这时由于 callback
还没有回来当前的线程还可以做其他的任务;而 resume
就是 callback
回调成功需要唤醒原来的协程继续执行。这也就是很多人说得很邪乎的挂起与恢复,说白了就是调用异步任务时就挂起,callback
成功或者失败就是恢复,后面我们还会从源码中看到他具体是怎么挂起和恢复的(其实上面的 resumeCancellableWith()
方法就算是恢复)。
createCoroutineUnintercepted()
这个方法其实就是对原来 launch()
方法传递过来的 Lambda
方法进行改造,我们直接看看,反编译后的原来的 Lambda
对象:
其中我们发现它继承于 SuspendLambda
对象,而 SuspendLambda
继承于 ContinuationImpl
对象这个对象非常重要,是 Continuation
的一个实现。而当前的 Continuation
执行时的入口函数就是 invokeSuspend()
方法。后面我们会看到调用这个方法的逻辑。而它的构造函数中还有一个 Continuation
对象,这个其实就是上面的 StandalongCoroutine
对象,这个对象可以理解为父级的 Continuation
对象,在 Kotlin
源码中通常被称为 completion
,这个就是表示当前 Continuation
执行完成后需要通知父级的 Continuation
继续执行。
intercepted()
我们继续看看 intercepted()
方法对我们原来的 Continuation
做了什么处理:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
调用了 ContinuationImpl
的 intercepted()
方法,我们继续追踪:
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
继续调用 ContinuationIntercetor#interceptContinuation()
方法,我们之前设置的 CoroutineInterceptor
是 Dispatchers.Default
。 我们来看看 CoroutineDispatcher#interceptContinuation()
方法的实现:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
将原来的 Dispather
和 Continuation
作为参数构建了一个新的 DispatchedContinuation
对象。
resumeCancellableWith()
上面我们也讲到它就相当于 resume
协程,他也是我们第一次 resume
协程。我们来看看它的实现:
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
由于我们在 intercetped()
方法中将原来的 Continuation
对象,转换成了 DispatchedContinuation
对象,所以我们这里调用的是 DispatchedContinuation#resumeCancellableWith()
方法,我们看看它的实现:
@Suppress("NOTHING_TO_INLINE")
internal inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
@Suppress("NOTHING_TO_INLINE")
internal inline fun resumeUndispatchedWith(result: Result<T>) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
}
}
如果 Dispatcher#isDispatchNeeded()
返回 true
就表示可以使用 Dispatcher
来处理任务,也就是可以做到后续的任务最终在 Dispatcher
中的线程池中执行,通过 Dispatcher#dispatch()
方法下发任务,最后执行时是执行 run()
方法,实现是 DispatchedTask#run()
方法。反之就直接在当前线程调用 Continuation#resultWith()
方法。看看 DispatchedTask#run()
方法的实现:
final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
// ...
if (job != null && !job.isActive) {
// ...
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// ...
} finally {
// ...
}
}
在 run()
方法执行过程中会判断协程是否报错了,如果没有报错直接执行 Continuation#result()
方法,如果有错调用 Continuation#resumeWithException()
方法,上面说到这个 Continuation
在这里的实现类是 Lambada
对象继承 SuspendLambda
对象实现的,这个 resume()
方法最终是由 BaseContinuationImpl#resumeWith()
方法实现的,这个方法可以说是 Kotlin
协程的灵魂,我们来看看它的实现,注意理解:
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
// 通知 debug 协程已经 resume
probeCoroutineResumed(current)
with(current) {
// 父级的 continuation
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 调用 invokeSuspend 入口函数
val outcome = invokeSuspend(param)
// 如果返回 COROUTINE_SUSPENDED 就表示挂起,同时退出循环
if (outcome === COROUTINE_SUSPENDED) return
// 如果返回不是 COROUTINE_SUSPENDED 就表示该 Continuation 方法已经执行完成了,需要通知它的父级的 Continuation,然后父级的 Continuation 继续执行。
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
// 进入下次循环,调用父级的 Continuation 的 invokeSuspend() 方法
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
// 恢复顶级的 Continuation,我们的代码中是 StandaloneCoroutine
completion.resumeWith(outcome)
return
}
}
}
}
该 Continuation
中每调用一次 Kotlin
中的 suspend
的方法,Continuation
都会调用一次它的 invokeSuspend()
方法,当然每次调用 invokeSuspend()
方法执行的代码都不一样,Continuation
会通过一个 label
来记录 invokedSuspend()
执行的位置,后面我们会看到这部分代码,简单再描述一下上面的代码:
- 通过
probeCoroutineResumed()
方法通知协程进入resume
状态。 - 调用
invokeSuspend()
方法进入协程方法的具体执行,然后判断返回值,如果返回值是COROUTINE_SUSPENDED
就表示当前协程需要suspend
(也就是在执行一个异步任务,等待后续异步任务成功后再调用resumeWith()
方法resume
。);如果返回值不是COROUTINE_SUSPENDED
就表示当前的Continuation
已经执行完成了。 - 当前
Continuation
执行完成了后,就需要在下次循环中调用父级的Continuation
的invokeSuspend()
方法,直到顶级的Continuation
执行完,在我们这里是StandaloneCoroutine
,注意理解这个循环调用。
我们站在初次启动协程的逻辑来看看 resumeWith()
这个方法,初次调用 resumeWith()
时,对应的 Continuation
就是 SuspendContinuation
,而最终的实现是由我们 launch
时传递进去的 Lambda
对象生成的,而父级的 Continuation
就是 StandaloneCoroutine
,SuspendContinuation
执行完成后就会调用 StandaloneCoroutine
的 resumeWith()
方法,这也就标志当前的协程已经执行完成。
由于在 Kotlin
协程相关代码编译过程中会生成多个类似于 Lambda
对象那样的匿名对象,不利于我们后续的源码分析,我把 demo
中的编译后的代码反编译后再重新整理命名,方便后续的代码的分析。
MainActivity
public final class MainActivity extends AppCompatActivity {
@Override // androidx.fragment.app.FragmentActivity, androidx.activity.ComponentActivity, androidx.core.app.ComponentActivity, android.app.Activity
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
BuildersKt__Builders_commonKt.launch$default(CoroutineScopeKt.CoroutineScope(Dispatchers.getDefault()), null, null, new LaunchContinuation(this, null), 3, null);
}
public final java.lang.Object helloWorld(kotlin.coroutines.Continuation<? super java.lang.String> continuation) {
HelloWorldContinuation helloWorldContinuation = null;
if (!(continuation instanceof HelloWorldContinuation)) {
helloWorldContinuation = new HelloWorldContinuation(this, continuation);
} else {
helloWorldContinuation = (HelloWorldContinuation) continuation;
}
Object suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (helloWorldContinuation.label) {
case 0: {
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
StringBuilder stringBuilder = new StringBuilder();
helloWorldContinuation.param1 = this;
helloWorldContinuation.param2 = stringBuilder;
helloWorldContinuation.label = 1;
Object result = hello(helloWorldContinuation);
if (result == suspend) {
return suspend;
} else {
// 我们的代码中不会有这种情况
}
}
case 1: {
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
MainActivity mainActivity = (MainActivity) helloWorldContinuation.param1;
StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param2;
String lastResult = (String) helloWorldContinuation.result;
stringBuilder.append(lastResult);
helloWorldContinuation.param1 = stringBuilder;
helloWorldContinuation.param2 = null;
helloWorldContinuation.label = 2;
Object result = world(helloWorldContinuation);
if (result == suspend) {
return suspend;
} else {
// 我们的代码中不会有这种情况
}
}
case 2: {
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param1;
String lastResult = (String) helloWorldContinuation.result;
stringBuilder.append(lastResult);
return stringBuilder.toString();
}
default: {
}
}
throw new UnsupportedOperationException("Method not decompiled: com.tans.coroutine_test.MainActivity.helloWorld(kotlin.coroutines.Continuation):java.lang.Object");
}
public final Object hello(Continuation<? super String> continuation) {
return delaSuspend(500L, "Hello, ", continuation);
}
public final Object world(Continuation<? super String> continuation) {
return delaSuspend(500L, "World!!", continuation);
}
public final <T> Object delaSuspend(long time, T t, Continuation<? super T> continuation) {
CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
cancellable$iv.initCancellability();
DelayKt.delay(time, new MainActivity$delaSuspend$2$1(cancellable$iv, t));
Object result = cancellable$iv.getResult();
if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended(continuation);
}
return result;
}
}
LaunchContinuation
LaunchContinuation
就是由 launch()
方法的 Lambda
对象生成的 Continuation
对象,它本来是一个无规则的对象名字,为了代码好阅读我把它的名字修改成了 LaunchContinuation
:
final class LaunchContinuation extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
final MainActivity mainActivity;
public LaunchContinuation(MainActivity mainActivity, Continuation<? super LaunchContinuation> continuation) {
super(2, continuation);
this.mainActivity = mainActivity;
}
@Override
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
return new LaunchContinuation(this.mainActivity, continuation);
}
public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
return ((LaunchContinuation) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE);
}
@Override
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
Object helloWorld = this.mainActivity.helloWorld(this);
if (helloWorld != coroutine_suspended) {
$result = helloWorld;
break;
} else {
return coroutine_suspended;
}
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println($result);
return Unit.INSTANCE;
}
}
HelloWorldContinuation
HelloWroldContinuation
是由 helloWorld()
函数生成的一个 Continuation
对象,它本来也是一个匿名对象,为了方便源码阅读,我修改了该类的命名。
public final class HelloWorldContinuation extends ContinuationImpl {
public Object param1;
public Object param2;
public int label;
public Object result;
final MainActivity mainActivity;
public HelloWorldContinuation(MainActivity mainActivity, Continuation<? super HelloWorldContinuation> continuation) {
super(continuation);
this.mainActivity = mainActivity;
}
@Override
public final Object invokeSuspend(Object obj) {
this.result = obj;
this.label |= Integer.MIN_VALUE;
return this.mainActivity.helloWorld(this);
}
}
DelayKt
public final class DelayKt {
private static final Lazy delayExecutor$delegate = LazyKt.lazy(DelayKt$delayExecutor$2.INSTANCE);
public static final ScheduledThreadPoolExecutor getDelayExecutor() {
return (ScheduledThreadPoolExecutor) delayExecutor$delegate.getValue();
}
public static final void delay(long time, final Function0<Unit> callback) {
Intrinsics.checkNotNullParameter(callback, "callback");
getDelayExecutor().schedule(new Runnable() { // from class: com.tans.coroutine_test.DelayKt$$ExternalSyntheticLambda0
@Override // java.lang.Runnable
public final void run() {
DelayKt.delay$lambda$0(Function0.this);
}
}, RangesKt.coerceAtLeast(time, 0L), TimeUnit.MILLISECONDS);
}
/* JADX INFO: Access modifiers changed from: private */
public static final void delay$lambda$0(Function0 callback) {
Intrinsics.checkNotNullParameter(callback, "$callback");
callback.invoke();
}
}
-
DelayKt$delayExecutor$2
这是Delay
中Executor
的代理对象
final class DelayKt$delayExecutor$2 extends Lambda implements Function0<ScheduledThreadPoolExecutor> {
public static final DelayKt$delayExecutor$2 INSTANCE = new DelayKt$delayExecutor$2();
DelayKt$delayExecutor$2() {
super(0);
}
@Override
public final ScheduledThreadPoolExecutor invoke() {
return new ScheduledThreadPoolExecutor(1);
}
}
-
MainActivity$delaSuspend$2$1
这是MainActivity
中调用delay()
方法时的Lambda
生成的对象:
final class MainActivity$delaSuspend$2$1 extends Lambda implements Function0<Unit> {
final CancellableContinuation<T> $cont;
final T $data;
public MainActivity$delaSuspend$2$1(CancellableContinuation<? super T> cancellableContinuation, T t) {
super(0);
this.$cont = cancellableContinuation;
this.$data = t;
}
@Override // kotlin.jvm.functions.Function0
/* renamed from: invoke reason: avoid collision after fix types in other method */
public final void invoke2() {
Continuation continuation = this.$cont;
Result.Companion companion = Result.Companion;
continuation.resumeWith(Result.m122constructorimpl(this.$data));
}
}
LaunchContinuation
的执行流程
上一小节讲到协程启动执行开始的方法是调用 launch()
方法中 Lambda
生成的 Continuation
的 resumeWith()
方法,这个 Continuation
我们把它命名成 LaunchContinuation
(实际上是一个和 Lambda
对象一样的匿名对象,对象名不易阅读),而 resumeWith()
方法最终会调用 LaunchContinuation#invokeSuspend()
方法(忘记了的同学看看前面分析 BaseContinuationImpl#resumeWith()
代码的部分),我们来看看 invokeSuspend()
方法的实现:
public final Object invokeSuspend(Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
// 检查是否有异常
ResultKt.throwOnFailure($result);
// 修改 label 为 1
this.label = 1;
// 调用 helloWrold 方法,注意这里把自己当参数传递了过去
Object helloWorld = this.mainActivity.helloWorld(this);
if (helloWorld != coroutine_suspended) {
// 如果返回值不等于 coroutine_suspended 表示已经得到正确的返回结果,我们的例子不会执行这里
$result = helloWorld;
break;
} else {
// 表示协程进入 suspend 状态,等待下次调用 resumeWith() 方法继续执行
return coroutine_suspended;
}
case 1:
// 第二次执行 resuemWith() 方法,表示已经获取到 helloWorld() 中的返回值,检查返回中是否有异常。
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// 执行打印最后的结果
System.out.println($result);
return Unit.INSTANCE;
}
我们来分析一下上面的代码:
- 记录
Continuation
的执行位置的对象是label
,默认label
的值为 0。 -
label
为 0 的逻辑:
- 判断
result
中是否有异常 - 修改
label
为 1,表示下次resumeWith()
方法执行时,就是case 1
那部分的逻辑。 - 调用
MainActivity#helloWorld()
方法,注意这里将当前的Continuation
对象传递给了helloWorld()
方法。 - 这里会判断
helloWorld()
方法的返回值,如果返回值不为COROUTINE_SUSPEND
,就表示已经拿到返回值,不需要进入suspend
状态,然后进入label
为 1 的逻辑(我们的代码不会返回COROUTINE_SUSPEND
);反之就表示没有获取到返回值,需要挂起,直接返回COROUTINE_SUSPEND
,等待下次执行resumeWith()
方法恢复,那时也就表示已经获取到helloWorld()
方法的返回值。
-
label
为 1 的逻辑:
- 判断
result
中是否有异常。 - 执行打印
result
的结果。 - 这里返回了一个
Unit
而不是COROUTINE_SUSPEND
,就表示当前方法执行完毕了,我们前面提到LaunchContinuation
的父级Continuation
是StandalongCoroutine
,在讲BaseContinuationImpl#resumeWith()
方法时讲过,当前Continuation
执行完毕后就会执行它的父级Continuation
,也就是后续会执行StandalongCoroutine#resumeWith()
方法,也就是通知StandalongCoroutine
,协程已经执行完毕了。
helloWorld()
的执行流程
在 LaunchContinuation
的执行过程中会调用 MainActivity#helloWorld()
方法,我们再来看看 MainActivity#helloWorld()
方法是如何处理的:
public final java.lang.Object helloWorld(kotlin.coroutines.Continuation<? super java.lang.String> continuation) {
HelloWorldContinuation helloWorldContinuation = null;
if (!(continuation instanceof HelloWorldContinuation)) {
// 构建一个 HelloWorldContinuation 实例,注意这里将 Launch Continuation 作为 HelloWorldContinuation 的父级 Continuation
helloWorldContinuation = new HelloWorldContinuation(this, continuation);
} else {
helloWorldContinuation = (HelloWorldContinuation) continuation;
}
Object suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (helloWorldContinuation.label) {
case 0: {
// 检查 result 是否有异常
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
// 构建 StringBuilder 对象
StringBuilder stringBuilder = new StringBuilder();
// 将 MainActivity 实例赋值给 HelloWorldContinuation#param1
helloWorldContinuation.param1 = this;
// 将 StringBuilder 实例赋值给 HelloWorldContinuation#param2
helloWorldContinuation.param2 = stringBuilder;
// 修改 label 为 1
helloWorldContinuation.label = 1;
// 执行 hello() 方法
Object result = hello(helloWorldContinuation);
// 返回值为 COROUTINE_SUSPEND 表示当前协程变为 suspend 状态
if (result == suspend) {
return suspend;
} else {
// 我们的代码中不会有这种情况
}
}
case 1: {
// 检查 result 是否有异常
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
MainActivity mainActivity = (MainActivity) helloWorldContinuation.param1;
// 获取参数 StringBuilder
StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param2;
// 获取上次的 hello() 方法的返回结果
String lastResult = (String) helloWorldContinuation.result;
// 将 hello() 方法的返回结果添加到 StringBuilder 中
stringBuilder.append(lastResult);
// 将 StringBuilder 赋值给 HelloWorldContinuation#param1
helloWorldContinuation.param1 = stringBuilder;
helloWorldContinuation.param2 = null;
// 修改 label 为 2
helloWorldContinuation.label = 2;
// 执行 world() 方法。
Object result = world(helloWorldContinuation);
// 返回值为 COROUTINE_SUSPEND 表示当前协程变为 suspend 状态
if (result == suspend) {
return suspend;
} else {
// 我们的代码中不会有这种情况
}
}
case 2: {
// // 检查 result 是否有异常
kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
// 获取 StringBuilder
StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param1;
// 获取 world() 方法的返回值
String lastResult = (String) helloWorldContinuation.result;
// 将 world() 方法的返回值,添加到 StringBuilder 中
stringBuilder.append(lastResult);
// 将最终结果返回,也就表示当前 Continuation 执行完毕
return stringBuilder.toString();
}
default: {
}
}
throw new UnsupportedOperationException("Method not decompiled: com.tans.coroutine_test.MainActivity.helloWorld(kotlin.coroutines.Continuation):java.lang.Object");
}
我再讲Retrofit
的源码的文章中也讲过 Kotlin
的 suspend
函数在编译处理后,会添加一个 Continuation
对象参数,然后返回值变成 Object
,在 helloWorld()
方法中也得到了印证。
这个逻辑比 LaunchContinuation
中的状态稍微多了一些,然后各种参数也复杂一点,我们来分析一下:
- 如果
continuation
参数不是HelloWorldContinuation
(这种情况就是LaunchContinuation
),构建一个HelloWorldContinuation
实例,它的父Continuation
是LaunchContinuation
。 -
label
为 0 的逻辑:
- 检查
result
中是否有异常。 - 构建一个新的
StringBuilder
对象赋值给HelloWorldContinuation#param2
。 - 修改
label
为 1。 - 执行
hello()
方法,如果返回值是COROUTINE_SUSPEND
表示协程进入suspend
状态,我们的代码一定会到这段逻辑;反之就直接执行label
为 1 的逻辑。
-
label
为 1 的逻辑:
- 检查
result
中是否有异常。 - 从
HelloWorldContinaution#param2
中获取StringBuilder
实例。 - 从
HelloWorldContinuation#result
中获取hello()
方法的返回值。 - 将
hello()
方法的返回结果写入到StringBuilder
中。 - 构建一个新的
StringBuilder
对象赋值给HelloWorldContinuation#param1
。 - 修改
label
为 2,执行world()
方法,和执行hello()
方法一样,在我们的代码中一定会进入suspend
状态。
-
label
为 2 的逻辑:
- 检查
result
中是否有异常。 - 从
HelloWorldContinaution#param1
中获取StringBuilder
实例。 - 从
HelloWorldContinuation#result
中获取world()
方法的返回值。 - 将
world()
方法的返回结果写入到StringBuilder
中。 - 将最终结果返回,也就标志
HelloWorldContinuation
执行完成了。
我们再简单看看 HelloWorldContinuation#invokeSuspend()
方法的实现:
@Override
public final Object invokeSuspend(Object obj) {
this.result = obj;
this.label |= Integer.MIN_VALUE;
return this.mainActivity.helloWorld(this);
}
代码非常简单,将上次的执行的结果写入到 result
中,然后调用 helloWorld()
方法。
hello()
和 world()
的执行流程
public final Object hello(Continuation<? super String> continuation) {
return delaSuspend(500L, "Hello, ", continuation);
}
public final Object world(Continuation<? super String> continuation) {
return delaSuspend(500L, "World!!", continuation);
}
public final <T> Object delaSuspend(long time, T t, Continuation<? super T> continuation) {
CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
cancellable$iv.initCancellability();
DelayKt.delay(time, new MainActivity$delaSuspend$2$1(cancellable$iv, t));
Object result = cancellable$iv.getResult();
if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended(continuation);
}
return result;
}
hello()
和 world()
方法都调用了 delaSuspend()
方法用来讲 delay()
方法的异步 callback
调用转换成一个协程 suspend
方法。源码是这样的:
suspend fun <T> delaSuspend(time: Long, data: T): T {
return suspendCancellableCoroutine { cont ->
delay(time) {
cont.resumeWith(Result.success(data))
}
}
}
在 delaSuspend()
的处理中会创建一个 CancellableContinuationImpl
对象,它的父级 Continuation
是 HelloWorldContinuation
,这里注意看调用了 IntrinsicsKt.intercepted()
方法来代理 HelloWorldContinuation
,前面我有讲过这个方法,它会让后续的 resumeWith()
方法在 Dispacher
中对应的线程中执行。调用了 DelayKt.delay()
方法然后传入了一个 Lambda
对象,我们看看 Lambda
对象中的方法执行:
public final void invoke2() {
Continuation continuation = this.$cont;
Result.Companion companion = Result.Companion;
continuation.resumeWith(Result.m122constructorimpl(this.$data));
}
简单且高效,在回调成功时,直接调用 CancellableContinuationImpl#resumeWith()
方法使协程进入 resume
状态,后续的逻辑的执行的线程由 Dispatcher
决定。
最后
看到这里你可能还是有点懵的状态,这是非常正常的,我再来完整的理一下整个流程:
CoroutineScope#launch()
方法中会创建一个 StandaloneCoroutine
对象,然后通过 launch()
方法传过来的 Lambda
对象构建一个 LaunchContinuation
对象它的父级 Continuation
是 StandaloneCoroutine
,然后调用 LaunchContinuation#resumeWith()
方法标志协程开始。
LaunchContinuation
第一次执行 resumeWith()
时,会调用 helloWorld()
方法,这里会 HelloWorldContinuation
对象,它的父级 Continuation
是 LaunchContinuation
对象。第一次执行 helloWorld()
方法时会调用 hello()
方法,在 hello()
方法中会构建一个 CancellableContinuationImpl
对象,它的父级 Continuation
是 HelloWorldContinuation
,当 hello()
方法的 callback
异步调用成功后会调用 CancellableContinuationImpl#resumeWith()
方法 resume
协程,最终会调用到 HelloWorldContinuation#resumeWith()
方法中去,这里也会触发 helloWorld()
方法的第二次执行,在第二次执行的过程中会调用 world()
方法,world()
方法的处理逻辑和 hello()
方法一模一样,回调完成后就会触发第三次调用 helloWorld()
方法,第三次调用的时候会组合 hello()
和 world()
两次方法的结果得到最终的结果,然后返回,这时 HelloWorldContinuation
就会调用它的父级的 Continuation
中的 resumeWith()
方法,也就是 LaunchContinuation#resumeWith()
方法,用来通知 LaunchContinuation
表示 helloWorld()
方法已经执行完毕,这个时候是第二次执行 LaunchContinuation#resumeWith()
,这时他也不用再进入 suspend
状态,又会继续调用它的父级 Continuation
的 resumeWith()
方法,也就是 StandaloneConroutine#resumeWith()
的方法,它的这个方法调用后也就标志这个协程执行完毕了。
如果到这里还是没有理解这个过程,推荐你再多看几遍,一定能够看懂的,其实就是通过 Continuation
来处理 callback
的套娃操作,当理解了这个过程后,协程的很多地方的源码你就能够看得懂了。