Kotlin Flow中的SafeCollector竟如此重要

前言

在上一篇文章中一个案例让你秒懂kotlin flow原理,已经讲清楚了flow的数据流向是怎么回事?以及flow为什么叫做冷流?但是却“遗漏”了上游发射数据到下游这个过程,这个过程牵扯到一个重要的类SafeCollector,说是遗漏,实际上是我故意没有详解的,因为这个东西可能有点复杂,把它加进去很容易造成读者无法把控全局,产生疑惑的问题,但是它又对深刻理解底层原理有很大的作用,所以我决定把它单独拎出来讲,这篇文章可能有点烧脑,希望阅读的你尽可能看完,如果最后实在看不懂也没关系,因为你的面试官可能也没有深挖过。文末我会尽可能把干货总结写出来。

先回顾下,这是上一篇文章总结的图


image.png

直奔主题,本文探讨的是如下问题

  • flow发射的安全性是怎么回事?
  • flow的挂起恢复是怎么完成的?
  • flow中的emit居然是个方法引用?为什么要这样做?

SafeCollector类源码

flow{ }代码块中调用emit发射数据的时候,实际调用的是SafeCollector的emit方法。

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {


    //上一次协程上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 挂起执行完后的续体
    private var completion: Continuation<Unit>? = null

    //协程上下文
    override val context: CoroutineContext
        get() = lastEmissionContext ?: EmptyCoroutineContext

    override fun invokeSuspend(result: Result<Any?>): Any {
        result.onFailure { lastEmissionContext = DownstreamExceptionContext(it, context) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }



    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
//uCont是个Continuation
            try {
                emit(uCont, value) //关键就是调用这个重载方法
            } catch (e: Throwable) {
                lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
                throw e
            }
        }
    }

先来看下这个emit重载方法,再来解释suspendCoroutineUninterceptedOrReturn作用。

 private fun emit(uCont: Continuation<Unit>, value: T): Any? {
//Continuation是持有协程上下文和保存协程状态的
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext

//关键点1,检查协程上下文是否发生了变化,如果是,会抛出异常
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
            lastEmissionContext = currentContext
        }

        completion = uCont
//关键点2,emitfun方法就是把数据发到FlowCollector中去
        val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        /*
         * If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
         * and we don't have to retain a strong reference to it to avoid memory leaks.
         */
        if (result != COROUTINE_SUSPENDED) {
            completion = null
        }
        return result
    }

先看下协程上下文变化是什么意思?

//demo1
flow {
            emit(1)
            withContext(Dispatchers.IO){
                emit(2)
            }
        }.collect {
            println("collect value: $it")
        }

//demo2
        flow {
            launch {
                emit(1)
            }
            launch {
                emit(2)
            }
        }.collect{
            println("collect value: $it")
        }

在上面demo中。再调用emit方法时用了withContext或者launch都是改变协程上下文。这2个例子都会抛出运行时异常。可以使用flowOn做切换

再看下关键点2emitFun 方法
它接受三个参数: collector ,value 和 Continuation 。它的作用是将 value 发射到 collector 中,并在发射完成后继续执行当前协程。 Continuation 对象用于在发射完成后恢复协程的执行。

真正调用的是这个方法

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

科普

先看这个FlowCollector<Any?>::emit,这里::双引号是java语法中的方法引用,方法引用是一种更简洁、更易读的代码风格。方法引用可以用于 lambda 表达式、函数式接口等多种场景。方法引用的语法格式为 类名::方法名对象名::方法名类名::new (构造函数引用)

看懂了方法引用,接着解释。emitFun方法是将FlowCollector中只有一个参数的emit方法转换成三个参数的方法。

这样做的目:是为了实现更高级的功能和更好地与 Kotlin 协程集成。通过添加 Continuation<Unit> 参数, emit 方法成为了一个挂起函数,使得它可以在协程上下文中使用。 Continuation 参数用于管理协程的执行状态,包括异常处理和挂起后恢复协程的操作。 此外,三参数的 emit 方法允许更好地控制协程中数据的流动。通过向 emit 方法传递 Continuation 对象,协程可以在数据流中的特定点被挂起和恢复,从而实现对数据流的更精细控制。这在需要按特定顺序或具有特定时间要求处理数据的情况下非常有用。 总之,通过将 emit 方法转换为带有 Continuation 参数的挂起函数,Kotlin Flow 提供了更好的协程集成和更高级的功能,可以更好地管理协程中的数据流。

简单说,这个方法最终调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

再来看下Flow的collect方法

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

关注2点

  • FlowCollector的emit方法是个挂起函数
  • 这个action(value)怎么理解呢?
 flow {
            emit(1)
            emit(2)
        }.collect {

//action(value)实际上就是调用的collect代码块啊
            println(it)
        }

到这里,我们可以知道,flow的消费过程是指的FlowCollector的emit方法的执行,也就是collect{ }代码块的执行。如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,继续执行flow代码块中剩下的代码,如果发生了挂起,情况会稍有不同,下面我们要讨论的就是挂起这种情况。

关于消费过程挂起的情况

override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
                throw e
            }
        }
    }

在这段代码中, emit 是个挂起函数,用于向下游发射值。如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED后,会挂起当前协程。

这里使用 kotlin协程中的suspendCoroutineUninterceptedOrReturn 函数来挂起当前协程并等待异步操作完成。这个算是协程比较底层的东西,大家平时可能接触不到,理解下就行了。在这里, uCont 是一个 Continuation 对象,它用于在异步操作完成时恢复协程。如果异步操作成功完成, emit 方法将返回发射的值。如果异步操作失败, emit 方法将抛出异常。

挂起后的恢复

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。

挂起执行完后的恢复要调用续体的resumeWith方法。但是SafeCollector并没有重写这个方法,所以最终是调用它的父类``BaseContinuationImplresumeWith`方法。

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

这个方法内部会调用invokeSuspend方法,这是个抽象方法,它是在SafeCollector中重写的。

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 获取异常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果没有异常,则恢复flow续体的执行
    completion?.resumeWith(result as Result<Unit>)
    // 返回挂起标识(这是协程挂起函数通用的),这里挂起的是消费过程
    return COROUTINE_SUSPENDED
}

总结

  • 调用emit时,不要改变协程上下文,否则会抛异常
  • emit方法调用最终是调用的FlowCollector的emit方法,也就是collect代码块中的逻辑
  • 消费过程中的挂起恢复流程没那么复杂,collect的emit方法本身是个挂起函数,如果执行消费过程发生了挂起操作,SafeCollector的emit会返回挂起标识COROUTINE_SUSPENDED,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED后,会挂起当前协程并等待异步操作完成。挂起后的恢复是通过调用续体的resumeWith方法完成的,然后恢复生产,继续执行flow代码块。

推荐阅读
Kotlin协程:创建、启动、挂起、恢复

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容