Kotlin Flow 介绍

1.Kotlin Flow 介绍

Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。

A cold asynchronous data stream that sequentially emits values
 and completes normally or with an exception。

翻译下就是:按顺序发出值并正常完成或异常完成的Cold异步数据流。

  • Hot Observable:无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。
  • Cold Observable :只有 Subscriber 订阅时,才开始执行发射数据流的代码。并且 Cold Observable 和 Subscriber 只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个Subscriber的时候,他们各自的事件是独立的。

2.flow使用

image.png

2.1 Flow的创建

  1. 可以使用flow构建函数构建一个Flow类型返回值的函数
  2. flow{}构建体中可以调用挂起函数,即上流
  3. 上流使用emit函数发射值
  4. 下流使用collect函数收集值
//上流函数
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        //下流接收数据
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("finished")
    }
}

结果:
1
2
3
finished

2.2 Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行。

fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect1 finished")
 
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect2 finished")
    }
}

结果:
1
2
3
collect1 finished
1
2
3
collect2 finished

2.3 Flow的连续性

Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.filter {
            it % 2 == 0 //只取偶数
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}
结果:
String 2
String 4

2.4 Flow构建器

  1. flow{}
flow {
    (5 .. 10).forEach {
              emit(it)
         }
}.collect{
   println(it)
}
    
  1. flowOf() 帮助可变数组生成 Flow 实例
flowOf(1,2,3,4,5).collect { println(it) }

其实flowOf调用的就是第一种flow{},分别emit发送值,源码如下:

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
  1. asFlow() 面向数组、列表等集合
(5 ..10).asFlow().collect { 
                    println(it)
                }
消费数据

collect 方法和 RxJava 中的 subscribe 方法一样,都是用来消费数据的。
除了简单的用法外,这里有两个问题得注意一下:

  • collect 函数是一个 suspend 方法,所以它必须发生在协程或者带有 suspend 的方法里面,这也是我为什么在一开始的时候启动了
  • lifecycleScope.launch。lifecycleScope 是我使用的 Lifecycle 的协程扩展库当中的,你可以替换成自定义的协程作用域

3.切换线程

3.1切换线程使用的是flowOn操作符。

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

简单点理解就是flowOn之前的操作符运行在flowOn指定的线程之内,flowOn之后的操作符运行在整个flow运行的CoroutineContext内。

例如,下面的代码collect则是在main线程:

fun main() = runBlocking {
  flowOf(1,2,3,4,5)
                    .flowOn(Dispatchers.Default)
                    .collect { 
                    println(Thread.currentThread().name+" "+it) 
                }
}

打印如下:

main 1
main 2
main 3
main 4
main 5

3.2 除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文。

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主线程等待这个协程执行结束
    }
}
结果:
flow :DefaultDispatcher-worker-1
collect:DefaultDispatcher-worker-1 1
collect:DefaultDispatcher-worker-1 2
collect:DefaultDispatcher-worker-1 3
collect:DefaultDispatcher-worker-1 4
collect:DefaultDispatcher-worker-1 5

4.背压

背压的产生

  • 原因:通俗来说其实就是因为产生的速度和处理的速度或者说耗时不一致才导致了背压的产生。
  • 处理:主要分三种:挂起丢弃新的丢弃原来的,我们也可以辅助设置缓冲池,即暂时把值存下来。
  • 理解:通俗理解为水流管粗细的问题,如果上游的水管粗,下游的水管细就会产生堵住的问题,当然也有可能就是撑破了,水流出来了。
    • 处理起来就是堵住,上游不要流了;
    • 当然也可以在中间建设一个蓄水池,先把水放在蓄水池,下游可以继续了再放下去。
    • 蓄水池当然也有两种方式,上游来水了,蓄水池有水,要不把蓄水池水放掉、注入新水,或者直接放掉新水。

4.1.不处理

       var time :Long = 0

            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: it is 0,cost time 1118
D/Test-TAG: it is 1,cost time 2222
D/Test-TAG: it is 2,cost time 3323
D/Test-TAG: it is 3,cost time 4425
D/Test-TAG: it is 4,cost time 5529
D/Test-TAG: it is 5,cost time 6632
D/Test-TAG: it is 6,cost time 7735
D/Test-TAG: it is 7,cost time 8839
D/Test-TAG: it is 8,cost time 9942
D/Test-TAG: it is 9,cost time 11044
D/Test-TAG: finish cost time 11044

