前言
在上一篇文章中一个案例让你秒懂kotlin flow原理,已经讲清楚了flow的数据流向是怎么回事?以及flow为什么叫做冷流?但是却“遗漏”了上游发射数据到下游这个过程,这个过程牵扯到一个重要的类SafeCollector
,说是遗漏,实际上是我故意没有详解的,因为这个东西可能有点复杂,把它加进去很容易造成读者无法把控全局,产生疑惑的问题,但是它又对深刻理解底层原理有很大的作用,所以我决定把它单独拎出来讲,这篇文章可能有点烧脑,希望阅读的你尽可能看完,如果最后实在看不懂也没关系,因为你的面试官可能也没有深挖过。文末我会尽可能把干货总结写出来。
先回顾下,这是上一篇文章总结的图
直奔主题,本文探讨的是如下问题
- flow发射的安全性是怎么回事?
- flow的挂起恢复是怎么完成的?
- flow中的emit居然是个方法引用?为什么要这样做?
SafeCollector类源码
在flow{ }
代码块中调用emit发射数据的时候,实际调用的是SafeCollector的emit方法。
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
//上一次协程上下文
private var lastEmissionContext: CoroutineContext? = null
// 挂起执行完后的续体
private var completion: Continuation<Unit>? = null
//协程上下文
override val context: CoroutineContext
get() = lastEmissionContext ?: EmptyCoroutineContext
override fun invokeSuspend(result: Result<Any?>): Any {
result.onFailure { lastEmissionContext = DownstreamExceptionContext(it, context) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
//uCont是个Continuation
try {
emit(uCont, value) //关键就是调用这个重载方法
} catch (e: Throwable) {
lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
throw e
}
}
}
先来看下这个emit重载方法,再来解释suspendCoroutineUninterceptedOrReturn
作用。
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
//Continuation是持有协程上下文和保存协程状态的
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
//关键点1,检查协程上下文是否发生了变化,如果是,会抛出异常
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
lastEmissionContext = currentContext
}
completion = uCont
//关键点2,emitfun方法就是把数据发到FlowCollector中去
val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
/*
* If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
* and we don't have to retain a strong reference to it to avoid memory leaks.
*/
if (result != COROUTINE_SUSPENDED) {
completion = null
}
return result
}
先看下协程上下文变化是什么意思?
//demo1
flow {
emit(1)
withContext(Dispatchers.IO){
emit(2)
}
}.collect {
println("collect value: $it")
}
//demo2
flow {
launch {
emit(1)
}
launch {
emit(2)
}
}.collect{
println("collect value: $it")
}
在上面demo中。再调用emit方法时用了withContext或者launch都是改变协程上下文。这2个例子都会抛出运行时异常。可以使用flowOn
做切换
再看下关键点2emitFun
方法
它接受三个参数: collector ,value 和 Continuation 。它的作用是将 value 发射到 collector 中,并在发射完成后继续执行当前协程。 Continuation 对象用于在发射完成后恢复协程的执行。
真正调用的是这个方法
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
科普
先看这个
FlowCollector<Any?>::emit
,这里::
双引号是java语法中的方法引用,方法引用是一种更简洁、更易读的代码风格。方法引用可以用于 lambda 表达式、函数式接口等多种场景。方法引用的语法格式为类名::方法名
或对象名::方法名
或类名::new (构造函数引用)
。
看懂了方法引用,接着解释。emitFun
方法是将FlowCollector中只有一个参数的emit方法转换成三个参数的方法。
这样做的目:是为了实现更高级的功能和更好地与 Kotlin 协程集成。通过添加
Continuation<Unit>
参数,emit
方法成为了一个挂起函数,使得它可以在协程上下文中使用。Continuation
参数用于管理协程的执行状态,包括异常处理和挂起后恢复协程的操作。 此外,三参数的emit
方法允许更好地控制协程中数据的流动。通过向emit
方法传递Continuation
对象,协程可以在数据流中的特定点被挂起和恢复,从而实现对数据流的更精细控制。这在需要按特定顺序或具有特定时间要求处理数据的情况下非常有用。 总之,通过将emit
方法转换为带有Continuation
参数的挂起函数,Kotlin Flow 提供了更好的协程集成和更高级的功能,可以更好地管理协程中的数据流。
简单说,这个方法最终调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。
再来看下Flow的collect方法
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
关注2点
- FlowCollector的emit方法是个挂起函数
- 这个
action(value)
怎么理解呢?
flow {
emit(1)
emit(2)
}.collect {
//action(value)实际上就是调用的collect代码块啊
println(it)
}
到这里,我们可以知道,flow的消费过程是指的FlowCollector的emit方法的执行,也就是collect{ }
代码块的执行。如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,继续执行flow代码块中剩下的代码,如果发生了挂起,情况会稍有不同,下面我们要讨论的就是挂起这种情况。
关于消费过程挂起的情况
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
throw e
}
}
}
在这段代码中, emit
是个挂起函数,用于向下游发射值。如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED
对象,suspendCoroutineUninterceptedOrReturn
方法在收到COROUTINE_SUSPENDED
后,会挂起当前协程。
这里使用 kotlin协程中的suspendCoroutineUninterceptedOrReturn
函数来挂起当前协程并等待异步操作完成。这个算是协程比较底层的东西,大家平时可能接触不到,理解下就行了。在这里, uCont 是一个 Continuation 对象,它用于在异步操作完成时恢复协程。如果异步操作成功完成, emit 方法将返回发射的值。如果异步操作失败, emit 方法将抛出异常。
挂起后的恢复
当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。
挂起执行完后的恢复要调用续体的resumeWith
方法。但是SafeCollector
并没有重写这个方法,所以最终是调用它的父类``BaseContinuationImpl的
resumeWith`方法。
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
这个方法内部会调用invokeSuspend
方法,这是个抽象方法,它是在SafeCollector
中重写的。
override fun invokeSuspend(result: Result<Any?>): Any? {
// 获取异常
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
// 如果没有异常,则恢复flow续体的执行
completion?.resumeWith(result as Result<Unit>)
// 返回挂起标识(这是协程挂起函数通用的),这里挂起的是消费过程
return COROUTINE_SUSPENDED
}
总结
- 调用
emit
时,不要改变协程上下文,否则会抛异常 - emit方法调用最终是调用的FlowCollector的emit方法,也就是collect代码块中的逻辑
- 消费过程中的挂起恢复流程没那么复杂,collect的emit方法本身是个挂起函数,如果执行消费过程发生了挂起操作,SafeCollector的emit会返回挂起标识
COROUTINE_SUSPENDED
,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED
后,会挂起当前协程并等待异步操作完成。挂起后的恢复是通过调用续体的resumeWith
方法完成的,然后恢复生产,继续执行flow代码块。
推荐阅读
Kotlin协程:创建、启动、挂起、恢复