前两天在线上发现了这样的一个异常:
IllegalStateException
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
然后我就全局搜了一下这个错误,发现是在下面的这个方法中抛出来的:
kotlinx.coroutines.flow.internal.SafeCollector#exceptionTransparencyViolated
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
/*
* Exception transparency ensures that if a `collect` block or any intermediate operator
* throws an exception, then no more values will be received by it.
* For example, the following code:
* ```
* val flow = flow {
* emit(1)
* try {
* emit(2)
* } catch (e: Exception) {
* emit(3)
* }
* }
* // Collector
* flow.collect { value ->
* if (value == 2) {
* throw CancellationException("No more elements required, received enough")
* } else {
* println("Collected $value")
* }
* }
* ```
* is expected to print "Collected 1" and then "No more elements required, received enough" exception,
* but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
*/
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
什么情况下才会有上面这样的错误呢?
我们把上面注释中的代码 copy 出来运行一下:
suspend fun main() {
val flow = flow {
emit(1)
try {
emit(2)
} catch (e: Exception) {
emit(3)
}
}
// Collector
flow.collect { value ->
if (value == 2) {
throw CancellationException("No more elements required, received enough")
} else {
println("Collected $value")
}
}
}
问题复现了。那这个问题是说什么呢?翻译一下就是:
Flow 流违反了异常透明度:
前一个 “emit” 调用已经抛出了一个 exception,但是在这之后检测到还在发射 $value。为了避免未指定的行为,禁止从 catch 块中发射数据,可以使用
Flow.catch
来替代。
上面的话有些难理解,通俗来讲就是禁止在 try { emit(xx) } catch
的 catch 块中发射数据,这样的话会违反 Flow 异常的透明性。
下面我们来分析一下 Flow 的代码来看看为什么会违反 Flow 异常的透明。
flow
方法的代码如下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
注意这里的 block 函数是我们的 lamble 表达式,也就是我们调用 emit 发射数据的函数,它是 FlowCollector<T>.() -> Unit
类型的扩展函数。
flow 函数返回了一个 SafeFlow
传入了 block 函数。
接下来我们来看看 collect
的函数:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
collect
函数定义在 Flow
接口中。这里我们调用 collect
函数传入的是类型为 FlowCollector<T>
的函数式接口。上面调用 flow
返回的是 SafeFlow
,所以 collect 函数的实现肯定也在 SafeFlow
这个类中。我们来看看 SafeFlow
的实现。
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
// ...
}
SafeFlow
继承自 AbstractFlow
,我们这里调用的是 AbstractFlow
的 collect
方法,这里创建了一个 SafeCollector
,然后调用了 collectorSafely
方法,在 SafeFlow 的 collectSafely
方法中又调用了 block
方法,注意这里的 block
就是调用 flow
函数传入的 lambda 代表的扩展方法。(没想到吧,还能这么玩)
所以我们调用的 emit 发射方法的实现是在 SafeCollector
中。如下:
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
// Save the fact that exception from emit (or even check context) has been thrown
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
emit
是一个 suspend
的方法,通过调用 suspendCoroutineUninterceptedOrReturn
方法获取到当前 suspend 函数的 Continuation
,然后调用了非 suspend 的 emit 方法。注意这里的 try-catch
,如果调用非 emit
的方法出现了异常,将会把异常赋值给 lastEmissionContext
,然后抛出当前捕获的异常。接下来我们来看看 非 suspend
的 emit
方法。
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context // 当前的协程的 context
currentContext.ensureActive() // 确保当前协程处于 active 状态(也就是没有取消,或者发生错误等)
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext // 前一次的协程 context,如果前面 emit 发生了异常,那么这里的 previouseContext 将会是 DownstreamExceptionElement
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value) // 检查
lastEmissionContext = currentContext
}
completion = uCont
val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
/*
* If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
* and we don't have to retain a strong reference to it to avoid memory leaks.
*/
if (result != COROUTINE_SUSPENDED) {
completion = null
}
return result
}
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
if (previousContext is DownstreamExceptionContext) {
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
}
在方法的一开始获取了当前协程的 Context,然后将 lastEmissionContext
赋值给 previouseContext
。如果当前的 context 的引用不等于前一个 context 的引用(注意这里用的是 !==
),那么就会调用 checkContext
来进行检查,在检查完成后会把 lastEmissionContext
赋值未当前的 currentContext
。
在 checkContext
中,如果 previousContext
是 DownstreamExceptionContext
那么就会调用 exceptionTransparencyViolated
来抛出我们最开始提到的异常了。
private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
/*
* Exception transparency ensures that if a `collect` block or any intermediate operator
* throws an exception, then no more values will be received by it.
* For example, the following code:
* ```
* val flow = flow {
* emit(1)
* try {
* emit(2)
* } catch (e: Exception) {
* emit(3)
* }
* }
* // Collector
* flow.collect { value ->
* if (value == 2) {
* throw CancellationException("No more elements required, received enough")
* } else {
* println("Collected $value")
* }
* }
* ```
* is expected to print "Collected 1" and then "No more elements required, received enough" exception,
* but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
*/
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
那么什么情况下会触发这个异常呢?我们可以看到官方的注释中给出了一个例子。从上面的分析中我们也可以得出结论,如果我们对 emit
方法进行 try-catch
,并且在 catch
中调用 emit
发射数据那么就会抛出上面的异常了。
思考一下:为什么要抛出这样的异常?
我们来继续分析,等分析完后,你可能就会有答案了。
检查没问题的话,接下来调用了 emitFun
。
@Suppress("UNCHECKED_CAST")
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
public fun interface FlowCollector<in T> {
/**
* Collects the value emitted by the upstream.
* This method is not thread-safe and should not be invoked concurrently.
*/
public suspend fun emit(value: T)
}
这里对 Kotlin 和 Java 理解不够深的话,理解起来还是比较困难的。我们逐步来拆解。emitFun
是一个变量(这里不能说是一个函数类型的变量,因为它的类型不是一个高级函数),它的类型是 Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
。那么为什么能把 FlowCollector<Any?>::emit
转换为 Function3
呢?
首先从 Jvm 字节码的角度出发,Follector
的 emit
就是一个方法,emit
的参数列表中的第一个参数 this
就是 Follector
,最后一个参数是 Kotlin 协程 suspend 函数添加的 Continuation
,所以这里可以转换为 Funcation3
。
需要注意的,这里的 collector 就是我们调用 collect
时传入的 lambda 表达式所对应的对象,所以这里相当于调用了它的 emit
函数。可能还是有点难以理解,我们把上面的程序中的 lambda 表达式全部替换成匿名内部类,再来理解一下。
suspend fun main() {
val flow = flow {
emit(1)
try {
emit(2)
} catch (e: Exception) {
emit(3)
}
}
// Collector
flow.collect(object : FlowCollector<Int> {
override suspend fun emit(value: Int) {
if (value == 2) {
throw CancellationException("No more elements required, received enough")
} else {
println("Collected $value")
}
}
})
}
所以我们在 flow
中传入的 lambda 表达式中调用 emit
方法是 SafeCollector
的 emit
方法,这个 emit
方法,最终会调用到我们调用 collect
传入的 FlowCollector
的 emit
方法。
那么这里就会有一个问题,我们在 flow 上游发射数据,如果对 emit 方法加 try-catch,那么可能会 catch 到下游的异常,这违反了 flow 异常的透明性。