分析:

  • 可以看出耗时是以最后一次结束的时间计算的,也就是挂起,通俗理解就是下游堵住了,上游等着,所以耗时是下面的耗时综合。可以看出,一般情况下,上下流执行是同步的。

4.2 直接调用buffer(),不设特定参数

     var time :Long = 0

            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer().collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: finish cost time 1024
D/Test-TAG: it is 0,cost time 1115
D/Test-TAG: it is 1,cost time 2117
D/Test-TAG: it is 2,cost time 3119
D/Test-TAG: it is 3,cost time 4120
D/Test-TAG: it is 4,cost time 5122
D/Test-TAG: it is 5,cost time 6123
D/Test-TAG: it is 6,cost time 7125
D/Test-TAG: it is 7,cost time 8126
D/Test-TAG: it is 8,cost time 9127
D/Test-TAG: it is 9,cost time 10129

分析:

  • 可以看出和不处理非常类似,下游耗时基本是一致的。
  • 但是finish完成的耗时只有1024,也就是全部缓存下来了,通俗理解就是水全部放在了蓄水池,一点一点往下放。

4.3 buffer参数设置,设置buffer为5,也就是缓存5个值,三种策略分别为:

1.BufferOverflow.SUSPEND(默认)
            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.SUSPEND).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }

日志如下:

D/Test-TAG: it is 0,cost time 1120
D/Test-TAG: it is 1,cost time 2121
D/Test-TAG: it is 2,cost time 3123
D/Test-TAG: it is 3,cost time 4125
D/Test-TAG: finish cost time 4125
D/Test-TAG: it is 4,cost time 5126
D/Test-TAG: it is 5,cost time 6129
D/Test-TAG: it is 6,cost time 7131
D/Test-TAG: it is 7,cost time 8133
D/Test-TAG: it is 8,cost time 9134
D/Test-TAG: it is 9,cost time 10136

分析:

  • 可以看出在第四个值打印出来的时候完成了,原因在于发送第0个,下面在处理,下面5个都放在了缓冲池,剩下四个在挂起等待
  • 等待后面处理,处理一个,缓冲池往下放一个,上游往缓冲池放一个。
  • 下游四个处理完了的时候,在处理第五个,最后一个就放到了缓冲池,上游就结束了。
2.BufferOverflow.DROP_OLDEST
            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.DROP_OLDEST).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }


打印如下:

D/Test-TAG: finish cost time 1028
D/Test-TAG: it is 0,cost time 1120
D/Test-TAG: it is 5,cost time 2122
D/Test-TAG: it is 6,cost time 3124
D/Test-TAG: it is 7,cost time 4124
D/Test-TAG: it is 8,cost time 5127
D/Test-TAG: it is 9,cost time 6128


分析:

  • 可以看出,finish最先结束了,耗时只是上游的时间,每次100ms,大概就是1000ms。
  • 上游发送一个,下游在处理第一个时候,上游继续往缓冲池中放。
  • 和BufferOverflow.SUSPEND的区别在于,放入了5个(1,2,3,4,5),放第6个也就是5的时候,不再挂起,而是将1丢弃了,缓冲池变成了(2,3,4,5,6),后续类似,最后就是1,2,3,4全被丢弃了,所以打印结果就是0,5,6,7,8,9。

#######3.BufferOverflow.DROP_LATEST

            var time :Long = 0
            flow {
                repeat(10){
                    delay(100)
                    emit(it)
                }
            }.onStart {
                time = System.currentTimeMillis()
            }.onCompletion {
                Log.d(TAG.TAG, "finish cost time ${System.currentTimeMillis() - time}")
            }.buffer(5,BufferOverflow.DROP_LATEST).collect {
                delay(1000)
                Log.d(TAG.TAG, "it is $it,cost time ${System.currentTimeMillis() - time}")
            }


打印如下:

D/Test-TAG: finish cost time 1031
D/Test-TAG: it is 0,cost time 1119
D/Test-TAG: it is 1,cost time 2120
D/Test-TAG: it is 2,cost time 3122
D/Test-TAG: it is 3,cost time 4123
D/Test-TAG: it is 4,cost time 5128
D/Test-TAG: it is 5,cost time 6129

