Flow简介

Kotlin 协程中使用挂起函数可以实现非阻塞地执行任务并将结果返回回来,但是只能返回一个计算结果。但是如果希望有多个计算结果返回回来,则可以使用 flow,flow有像Rxjava的各种操作符,实现各种功能,同时和协程一起使用,可以替代Rxjava和liveData,并且也没有像Rxjava上手这么难,所以学kotlin,flow是必须的。

flow简单使用:
flow{
    //发送者发送数值
    emit(1)   
}.collect{
    //接受者接受发送的数值
    println(it.toString())
}

看起来和Rxjava很像,但是又简单很多吧

flow的冷流与热流
  • 冷流
    上面的简单使用即是冷流,即执行是惰性的,调用末端流操作符(collect 是其中之一)之前, flow{ ... } 中的代码不会执行,只有当数据被订阅的时候(执行collect),发布者才开始执行发射数据流的代码(执行flow{ ... })。当有多个订阅者的时候,每个订阅者都会收到发送者完整的流程。即订阅者和发送者都是一对一的关系。
    例子如下:


    image.png

我们准备3个按钮,分别对应代码如下:

//发送者代码:
var test: Flow<Int>?=null
test= flow {
    for (i in 0..4) {
    Log.e(TAG+"2", i.toString())
    emit(i)
}

//订阅者1代码:
test?.collect{
    delay(1000)
    Log.e(TAG, it.toString())
}

//订阅者2代码:
test?.collect{
    delay(1000)
    Log.e(TAG, it.toString())
}

上面三个按钮的代码都贴上去了,其中订阅者1和订阅者2代码一样,当我们只是点发送者按钮时,flow {...} flow里面的代码块是没有执行的,然后我们再点击订阅者1按钮,这时候发送者代码才开始执行,从而发送给订阅者,连续执行


image.png

当我们再点击订阅者2按钮的时候,会发现和上面的订阅者1按钮的效果一样,所以印证了一对一的关系,每个订阅者都会收到发送者完整的流程。

  • 热流
    热流是共享的,有缓存的,不管订阅者是否存在,只要发送了事件就会被消费,热流和订阅者是一对多的关系,多个订阅者可以共享同一个数据流。当一个订阅者停止监听时,数据流不会自动关闭。以MutableSharedFlow举例:
val mutableSharedFlow=MutableSharedFlow<Int>()
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,第二个订阅者是没有收到发送者的数据,因为在订阅之前已经被消费了,所以收不到数据

热流的具体实现SharedFlow和StateFlow,分别对应的实现类MutableSharedFlow和是MutableStateFlow,所以我们要讲的也就是这两个类。

1. MutableSharedFlow

有缓冲区区,并可以定义缓冲区的溢出规则,可以定义给一个新的接收器发送多少数据的缓存值。
MutableSharedFlow 的参数如下:

  • replay 相当于粘性数据
  • extraBufferCapacity //接受的慢时候,发送的入栈
  • onBufferOverflow 缓冲区溢出规则:
    1. SUSPEND: 挂起
    2. DROP_OLDEST: 移除旧的值
    3. DROP_LATEST: 移除新的值
replay:事件粘滞数

当我们把上面的MutableSharedFlow的replay设置为1是,即如下代码:

val mutableSharedFlow=MutableSharedFlow<Int>(replay = 1)
lifecycleScope.launch {
    mutableSharedFlow.collect{
        Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        mutableSharedFlow.emit(it)
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,第二个订阅者收到了最后一次运行的结果5,所以replay会保留上次运行的结果,replay设置多少,他就保留最新的前多少数据。

extraBufferCapacity

缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,都先发送先。

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2)
lifecycleScope.launch {
    mutableSharedFlow.collect{
        Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        Log.e("mutableSharedFlow1_before",it.toString())
        mutableSharedFlow.emit(it)
        Log.e("mutableSharedFlow1_after",it.toString())
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行结果如下:


image.png

相对于上面,可以看到extraBufferCapacity设置2之后,头两个会先发送而不管有没有被消费完,超过第3个之后,才开始执行,执行完之后又先发送先发送两个而不管有没有被消费。

onBufferOverflow

因为有第二个参数,所以当没有被消费完的时候,这可能导致缓存容量过多,只管发不管消费者消费能力的情况就会出现背压,所以第3个参数就是出现背压的时候要怎么处理的。

分别是 SUSPEND: 挂起,DROP_OLDEST: 移除旧的值,DROP_LATEST: 移除新的值。

SUSPEND

因为默认就是SUSPEND,所以上面的MutableSharedFlow<Int>(extraBufferCapacity = 2)就是MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.SUSPEND),所以和讲extraBufferCapacity的demo是一样的。

DROP_OLDEST
移除旧的值,保留最新的,extraBufferCapacity就保留多少,代码如下:

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)
lifecycleScope.launch {
    mutableSharedFlow.collect{
         Log.e("mutableSharedFlow1",it.toString())
    }
}
lifecycleScope.launch {
    (1..5).forEach {
        Log.e("mutableSharedFlow1_before",it.toString())
        mutableSharedFlow.emit(it)
        Log.e("mutableSharedFlow1_after",it.toString())
    }
}
lifecycleScope.launch {
    delay(1000)
    mutableSharedFlow.collect {
        Log.e("mutableSharedFlow2",it.toString())
    }
}

运行效果如下:

image.png

可以看到运行5个,超出缓存容量,只保留最新的两个,这就实现了消费者消费速度小于生产者的时候的背压问题。

DROP_LATEST

移除新的值,保留最旧的,extraBufferCapacity就保留多少,代码如下:

val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }

运行结果如下:

image.png

可以看到运行5个,超出缓存容量,只保留最旧的两个。

2.MutableStateFlow

MutableStateFlow 就是reply为1的MutableSharedFlow,同时它必须要有一个初始值,此外每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。具体demo如下:

val stateFlow =MutableStateFlow(value = -1)
    lifecycleScope.launch {
        stateFlow.collect{
            Log.e("mutableStateFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        listOf(1,2,3,4,4).forEach {
            Log.e("mutableStateFlow_before",it.toString())
            stateFlow.emit(it)
            Log.e("mutableStateFlow_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        stateFlow.collect {
        Log.e("mutableStateFlow2",it.toString())
    }
}

运行结果如下:


image.png

可以看到,只要初始值和最新值,其他的值都不会,StateFlow重点在状态,只有初始值和最新值,而不会有中间值,这对于UI的状态更合适,防止重复刷新,而SharedFlow更适合事件的处理。

背压三剑客

从上面的讲解里,我们了解了MutableSharedFlow和MutableStateFlow的背压。
那冷流要怎么实现呢,其实操作符也有背压处理的。

背压说白了就是消费者的消费速度达不到生产者的创建速度时,就会产生数据的淤积。

  • collectLatest
    当出现背压是,只会执行最新的数据,代码如下:
flow {
    (1..5).forEach{
        emit(it)
    }
}.collectLatest {
    Log.e("collectLatest_start",it.toString())
    delay(1000)
    Log.e("collectLatest_end",it.toString())
 }

运行结果如下:

image.png

可以看到,会结束旧的数据执行即使在执行中,而执行最新的数据

  • conflate
    conflate 与collectLatest不同的是,conflate会先把旧的执行完,再去执行最新的数据,保证每次执行有始有终,而不会中途中断
flow {
    (1..5).forEach{
         emit(it)
    }
}.conflate()
 .collect {
    Log.e("conflate_start",it.toString())
    delay(1000)
     Log.e("conflate_end",it.toString())
 }

运行结果:

image.png

可以看到,对1数据也执行到结束才执行5,中间的数据直接过滤掉,有始有终

  • buffer
    首先,buffer的数据发送就不会受collect函数的影响,不用等collect执行完后才发送下一条,其二,buffer也有点像MutableSharedFlow,有两个参数,分别是capacity和onBufferOverflow,
  1. capacity: 缓存数量
  2. onBufferOverflow: 处理缓存策略

下面一个个验证,首先看发送,demo如下:

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

可以看到,执行流程是先发一个,执行完再发下一个,事件发送和处理是连续的,假如加上buffer()呢,

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}.buffer()
.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

可以看到加了之后不管有没有执行,都先发送,然后再一个个执行,不再受collect{}影响。

其二说白了就是设置缓存数量和处理策略,即设置capacity和onBufferOverflow,和上面的MutableSharedFlow有点像,举一个例子基本可以。

flow {
    (1..5).forEach {
        emit(it)
    }
}.onEach {
    Log.e("buffer1","$it is ready")
}
.buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
.collect {
    delay(1000)
    Log.e("buffer2","$it is handled")
}

结果如下:


image.png

设置缓存数量为1,保持处理最旧的事件,DROP_OLDEST(处理缓存最旧),其他的SUSPEND(挂起),DROP_OLDEST(处理缓存最新)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容