【Android jetpack】Flow是如何解决背压问题的

前言

随着时间的推移,越来越多的主流应用已经开始全面拥抱Kotlin,协程的引入,Flow的诞生,给予了开发很多便捷,作为协程与响应式编程结合的流式处理框架,一方面它简单的数据转换与操作符,没有繁琐的操作符处理,广受大部分开发的青睐,另一方面它并没有响应式编程带来的背压问题(BackPressure)的困扰;接下来,本文将会就Flow如何解决背压问题进行探讨

关于背压(BackPressure

背压问题是什么

首先我们要明确背压问题是什么,它是如何产生的?简单来说,在一般的流处理框架中,消息的接收处理速度跟不上消息的发送速度,从而导致数据不匹配,造成积压。如果不及时正确处理背压问题,会导致一些严重的问题

  • 比如说,消息拥堵了,系统运行不畅从而导致崩溃
  • 比如说,资源被消耗殆尽,甚至会发生数据丢失的情况

如下图所示,可以直观了解背压问题的产生,它在生产者的生产速率高于消费者的处理速率的情况下出现

定义背压策略

既然我们已经知道背压问题是如何产生的,就要去尝试正确地处理它,大致解决方案策略在于,如果你有一个流,你需要一个缓冲区,以防数据产生的速度快于消耗的速度,所以往往就会针对这个背压策略进行些讨论

  • 定义的中间缓冲区需要多大才比较合适?
  • 如果缓冲区数据已满了,我们怎么样处理新的事件?

对于以上问题,通过学习Flow里的背压策略,相信可以很快就知道答案了

Flow的背压机制

由于Flow是基于协程中使用的,它不需要一些巧妙设计的解决方案来明确处理背压,在Flow中,不同于一些传统的响应式框架,它的背压管理是使用Kotlin挂起函数suspend实现的,看下源码你会发现,它里面所有的函数方法都是使用suspend修饰符标记,这个修饰符就是为了暂停调度者的执行不阻塞线程。因此,Flow<T>在同一个协程中发射和收集时,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。看到这,是不是觉得有点难懂.......

简单举个例子,假设我们拥有一个烤箱,可以用来烤面包,由于烤箱容量的限制,一次只能烤4个面包,如果你试着一次烤8个面包,会大大加大烤箱的承载负荷,这已经远远超过了它的内存使用量,很有可能会因此烧掉你的面包。

模拟背压问题

回顾下之前所说的,当我们消耗的速度比生产的速度慢的时候,就会产生背压,下面用代码来模拟下这个过程

  • 首先先创建一个方法,用来每秒发送元素

    fun currentTime() = System.currentTimeMillis()
    fun threadName() = Thread.currentThread().name
    var start: Long = 0
    
    fun createEmitter(): Flow<Int> =
        (1..5)
            .asFlow()
            .onStart { start = currentTime() }
            .onEach {
                delay(1000L)
                print("Emit $it (${currentTime() - start}ms) ")
            }
    
  • 接着需要收集元素,这里我们延迟3秒再接收元素, 延迟是为了夸大缓慢的消费者并创建一个超级慢的收集器。

    fun main() {
        runBlocking {
            val time = measureTimeMillis {
                createEmitter().collect {
                    print("\nCollect $it starts ${start - currentTime()}ms")
                    delay(3000L)
                    println("   Collect $it ends ${currentTime() - start}ms")
                }
            }
            print("\nCollected in $time ms")
        }
    }
    

    看下输出结果,如下图所示

这样整个过程下来,大概需要20多秒才能结束,这里我们模拟了接收元素比发送元素慢的情况,因此就需要一个背压机制,而这正是Flow本质中的,它并不需要另外的设计来解决背压

背压处理方式

使用buffer进行缓存收集

为了使缓冲和背压处理正常工作,我们需要在单独的协程中运行收集器。这就是.buffer()操作符进来的地方,它是将所有发出的项目发送Channel到在单独的协程中运行的收集器

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

它还为我们提供了缓冲功能,我们可以指定capacity我们的缓冲区和处理策略onBufferOverflow,所以当Buffer溢出的时候,它为我们提供了三个选项

enum BufferOverflow { 
   SUSPEND,
   DROP_OLDEST,
   DROP_LATEST
 }
  • 默认使用SUSPEND:会将当前协程挂起,直到缓冲区中的数据被消费了
  • DROP_OLDEST:它会丢弃最老的数据
  • DROP_LATEST: 它会丢弃最新的数据

好的,我们回到上文所展示的模拟示例,这时候我们可以加入缓冲收集buffer,不指定任何参数,这样默认就是使用SUSPEND,它会将当前协程进行挂起

此时当收集器繁忙的时候,程序就开始缓冲,并在第一次收集方法调用结束的时候,两次发射后再次开始收集,此时流程的耗时时长缩短到大约16秒就可以执行完毕,如下图所示输出结果

使用conflate解决

conflate操作符于Channel中的Conflate模式是一直的,新数据会直接覆盖掉旧数据,它不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST 策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST),可以看下它的源码可以佐证我们的判断

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

在某些情况下,由于根本原因是解决生产消费速率不匹配的问题,我们需要做一些取舍的操作,conflate将丢弃掉旧数据,只有在收集器空闲之前发出的最后一个元素才被收集,将上文的模拟实例改为conflate执行,你会发现我们直接丢弃掉了2和4,或者说新的数据直接覆盖掉了它们,整个流程只需要10秒左右就执行完成了

使用collectLatest解决

通过官方介绍,我们知道collectLatest作用在于当原始流发出一个新的值的时候,前一个值的处理将被取消,也就是不会被接收, 和conflate的区别在于它不会用新的数据覆盖,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消

suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)

还是上文的模拟实例,这里我们使用collectLatest看下输出结果:

这样也是有副作用的,如果每个更新都非常重要,例如一些视图,状态刷新,这个时候就不必要用collectLatest ; 当然如果有些更新可以无损失的覆盖,例如数据库刷新,就可以使用到collectLatest,具体详细的使用场景,还需要靠开发者自己去衡量选择使用

小结

对于Flow可以说不需要额外提供什么巧妙的方式解决背压问题,Flow的本质,亦或者说Kotlin协程本身就已经提供了相应的解决方案;开发者只需要在不同的场景中选择正确的背压策略即可。总的来说,它们都是通过使用Kotlin挂起函数suspend,当流的收集器不堪重负时,它可以简单地暂停发射器,然后在准备好接受更多元素时恢复它。

关于挂起函数suspend这里就不过多赘述了,只需要明白的一点是它与传统的基于线程的同步数据管道中背压管理非常相似,无非就是,缓慢的消费者通过阻塞生产者的线程自动向生产者施加背压,简单来说,suspend通过透明地管理跨线程的背压而不阻塞它们,将其超越单个线程并进入异步编程领域。

作者:RainyJiang
链接:https://juejin.cn/post/7165380647304282126

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容