上篇看了Flow的基本用法,这篇文章就从源码的角度来看看Flow的运行机制
1.Flow创建
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(100)
emit(i)
}
}
看一下flow函数的定义
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
参数类型为
@BuilderInference block: suspend FlowCollector<T>.() -> Unit
这里的参数,可以理解为 入参是一个函数,该函数是FlowCollector的一个扩展函数,没有入参,也没有出参(返回值为Unit,相当于java的void)。对于这块不理解的,可以参阅 这里
flow函数调用了 SafeFlow的构造函数
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
AbstractFlow 代码也比较简单,稍后再说
到这里,Flow创建圆满结束了
2.接收 collect 函数
前面介绍过,Flow为冷流,冷流不会发射数据,只有到了收集(末端操作符)的时候,数据才开始生产并被发射出去。接下来就来看看emit和collect怎么发生的关联。先来看一下collect函数
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
这里可以看出,FlowCollector的emit方法,实际上调用的是collect传入的action方法。但是,我们创建Flow的FlowCollector是如何与collect方法传入的FlowCollector产生关系的呢?
关键就在于SafeFlow这个类
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
AbstractFlow代码如下
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it. * * A valid implementation of this method has the following constraints: * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values. * The emission should happen in the context of the [collect] call. * Please refer to the top-level [Flow] documentation for more details. * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not * thread-safe by default. * To automatically serialize emissions [channelFlow] builder can be used instead of [flow] * * @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
到这里可以看出SafeFlow的collect方法,实际调用的是collectSafely方法,最终是collect生成的FlowCollector调用创建时传入的block方法。
有点绕,再捋一遍。
flow构造时,传入FlowCollector的扩展方法,我们称此方法为block
当collect方法调用时,传入参数action,首先将此action方法包装成FlowCollector,我们称之为safeCollector
而collect最终调用的为safeCollector.block
到此,我们就理解了,为什么Flow是冷流了,只有末端操作符才会调用其构造时的block
3.协程切换flowOn方法
直接看源码
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
这里的when方法比较有意思,没有参数。kotlin的when支持没有参数的条件跳转,无参时需要各种条件都是一个boolean型表达式, 参见这里
以ChannelFlowOperatorImpl为例来看一下
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
override fun dropChannelOperators(): Flow<T> = flow
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}
这里没什么有价值的代码,由于ChannelFlowOperatorImpl继承自ChannelFlowOperator看一下ChannelFlowOperator的代码
internal abstract class ChannelFlowOperator<S, T>(
@JvmField protected val flow: Flow<S>,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
protected abstract suspend fun flowCollect(collector: FlowCollector<T>)
// Changes collecting context upstream to the specified newContext, while collecting in the original context
private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
// invoke flowCollect(originalContextCollector) in the newContext
return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
}
// Slow path when output channel is required
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
// Optimizations for fast-path when channel creation is optional
override suspend fun collect(collector: FlowCollector<T>) {
// Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
if (capacity == Channel.OPTIONAL_CHANNEL) {
val collectContext = coroutineContext
val newContext = collectContext + context // compute resulting collect context
// #1: If the resulting context happens to be the same as it was -- fallback to plain collect
if (newContext == collectContext)
return flowCollect(collector)
// #2: If we don't need to change the dispatcher we can go without channels
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
// Slow-path: create the actual channel
super.collect(collector)
}
// debug toString
override fun toString(): String = "$flow -> ${super.toString()}"
}
collect执行的时候,如果指定的协程与现在的不一致,则走collectWithContextUndispatched方法,走到下面这个方法
internal suspend fun <T, V> withContextUndispatched(
newContext: CoroutineContext,
value: V,
countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
block: suspend (V) -> T
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
withCoroutineContext(newContext, countOrElement) {
block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
}
}
withCoroutineContext这个方法就是协程切换的地方了。