本文需要读者对协程有基础的了解,关于协程的使用,可以参考官方教程:[play.kotlinlang.org/hands-on/In…play.kotlinlang.org/hands-on/In… to Coroutines and Channels/01_Introduction)
协程是什么?
协程库是Kotlin语言提供的一个库,用于处理异步和并发场景的框架。
"一个协程是一个可挂起计算的对象。在概念上与线程相似。从某种意义上说,协程代码块会与其他代码同时运行。然而,协程不受任何特定线程的约束。它可能会在某个线程挂起,然后在另一个线程被唤醒。"
从官方文档的描述理解,协程实际上就是一个对象。从下面的例子中,来理解协程到底是个什么东西,以及它和线程有什么不一样的地方。
printTimeAndString方法是一个打印时间和输入字符的方法,在main
函数中,通过GlobalScope.launch启动了一个协程,并且delay 1秒,在delay前后,打印时间和print in coroutine start/end
。然后,外部线程打印print in thread
,并睡眠2s,最后打印end结束main方法。
fun main() {
printTimeAndString("start")
GlobalScope.launch {
printTimeAndString("print in coroutine start")
delay(1000)
printTimeAndString("print in coroutine end")
}
printTimeAndString("print in thread")
Thread.sleep(2000)
printTimeAndString("end")
}
fun printTimeAndString(text: String) {
println("${Date()}: $text")
}
打印结果如下所示:
Sat Dec 11 19:09:21 CST 2021: start
Sat Dec 11 19:09:21 CST 2021: print in thread
Sat Dec 11 19:09:21 CST 2021: print in coroutine start
Sat Dec 11 19:09:22 CST 2021: print in coroutine end
Sat Dec 11 19:09:23 CST 2021: end
代码的逻辑顺序为:
- 打印start
- 打印“print in thread”
- 创建一个协程
- 打印"print in coroutine start"
- 线程进入阻塞状态
- 协程delay结束,打印"print in coroutine end"
- 线程阻塞结束,打印end
从打印结果来看,线程中的print in thread优先于协程执行,协程中的delay 1s,并没有造成线程阻塞,协程和线程中的逻辑在并行执行。
修改printTimeAndString
方法,加上打印当前线程信息,然后执行:
fun printTimeAndString(text: String) {
println("${Date()}: $text: ${Thread.currentThread()}")
}
打印结果:
Sat Dec 11 19:10:49 CST 2021: start: Thread[main,5,main]
Sat Dec 11 19:10:49 CST 2021: print in thread: Thread[main,5,main]
Sat Dec 11 19:10:49 CST 2021: print in coroutine start: Thread[DefaultDispatcher-worker-1,5,main]
Sat Dec 11 19:10:50 CST 2021: print in coroutine end: Thread[DefaultDispatcher-worker-1,5,main]
Sat Dec 11 19:10:51 CST 2021: end: Thread[main,5,main]
在这个打印结果中能够很容易发现,协程中的代码执行在另一个线程中,main方法协程以外的方法都是执行在main线程中。两者的线程不同,那么协程和线程的关系就是协程会创建新的线程来执行代码吗?
实际上并不是这样的,协程可以通过配置上下文元素,或者构造不同顶级协程来执行不同的协程,因为上述代码直接使用GlobalScope.launch
没有指定任何上下文,它会默认使用调度器元素Dispatchers.Default
,这个元素会指定协程在线程池中运行。
如果我们让协程指定在main线程中执行,需要替换GlobalScope.launch
为runBlocking
,runBlocking
会获取当前线程,并根据当前线程去创建一个协程:
fun main() {
printTimeAndString("start")
runBlocking {
printTimeAndString("print in coroutine start")
delay(1000)
printTimeAndString("print in coroutine end")
}
printTimeAndString("print in thread")
Thread.sleep(2000)
printTimeAndString("end")
}
打印结果为:
Sat Dec 11 19:12:02 CST 2021: start: Thread[main,5,main]
Sat Dec 11 19:12:02 CST 2021: print in coroutine start: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in coroutine end: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in thread: Thread[main,5,main]
Sat Dec 11 19:12:05 CST 2021: end: Thread[main,5,main]
首先,打印顺序发生了变化,原来先打印的“print in thread”
出现在"print in coroutine end"
之后;其次,打印时间发生了变化,整个代码执行了3秒,而使用GlobalScope.launch
用了2秒。这里可能看起来不明显,因为他们的时间长度很接近,读者们可以自己增加delay和sleep的时间自行检验;最后,他们都在同一个main线程上执行。
上面的变化说明,协程和线程在同一个线程上运行代码,协程会阻塞线程,再对比协程通过使用GlobalScope.launch
,不难理解,协程运行在线程之上,和普通代码没什么区别。协程的特殊能力是,通过实现不同的协程,可以将协程指定在不同的线程上运行。这也就是为什么网上都说,协程是一个切换线程的框架。
协程的优势
线程是CPU调度的最小单位,所有应用程序的代码都运行于线程之上。操作系统层面上提供了进程、线程的实现。如果是在 Android 平台上,我们会发现 Thread 的创建过程中,都会调用 Linux API 中的 pthread_create 函数,这直接说明了 Java 层中的 Thread 和 Linux 系统级别的中的线程是一一对应的。那么这就说明了,线程的创建与销毁,需要与操作系统进行交互,调度操作系统的资源。当需要大量创建和销毁时,系统资源就会造成浪费。所以线程,并不是很“轻量”。
协程与线程不同,协程本质上是代码、是对象。它需要在线程之上运行。所以,它很“轻量”。它不需要调用操作系统层面的方法和资源。所以,当你创建1000个协程对象,也只是内存上的问题。代码在线程上执行,所以协程也需要在线程上运行。除了每个协程都要新建一个线程来执行这种极端情况,本质上,线程与协程的轻重体现,就是创建一千个对象和一千个线程的区别。
协程的组成部分
在上一节中,我们使用GlobalScope.launch
启动了一个协程,就以launch
方法为例,来分析一下协程代码的组成:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
这是一个CoroutineScope的拓展方法,它有三个参数,一个返回值类型,加上方法本身,一共六个组成部分:
协程作用域 CoroutineScope
协程构建器 launch,协程作用域的拓展函数,协程的构建函数
协程上下文 CoroutineContext,可以通过协程上下文来指定协程调度器
协程启动项 CoroutineStart,定义协程构建的启动选项。 它用于启动、异步和其他协程构建器函数的启动参数
挂起代码块 block:suspend CoroutineScope.() -> Unit,挂起闭包,在指定上下文中调用的协程代码
协程任务 Job,可用于取消协程。
协程作用域 CoroutineScope
CoroutineScope是一个接口,而且内部只定义了一个CoroutineContext属性:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
CoroutineScope的作用是声明:实现CoroutineScope的类,都具有提供协程上下文的能力。这个接口不需要我们手动去实现,协程库提供了封装好的委托实现。
总结一下,CoroutineScope称之为协程作用域,除了从名字上翻译过来的原因外,它也代表着,CoroutineScope的对象,能够给自身内部提供协程上下文,有了协程上下文,才能使用协程,所以,在CoroutineScope对象的范围内,是协程的作用范围。
CoroutineScope构造方法
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job()) // 如果context不包含Job元素,默认创建Job()
GlobalScope 全局作用域
GlobalScope
实现了CoroutineScope
接口,是全局的协程作用域,可用于创建顶级协程。它们在应用程序的整个生命周期中都会存活,不会过早被取消。创建顶级协程时,也可使用带有适当调度器的CoroutineScope对象来启动协程;特定情况下(例如必须在程序的整个生命周期内保持活跃状态的顶级后台进程中)可以通过@OptIn(DelicateCoroutinesApi::class)来安全合法的使用GlobalScope。其他情况可以将其改造成挂起函数,例如:
fun loadConfiguration() {
GlobalScope.launch {
val config = fetchConfigFromServer() // network request
updateConfiguration(config)
}
}
替换方案如下:
suspend fun loadConfiguration() = coroutinesScope {
launch {
val config = fetchConfigFromServer() // network request
updateConfiguration(config)
}
}
这里简单介绍一个coroutineScope方法:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont, true)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
coroutineScope方法内部会创建一个ScopeCoroutine,它也是CoroutineScope的实现类。这个方法在内部创建了一个CoroutineScope,并执行block。
MainScope
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
上下文自带SupervisorJob和Dispatchers.Main元素。
主要用于UI组件:
class Activity {
private val mainScope = MainScope()
fun destroy() {
mainScope.cancel()
}
}
协程构建器(coroutine builder)
概念:为新协程定义一个范围,协程构建器是CoroutineScope的拓展函数,并继承了CorourineScope的协程上下文来自动传递上下文元素和可取消性。
协程构建器函数有以下特点:
- CoroutineScope的拓展函数
- 会继承CoroutineScope的上下文
- 返回类型为Job
Scoping函数
withContext、coroutineScope等函数与launch不同,他们是挂起方法,一般都接收一个suspend闭包作为参数。在它们的方法内部会创建一个协程,并且可能会指定一些上下文元素(例如withContext),并将block透传,这类函数称为做Scoping函数。他们的结构一般是:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <T> withContext(context: CoroutineContext,block: suspend CoroutineScope.() -> T): T
桥接普通与挂起的方法
例如runBlocking等,这类函数与上面两种情况不同,他们可以将常规的代码切换到挂起代码。
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
runBlocking是个特例,它的作用是:创建一个新的协程并阻塞当前线程,直到协程结束。这个函数不应该在协程中使用,主要是为了main函数和测试设计的。
协程上下文
CoroutineContext是一个接口:
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E? // 通过key获取元素
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// 确保拦截器始终在上下文中最后(因此在出现时可以快速获取)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
//...
}
}
CoroutineContext中定义了运算符重载方法get、plus:
- get方法可以让我们通过
context[key]
的形式获取元素 - plus方法可以让我们以
Dispatchers.Default + SuperviorJob()
的形式拼接上下文元素。
从get和plus方法的作用就可以看出,CoroutineContext实际上就是一组元素的集合。
既然是一组元素的集合,那么它的数据结构是什么?
CoroutineContext的数据结构
这里主要是plus方法最后合并了一个CombinedContext
:
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
cur.element[key]?.let { return it }
val next = cur.left
if (next is CombinedContext) {
cur = next
} else {
return next[key]
}
}
}
// ...
}
CombinedContext有两个属性left
和element
,很明显这是一个链表的节点。所以CoroutineContext内部的数据结构是链表。
协程启动项 CoroutineStart
CoroutineStart是一个枚举类:
public enum class CoroutineStart {
DEFAULT,
LAZY,
ATOMIC,
UNDISPATCHED;
@InternalCoroutinesApi
public val isLazy: Boolean get() = this === LAZY
}
在协程构建器中,用于launch、async和其他协程构造函数的启动参数。
- DEFAULT:根据上下文立即安排协程执行。
- LAZY:在需要的时候才启动协程,如果协程 Job 在它开始执行之前被取消,那么它根本不会开始执行,而是会以异常结束。
- ATOMIC:原子地(以不可取消的方式)根据上下文安排协程执行,这与 DEFAULT 类似,但协程在开始执行之前无法取消。
- UNDISPATCHED:立即执行协程,直到它在当前线程中的第一个暂停点,类似于使用 Dispatchers.Unconfined 启动的协程。但是,当协程从暂停状态恢复时,它会根据其上下文中的 CoroutineDispatcher 进行调度。
suspend () -> Unit
suspend T.() -> Unit
在Kotlin 1.6版本中,suspend () -> Unit
可以作为父类型:
// 定义
class MyClickAction : suspend () -> Unit {
override suspend fun invoke() { TODO() }
}
fun launchOnClick(action: suspend () -> Unit) {}
// 调用
launchOnClick(MyClickAction())
需要注意的是,suspend () -> Unit
作为父类型有两个限制:
-
不能混合使用挂起block和非挂起block作为父类。
// Compiler Error: Mixing suspend and non-suspend supertypes is not allowed class MyClickAction : suspend () -> Unit, String 复制代码
-
不能继承多个挂起类型。
因为,每个挂起都有一个invoke函数,继承多个可挂起类型时,invoke函数会混乱。
另外,普通的() -> Unit
也可以传给fun getSuspending(suspending: suspend () -> Unit) {}
,Kotlin 1.6.0 引入了从常规Block到挂起Block类型的转换。编译器会自动将普通的block自动转换为suspend的block。
还有一种形式:
suspend {
// ...
}
与上面的suspend关键字不同,这里的suspend是一个函数:
public inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block
suspend方法的用处是,通常在普通的函数或lambda中,不能直接使用suspend关键字,且lambda没有参数的情况下,可以使用这个方法创建一个挂起函数。
协程任务 Job
继承自CoroutineContext.Element
,是一种上下文元素。Job代表了当前协程任务的对象,赋予了协程结构化并发、生命周期和可取消的能力。
A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
一个后台工作。 从概念上讲,工作是一个可取消的东西,其生命周期以完成为顶点。
public interface Job : CoroutineContext.Element {
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun start(): Boolean
public fun cancel(cause: CancellationException? = null)
public suspend fun join()
}
Job代表的含义是,封装了协程中需要执行的代码逻辑。Thread
类可以代表线程在Java中的对象引用,Job
则是代表了协程对象引用。
Job的状态
与Thread相同,Job也有几个状态来表示协程的状态:
State | isActive(是否活跃) | isCompleted(是否完成) | isCancelled(是否取消) |
---|---|---|---|
New (可选初始状态) | false | false | false |
Active (默认初始状态) | true | false | false |
Completing (短暂态) | true | false | false |
Cancelling (短暂态) | false | false | true |
Cancelled (完成态) | false | true | true |
Completed (完成态) | false | true | false |
-
New
启动选项CoroutineStart设置为
CoroutineStart.LAZY
的情况下,Job的状态是New(创建但未启动),可以通过调用start或join方法来启动Job。 -
Active
通常情况下的Job是Active状态的(创建且已启动),然而,当launch的启动选项CoroutineStart设置为
CoroutineStart.LAZY
的情况下,Job的状态是New(创建但未启动),可以通过调用start或join方法来启动Job。当协程正在运行时、直到完成或协程失败或取消前,Job的状态都是是Active。
-
Cancelling
抛出异常的Job会导致其进入Cancelling状态,也可以使用cancel方法来随时取消Job使其立即转换为Cancelling状态。
直到Job等待其所有子项都取消前,一直是Cancelling状态。
-
Cancelled
当它递归取消子项,并等待所有的子项都取消后,该Job会进入Cancelled状态。
-
Completing
与Cancelling相同,在等待所有的子项完成时,会进入这个状态。需要注意的是这个完成中的状态是对于内部的子项的,对于外部观察者看来,父Job仍是Active状态。
-
Completed
当所有子项都完成,该Job会变为Completed状态。
Job的状态流转:
wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
关于取消,有这样一行备注:
其中取消父项会导致立即递归取消其所有子项。 具有除 CancellationException 之外的异常的子项失败会立即取消其父级,从而取消其所有其他子项。 可以使用 SupervisorJob 自定义此行为。
SupervisorJob
SupervisorJob是Job的实现类。与Job的区别是,它的子项可以单独失败,不会导致父协程和其他子协程的失败。
所以SupervisorJob可以实现一个自定义规则的失败子项处理机制:
- 使用 [launch][CoroutineScope.launch] 创建的子Job失败可以通过上下文中的 CoroutineExceptionHandler 处理。
- 使用 [async][CoroutineScope.async] 创建的子Job的失败可以通过 Deferred.await 对生成的延迟值进行处理。
如果参数parent指定了值,这个supervisor job就变成了parent的子项,并且当parent失败或取消时被取消。所有的supervisor的子项也会被取消。调用cancel方法若有异常(除了CancellationException),也会导致parent取消。
取消机制是一个比较复杂的专题,单独分享。
获取Job对象
获取Job对象的方式有两种:
- 通过协程构建器方法的返回值获取Job对象。
- 通过构造方法直接创建,这种方式通常用于协程上下文中。
val job = Job()
// 构造方法
@Suppress("FunctionName")
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
从概念上讲,直接 val job = Job()
与使用launch创建一个空代码块相同,即等同于:
val job = launch {
// no opt
}
协程拦截器 ContinuationInterceptor
ContinuationInterceptor用来在协程真正运行前,执行拦截操作,拦截器模式中的定义接口。
协程调度器
在分析线程与协程的区别中,我们提到过一个词 -- “协程调度器”,当我们给协程配置了调度器后,协程所在的线程发生了变化。很明显协程调度器中隐藏着线程切换的秘密。
协程调度器也是一种协程上下文元素:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// ...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}
CoroutineDispatcher中定义了调度的关键方法dispatch
。CoroutineDispatcher最常见的实现是一个单例 Dispatchers
:
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
Dispatchers
的作用是对调度器进行分组。实际上,CoroutineDispatcher的实现类有很多,例如EventLoop、CommonPool等,而这些,都会在不同的组中使用到。
Default
如果没有在上下文中指定Dispatcher或任何其他的 ContinuationInterceptor,则所有标准构建器(如launch、async等),默认使用CoroutineDispatcher。
它由 JVM 上的共享线程池提供支持。 默认情况下,此调度程序使用的最大并行级别等于 CPU 内核数,但至少为 2。 并行度 X 保证在这个调度器中并行执行的任务不超过 X 个。
internal fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
useCoroutinesScheduler的实现:
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
Default会根据useCoroutinesScheduler属性(默认为true) 去获取对应的线程池:
- DefaultScheduler :Kotlin内部自己实现的线程池逻辑
- CommonPool:Java类库中的Executor实现的线程池逻辑
DefaultScheduler
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
// ...
}
DefaultScheduler中定义了一个IO属性,他就是 Dispatchers.IO的实现。
另外DefaultScheduler的dispatch方法在父类ExperimentalCoroutineDispatcher中:
@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// ...
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
}
public fun limited(parallelism: Int): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
分发逻辑在CoroutineScheduler中:
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
val task = createTask(block, taskContext)
// 尝试将任务提交到本地队列并根据结果采取行动
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// 全局队列在关闭/关闭的最后一步关闭——不应接受更多任务
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// ...
}
大致逻辑是,首先将block转换成了一个Task(实际是一个runnable),然后提交到本地队列,等待执行。
CommonPool
internal object CommonPool : ExecutorCoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
// CommonPool only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.enqueue(block)
}
}
// ...
}
CommonPool的实现是java类库中的Executor实现线程池逻辑。
IO
public val IO: CoroutineDispatcher = DefaultScheduler.IO
IO组直接使用的DefaultScheduler的IO对象,是一个LimitingDispatcher对象。
其分发实现为:
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
private val queue = ConcurrentLinkedQueue<Runnable>()
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// 提交进行中的任务
val inFlight = inFlightTasks.incrementAndGet()
// 快速路径,如果没有达到并行度限制,则分发任务并返回
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
// 达到并行度限制,将任务添加到队列中
queue.add(taskToSchedule)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
// ...
}
可以看出 Dispatchers.Default和IO 是在同一个线程中运行的,也就是共用相同的线程池。但是IO多了线程数量限制。在CoroutineScheduler中:
- CoroutineScheduler最多有corePoolSize个线程被创建
- corePoolSize它的取值为max(2, CPU核心数)
而在LimitingDispatcher中:
1、创建线程数不能大于maxPoolSize ,公式:max(corePoolSize, min(CPU核心数 * 128, 2^21 - 2))。
Main
需要引入kotlinx-coroutines-android库,它里面有Android对应的Dispatchers.Main实现。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
// 在Android上编译时,启动R8优化
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
通过FAST_SERVICE_LOADER_ENABLED
字段判断实现逻辑,FAST_SERVICE_LOADER_ENABLED
默认为true,FastServiceLoader.loadMainDispatcherFactory
的实现为:
internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
val clz = MainDispatcherFactory::class.java
if (!ANDROID_DETECTED) {
return load(clz, clz.classLoader)
}
return try {
val result = ArrayList<MainDispatcherFactory>(2)
createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
// Fallback to the regular SL in case of any unexpected exception
load(clz, clz.classLoader)
}
}
internal val ANDROID_DETECTED = runCatching { Class.forName("android.os.Build") }.isSuccess
关于这个方法,官方描述:
此方法尝试以 Android 友好的方式加载 MainDispatcherFactory。 如果我们不是在 Android 上,则此方法回退到常规服务加载,否则我们尝试为 AndroidDispatcherFactory 和 TestMainDispatcherFactory 执行 Class.forName 查找。
所以,Dispatchers.Main
是和Android平台强相关的,底层原理大概是:通过调用Looper.getMainLooper()获取handler ,最终通过handler来实现在主线程中运行。 实质就是把任务通过Handler运行在Android的主线程。
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
Unconfined
Unconfined
的含义是,任务执行在默认的启动线程。之后由调用resume的线程决定恢复协程的线程:
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
// It can only be called by the "yield" function. See also code of "yield" function.
val yieldContext = context[YieldContext]
if (yieldContext != null) {
// report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
yieldContext.dispatcherWasUnconfined = true
return
}
throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
"isDispatchNeeded and dispatch calls.")
}
}
这个方法的逻辑,与DispatchedContinuation的有关。
DispatchedContinuation
是什么,首先,协程的继承关系顶层有一个Continuation
,它可以表示一个协程,而且,Continuation
中定义了协程可恢复的能力,rsumeWith
方法。而DispatchedContinuation
是Continuation
的一个实现,它的resumeWith
:
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
通过isDispatchNeeded
(是否需要dispatch,Unconfined=false,default,IO=true)判断做不同处理,Unconfined
是执行executeUnconfined
方法:
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
): Boolean {
val eventLoop = ThreadLocalEventLoop.eventLoop
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
return if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
_state = contState
resumeMode = mode
eventLoop.dispatchUnconfined(this)
true // queued into the active loop
} else {
// Was not active -- run event loop until all unconfined tasks are executed
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}
这个方法会从Threadlocal中取出eventLoop(eventLoop是和当前线程相关的),判断是否在执行Unconfined任务
如果在执行则调用EventLoop的dispatchUnconfined方法把Unconfined任务放进EventLoop中。
-
如果没有在执行则直接执行。
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { eventLoop.incrementUseCount(unconfined = true) try { block() while (true) { if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { handleFatalException(e, null) } finally { eventLoop.decrementUseCount(unconfined = true) } }
EventLoop
是存放在Threadlocal
中,所以是跟当前线程相关联的,而EventLoop
也是CoroutineDispatcher
的一个子类。
internal abstract class EventLoop : CoroutineDispatcher() {
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
public fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}
public fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
return true
}
}
内部通过一个双向队列实现存放Unconfined任务:
-
EventLoop
的dispatchUnconfined
方法用于把Unconfined任务放进队列的尾部 -
processUnconfinedEvent
方法用于从队列的头部移出Unconfined任务执行
In the end
到此协程的基本组成介绍的差不多了,可能大家会觉得还有一些概念,比如挂起是什么等等没讲,这些问题会在后续的文章中,给大家一一讲清楚。
另外,关于协程的使用,可以参考官方教程:[play.kotlinlang.org/hands-on/In…play.kotlinlang.org/hands-on/In… to Coroutines and Channels/01_Introduction)
本文内容较多,如有错误和建议敬请指正。
作者:自动化BUG制造器
链接:https://juejin.cn/post/7040462292944683016