分析:

  • 可以看出,finish也最先结束了,耗时只是上游的时间,每次100ms,大概就是1000ms。
  • 上游发送一个,下游在处理第一个时候,上游继续往缓冲池中放。
  • 和BufferOverflow.DROP_OLDEST的区别在于,放入了5个(1,2,3,4,5),放第6个也就是5的时候,不再挂起,而是将5丢弃了,缓冲池不变,所以5,6,7,8,9全被丢弃了,最后打印出来就是0,1,2,3,4,5。

4.4 conflate conflate是buffer的简化使用方式,其实相当于buffer设置参数为0和BufferOverflow.DROP_OLDEST。

下流来不及处理的会被丢弃掉

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow.conflate()
                .collect {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
结果:
1
3
time : 7124 ms

4.5 collectLast可以只接收上流发射的最后一个元素.

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow
                .collectLatest {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
time : 6144 ms

5.Flow操作符

1.transform

在使用transform操作符时,可以任意多次调用emit。

runBlocking {

                (1..5).asFlow()
                    .transform {
                        emit(it * 2)
                        delay(100)
                        emit(it * 4)
                    }
                    .collect { println("transform:$it") }
            }

打印如下:

transform:2
transform:4
transform:4
transform:8
transform:6
transform:12
transform:8
transform:16
transform:10
transform:20

transform、transformLatest、transformWhile ,transform直接进行转换,和map不同的是transform可以控制流速,transformLatest则进行最新值的转换,类似于mapLatest ,transformWhile则要求闭包返回一个boolean值,为true则继续返回,为false则后续的值全部取消。

          val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
           flow.transform {
                delay(1000)
                emit(it*10)
            }.collect {
                Log.d(TAG.TAG,"transform is $it")
            }

            flow.transformLatest {
                delay(1000)
                emit("transformLatest $it")
            }.collect {
                Log.d(TAG.TAG,it)
            }

            flow.transformWhile {
                emit("transformWhile $it")
                it!=5
            }.collect {
                Log.d(TAG.TAG,it)
            }


日志如下:

2022-07-29 15:37:03.243 10589-10615/edu.test.demo D/Test-TAG: transform is 0
2022-07-29 15:37:04.255 10589-10615/edu.test.demo D/Test-TAG: transform is 10
2022-07-29 15:37:05.269 10589-10615/edu.test.demo D/Test-TAG: transform is 20
2022-07-29 15:37:06.281 10589-10615/edu.test.demo D/Test-TAG: transform is 30
2022-07-29 15:37:07.294 10589-10615/edu.test.demo D/Test-TAG: transform is 40
2022-07-29 15:37:08.306 10589-10615/edu.test.demo D/Test-TAG: transform is 50
2022-07-29 15:37:09.318 10589-10615/edu.test.demo D/Test-TAG: transform is 60
2022-07-29 15:37:10.330 10589-10615/edu.test.demo D/Test-TAG: transform is 70
2022-07-29 15:37:11.341 10589-10615/edu.test.demo D/Test-TAG: transform is 80
2022-07-29 15:37:12.353 10589-10615/edu.test.demo D/Test-TAG: transform is 90
2022-07-29 15:37:13.470 10589-10617/edu.test.demo D/Test-TAG: transformLatest 9
2022-07-29 15:37:13.483 10589-10617/edu.test.demo D/Test-TAG: transformWhile 0
2022-07-29 15:37:13.495 10589-10617/edu.test.demo D/Test-TAG: transformWhile 1
2022-07-29 15:37:13.509 10589-10617/edu.test.demo D/Test-TAG: transformWhile 2
2022-07-29 15:37:13.521 10589-10617/edu.test.demo D/Test-TAG: transformWhile 3
2022-07-29 15:37:13.532 10589-10617/edu.test.demo D/Test-TAG: transformWhile 4
2022-07-29 15:37:13.544 10589-10617/edu.test.demo D/Test-TAG: transformWhile 5

2.take

take操作符只取前几个emit发射。

  (1 .. 5).asFlow().take(2).collect {
                    println("take:$it")
                }

打印结果:

take:1
take:2

taketakeWhiledropdropWhiletake则是取几个值返回,takeWhile按条件取值,如果满足条件就返回,不满足则后面全部取消。drop和take相反,dropWhile和takeWhile相反。

            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
              flow.take(5).collect {
                Log.d(TAG.TAG,"take $it")
            }

            flow.takeWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }

            flow.drop(5).collect {
                Log.d(TAG.TAG,"drop $it")
            }
            flow.dropWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"dropWhile $it")
            }

