参考
https://www.jianshu.com/p/9655501e15ed
典型调用
// 使用协程:
CoroutineScope.launch { //启动一个协程,此时运行在子线程
val result = okhttp.newCall(request).execute() // 在子线程堵塞耗时
withContext(Dispatchers.Main) { // 切换到主线程
//使用这个result,比如将其展示到UI上,此处为主线程。
Log.i("TEST", "result: $result")
}
}
流程分析
1、在 Builders.common.kt类
public fun CoroutineScope.launch(
// 有默认值,可以不传
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
2、来到AbstractCoroutine类的start()方法
用给定的模块启动协程,在这个协程中最多启动一次
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
*/
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
block就是我们执行的业务模块
start(block, receiver, this)
}
}
3、我们关注到start(block, receiver, this)这个方法,它传入的是第一个参数是CoroutineStart类型,它实际调用的是
public enum class CoroutineStart {
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
}
4、执行DEFAULT策略,来到Cancellable.kt类的startCoroutineCancellable()方法
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
- 执行runSafely()方法,如果抛异常,就直接返回resumeWith()
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
dispatcherFailure(completion, e)
}
}
private fun dispatcherFailure(completion: Continuation<*>, e: Throwable) {
completion.resumeWith(Result.failure(e))
throw e
}
- 如果没有返回异常,来到了DispatchedContinuation.kt类
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
5、我们来到IntrinsicsJvm.kt类createCoroutineUnintercepted()方法
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
然后ContinuationImpl类的intercepted()方法
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
接着DispatchedContinuation类的resumeCancellableWith()方法,这样就切入到相应的线程执行
internal class DispatchedContinuation<in T>(
//这里传入的dispatcher在demo中是Dispatchers.Default
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//代码1
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//代码2
dispatcher.dispatch(context, this)
} else {
//代码3
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
......
}
internal actual typealias SchedulerTask = Task
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
......
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
}
使用默认的线程池DefaultScheduler
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
}
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
......
}
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
//代码1:构建Task,Task实现了Runnable接口
val task = createTask(block, taskContext)
//代码2:取当前线程转为Worker对象,Worker是一个继承自Thread的类
val currentWorker = currentWorker()
//代码3:尝试将Task提交到本地队列并根据结果执行相应的操作
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
//代码4:notAdded不为null,则再将notAdded(Task)添加到全局队列中
if (!addToGlobalQueue(notAdded)) {
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
//代码5: 创建Worker并开始执行该线程
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
internal inner class Worker private constructor() : Thread() {
.....
}
}
Worker是一个线程
internal inner class Worker private constructor() : Thread() {
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
//代码1
val task = findTask(mayHaveLocalTasks)
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//代码2
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
} else {
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
tryPark()
}
tryReleaseCpu(WorkerState.TERMINATED)
}
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// If we can't acquire a CPU permit -- attempt to find blocking task
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
unTrackTask()
}
}
}
在线程池执行任务后,调用 continuation.resume()恢复状态, rusume会走到BaseContinuationImpl的rusumeWith
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
......
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
//非空,且未处于active状态
if (job != null && !job.isActive) {
//开始之前,协程已经被取消,将具体的Exception传出去
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
//有异常,传递异常
if (exception != null) {
continuation.resumeWithException(exception)
} else {
//代码1
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
6、我们反编译BaseContinuationImpl类,看到了create()方法和resumeWith()方法
- 然后和之前第四步的continuation.resumeWith(result)联系起来了
-
在调用了invokeSuspend(param)挂起的方法后,如果还有内容则继续 completion.resumeWith(outcome);
public abstract class BaseContinuationImpl implements Continuation, CoroutineStackFrame, Serializable {
public final void resumeWith(@NotNull Object result) {
Object current = (BaseContinuationImpl)this;
Object param = result;
while(true) {
...
Object outcome;
try {
outcome = $this$with.invokeSuspend(param);
if(outcome == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
return;
}
...
} catch (Throwable var15) {
Companion var12 = Result.Companion;
boolean var13 = false;
outcome = Result.constructor-impl(ResultKt.createFailure(var15));
}
current.releaseIntercepted();
if(!(completion instanceof BaseContinuationImpl)) {
completion.resumeWith(outcome);
return;
}
current = (BaseContinuationImpl)completion;
param = outcome;
}
}
@NotNull
public Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
throw (Throwable)(new UnsupportedOperationException("create(Continuation) has not been overridden"));
}
@NotNull
public Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
throw (Throwable)(new UnsupportedOperationException("create(Any?;Continuation) has not been overridden"));
}
}
状态机
1、上一步的with.invokeSuspend(param);方法涉及到一个状态机的维护,下面是网上的一个例子
线程池
1、在 Builders.common.kt类,withContext()切换线程
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
2、在Dispatchers.kt文件,定义了Default、Main、Unconfined三种类型
public actual object Dispatchers {
public actual val Default: CoroutineDispatcher = createDefaultDispatcherBasedOnMm()
public actual val Main: MainCoroutineDispatcher
get() = injectedMainDispatcher ?: mainDispatcher
public actual val Unconfined: CoroutineDispatcher get() = kotlinx.coroutines.Unconfined // Avoid freezing
}
3、Default类型
public actual val Default: CoroutineDispatcher = createDefaultDispatcherBasedOnMm()
private fun createDefaultDispatcherBasedOnMm(): CoroutineDispatcher {
return if (multithreadingSupported) createDefaultDispatcher()
else OldDefaultExecutor
}
它初始化了一个线程池
internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultDispatcher
private object DefaultDispatcher : CoroutineDispatcher() {
// Delegated, so users won't be able to downcast and call 'close'
// The precise number of threads cannot be obtained until KT-48179 is implemented, 4 is just "good enough" number.
private val ctx = newFixedThreadPoolContext(4, "Dispatchers.Default")
override fun dispatch(context: CoroutineContext, block: Runnable) {
ctx.dispatch(context, block)
}
}
internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
ClosedAfterGuideTestDispatcher(nThreads, name)
internal class PoolThread(
@JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init {
isDaemon = true
}
}
private class ClosedAfterGuideTestDispatcher(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcher() {
private val threadNo = AtomicInteger()
override val executor: Executor =
Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
override fun newThread(target: java.lang.Runnable): Thread {
return PoolThread(
this@ClosedAfterGuideTestDispatcher,
target,
if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
)
}
})
override fun dispatch(context: CoroutineContext, block: Runnable) {
executor.execute(wrapTask(block))
}
override fun close() {
(executor as ExecutorService).shutdown()
}
override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
}
3、Main类型,也是用handler切换主线程
internal expect fun createMainDispatcher(default: CoroutineDispatcher): MainCoroutineDispatcher
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler
// MainDispatchers.kt
internal object MainDispatcherLoader {
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
}
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
return HandlerContext(mainLooper.asHandler(async = true))
}
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
// HandlerDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
...
}
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
}
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
}
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}