打印如下:

D/Test-TAG: take 0
D/Test-TAG: take 1
D/Test-TAG: take 2
D/Test-TAG: take 3
D/Test-TAG: take 4
D/Test-TAG: takeWhile 0
D/Test-TAG: takeWhile 1
D/Test-TAG: takeWhile 2
D/Test-TAG: takeWhile 3
D/Test-TAG: takeWhile 4
D/Test-TAG: drop 5
D/Test-TAG: drop 6
D/Test-TAG: drop 7
D/Test-TAG: drop 8
D/Test-TAG: drop 9
D/Test-TAG: dropWhile 5
D/Test-TAG: dropWhile 6
D/Test-TAG: dropWhile 7
D/Test-TAG: dropWhile 8
D/Test-TAG: dropWhile 9

分析:

  • 可以看出take5 就取了前面五个进行返回,drop刚好相反。
  • takeWhile则返回了满足条件的前五个,后面的全部取消,dropWhile刚好相反。
  • 也会有人有疑问,后面的都大于等于5了,所以都取消了,那后面如果出现个1呢,还会不会返回,那么再看如下代码,可以看出,后面即使出现满足条件的也被全部取消了:
flow{
                emit(1)
                emit(2)
                emit(5)
                emit(1)
                emit(2)
            }.takeWhile {
                it<5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }

D/Test-TAG: takeWhile 1
D/Test-TAG: takeWhile 2

3.reduce
runBlocking {
                val sum=( 1 ..5).asFlow()
//                    .map {
//                    //println("map:${it}")
//                    it*it  }   //1,4,9,16,25

                    .reduce { a, b ->
                        println("reduce:${a},${b}")
                        a*b
                    }

                 println(sum)

            }

打印如下:

reduce:1,2
reduce:2,3
reduce:6,4
reduce:24,5
120

reduce理解起来稍微有点麻烦,我们看看源码实现加深理解:

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    var accumulator: Any? = NULL

    collect { value ->
        accumulator = if (accumulator !== NULL) {
            @Suppress("UNCHECKED_CAST")
            operation(accumulator as S, value)
        } else {
            value
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    @Suppress("UNCHECKED_CAST")
    return accumulator as S
}

简单点理解就是两个元素操作之后拿到的值跟后面的元素进行操作,用于把flow 简化合并为一个值。

4.fold
runBlocking {
(1 ..5).asFlow().fold(2,{
                        a, b -> a * b
                })
}

5.zip

zip主要实现组合的功能,将两个flow一一组合发出,其中一个结束,则zip结束。

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

打印如下:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 540 ms

如果flowA中的item个数大于flowB中的item个数,执行合并后新flow的item个数=较小的flow的item个数。

6.flattenMerge/flattenConcat

flattenMerge不会组合多个flow,而是将它们作为单个流执行。
flattenConcatflattenMergeflattenConcat将多个flow展平,通俗点讲,减少层级,flattenMergeflattenConcat类似,但是可以设置并发数。

val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three", "four", "five")

  flowOf(flowA,flowB).flattenMerge(2).collect {
                    println("flattenMerge:$it")
                }

                flowOf(flowA,flowB).flattenConcat().collect{println("flattenConcat:$it")}

打印如下:

flattenMerge:1
flattenMerge:2
flattenMerge:3
flattenMerge:4
flattenMerge:5
flattenMerge:one
flattenMerge:two
flattenMerge:three
flattenMerge:four
flattenMerge:five

flattenConcat:1
flattenConcat:2
flattenConcat:3
flattenConcat:4
flattenConcat:5
flattenConcat:one
flattenConcat:two
flattenConcat:three
flattenConcat:four
flattenConcat:five

展平操作符

类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了

7.flatMapConcat
调用 FlowA.flatMapConcat(FlowB) 代码 , 先拿到 FlowA , 然后让 FlowA 每个元素 与 FlowB 进行连接 , 以 FlowA 的元素顺序为主导 ;

flatMapConcat由map,flattenMerge操作符联合完成。
源码如下:

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

测试代码:

fun currTime() = System.currentTimeMillis()

            var start: Long = 0
            runBlocking {

                (1..5).asFlow()
                    .onStart { start = currTime() }
                    .onEach { delay(100) }
                    .flatMapConcat {
                        flow {
                            emit("$it: First")
                            delay(500)
                            emit("$it: Second")
                        }
                        
                    }
                    .collect {
                        println("$it at ${System.currentTimeMillis() - start} ms from start")
                    }
            }

在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成
打印如下:

1: First at 124 ms from start
1: Second at 625 ms from start
2: First at 726 ms from start
2: Second at 1228 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
4: First at 1930 ms from start
4: Second at 2431 ms from start
5: First at 2532 ms from start
5: Second at 3033 ms from start
8.flatMapMerge

调用 FlowA.flatMapMerge(FlowB) 代码 , 先拿到 FlowB , 然后让 FlowB 每个元素 与 FlowA 进行结合 , 以 FlowB 的元素顺序为主导 ;

并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

可以看出来flatMapMerge并发特性:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
9.flatMapLatest

flatMapLatest和collectLatest操作符很像,只有新flow发射了新值,那么上个flow就会被取消。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

打印如下:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
10.conflate

当一个flow表示操作的部分结果或者操作状态更新,它可能并不需要取处理每一个值,但是需要处理最近的一个值。在这种场景下,conflate操作符可以被用于忽略中间操作符。是一种对emit和collector慢处理的一种方式,它通过丢弃一些值来实现。

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

打印如下:

1
3
Collected in 758 ms

11. filter

filterfilterNotfilterIsInstancefilterNotNullfliter闭包返回一个Boolean值,为true则返回,false则不返回,filterNot刚好相反;filterIsInstance则进行类型过滤,如过滤出String或者Int等,filterNotNull则过滤null值,返回非空值。

            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
            flow.filter {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filter $it")
            }

            flow.filterNot {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filterNot $it")
            }

            flow {
                emit(1)
                emit("123")
            }.filterIsInstance<String>().collect {
                Log.d(TAG.TAG,"filterIsInstance $it")
            }

            flow {
                emit(1)
                emit(null)
                emit(2)
            }.filterNotNull().collect {
                Log.d(TAG.TAG,"filterNotNull $it")
            }


打印如下:

2022-07-29 15:50:45.376 10675-10703/edu.test.demo D/Test-TAG: filter 0
2022-07-29 15:50:45.400 10675-10703/edu.test.demo D/Test-TAG: filter 2
2022-07-29 15:50:45.422 10675-10703/edu.test.demo D/Test-TAG: filter 4
2022-07-29 15:50:45.444 10675-10703/edu.test.demo D/Test-TAG: filter 6
2022-07-29 15:50:45.466 10675-10703/edu.test.demo D/Test-TAG: filter 8
2022-07-29 15:50:45.505 10675-10703/edu.test.demo D/Test-TAG: filterNot 1
2022-07-29 15:50:45.528 10675-10703/edu.test.demo D/Test-TAG: filterNot 3
2022-07-29 15:50:45.550 10675-10703/edu.test.demo D/Test-TAG: filterNot 5
2022-07-29 15:50:45.574 10675-10703/edu.test.demo D/Test-TAG: filterNot 7
2022-07-29 15:50:45.597 10675-10703/edu.test.demo D/Test-TAG: filterNot 9
2022-07-29 15:50:45.598 10675-10703/edu.test.demo D/Test-TAG: filterIsInstance 123
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 1
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 2

12.merge

是将两个flow合并起来,将每个值依次发出来

            val flow1  = listOf(1,2).asFlow()
            val flow2 = listOf("one","two","three").asFlow()
            merge(flow1,flow2).collect {value->
                Log.d(TAG.TAG,value.toString())
            }


打印如下:

D/Test-TAG: 1
D/Test-TAG: 2
D/Test-TAG: one
D/Test-TAG: two
D/Test-TAG: three

可以看出merge在将flow1和flow2合并之后将五个值依次发送出来。

13. retry

retryretryWhen retry为retryWhen的简化版本,可设置重试次数,以及在闭包内重试开关。
retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。

            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry(2).catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry(2)  $it")
            }
            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry {
                it is RuntimeException
            }.catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry{}  $it")
            }


            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retryWhen { cause, attempt ->
                Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
                cause is RuntimeException
            } .catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retryWhen  $it")
            }


打印如下:

D/Test-TAG: retry(2)  100
D/Test-TAG: retry{}  100
D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
D/Test-TAG: retryWhen  100

分析:

  • 可以看出虽然在一定条件会抛出异常,但是100这个值都提交成功了,这就是重试retry的作用。
  • retry的次数和闭包返回值可以同时设置,两个值为并列关系,如果一个不满足则不会重试,次数的默认值为Int.MAX_VALUE,闭包的返回值默认为true,所以我们也可以不设置值,直接调用retry()也可以实现重试的效果。
  • retryWhen和retry一致,闭包返回true则重试,返回false则不再重试。

5.Flow的异常处理

当运算符中的发射器或代码抛出异常,可以有两种方式处理
1.try catch
2.catch函数

1.try catch适用于收集时发生的异常
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }
 
        try {
            flow.collect {
                println(it)
                throw RuntimeException()
            }
        } catch (e: Exception) {
            print("caught: $e")
        }
    }
}
2.虽然上流也可以使用try catch,但是更推荐catch函数
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                throw RuntimeException()
            }
        }.catch { e ->
            print("caught1: $e")
        }.collect {
            println(it)
        }
    }
}

6.Flow的完成

1.有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式

fun main() {
    runBlocking {
        try{
            val flow = flow {
                for (i in 1..3) {
                    emit(i)
                }
            }.collect {
                println(it)
            }
        }finally {
            println("done")            
        }
    }
}

2.onCompletion函数

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }.onCompletion {
            println("done")
        }.collect {
            println(it)
        }
    }
}

7.取消Flow

Flow也是可以被取消的,最常用的方式就是通过withTimeoutOrNull来取消,代码如下所示。

MainScope().launch {
    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            Log.d("xys", "Flow: $it")
        }
    }
}

这样当输出1、2之后,Flow就被取消了。

Flow的取消,实际上就是依赖于协程的取消。

8.Flow的同步非阻塞模型

首先,我们要理解下,什么叫同步非阻塞,默认场景下,Flow在没有切换线程的时候,运行在协程作用域指定的线程,这就是同步,那么非阻塞又是什么呢?我们知道emit和collect都是suspend函数,所谓suspend函数,就是会挂起,将CPU资源让出去,这就是非阻塞,因为suspend了就可以让一让,让给谁呢?让给其它需要执行的函数,执行完毕后,再把资源还给我。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.collect {
    Log.d("xys", "Result---$it")
}

输出为:

D/xys: Start Flow in main
D/xys: emit value---0
D/xys: Result---0
D/xys: emit value---1
D/xys: Result---1
D/xys: emit value---2
D/xys: Result---2
D/xys: emit value---3
D/xys: Result---3

可以发现,emit一个,collect拿一个,这就是同步非阻塞,互相谦让,这样谁都可以执行,看上去flow中的代码和collect中的代码,就是同步执行的。

9. 异步非阻塞模型

假如我们给Flow增加一个线程切换,让Flow执行在子线程,同样是上面的代码,我们再来看下执行情况

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

输出为:

D/xys: Start Flow in DefaultDispatcher-worker-1
D/xys: emit value---0
D/xys: emit value---1
D/xys: emit value---2
D/xys: emit value---3
D/xys: Collect Flow in main
D/xys: Result---0
D/xys: Collect Flow in main
D/xys: Result---1
D/xys: Collect Flow in main
D/xys: Result---2
D/xys: Collect Flow in main
D/xys: Result---3

这个时候,Flow就变成了异步非阻塞模型,异步呢,就更好理解了,因为在不同线程,而此时的非阻塞,就没什么意义了,由于flow代码先执行,而这里的代码由于没有delay,所以是同步执行的,执行的同时,collect在主线程进行监听。

除了使用flowOn来切换线程,使用channelFlow也可以实现异步非阻塞模型。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容