前言
本文是阅读协程Flow的总结笔记。
什么是Flow
Kotlin中的Flow API是可以更好的异步处理按顺序执行的数据流的方法。
在RxJava中,Observables类型是表示项目流结构的示例。 在订阅者进行订阅之前,其主体不会被执行。 订阅后,订阅者便开始获取发射的数据项。 同样,Flow在相同的条件下工作,即在流生成器内部的代码到了收集流后才开始运行。
你可以认为他是Kotlin的RxJava,但比RxJava学习成本低,天然与协程友好(协程库的一部分)
怎样使用
首先引入kotlin的协程的核心库
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1'
编写如下代码
fun main() =
runBlocking {
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
sample().collect {
println("接收到的消息${it}")
}
}
fun sample() = flow<Int> {
repeat(3) {
delay(100)
emit(it)
}
}
运行如下
I'm not blocked 1
接收到的消息0
I'm not blocked 2
接收到的消息1
I'm not blocked 3
接收到的消息2
Process finished with exit code 0
这就是Flow的基础用法,需要注意的点如下:
(1)flow是Flow的构造方法,Flow使用emit发射数据,使用collect来接收数据(与RxJava的上下游概念类似)
(2)flow代码块是可以挂起的
(3)sample函数没有用suspend关键字进行标识
流是冷的
Flow是冷数据流,表现为构建Flow的代码中的emit需要在调用collect才开始发送数据。
suspend fun main() = coroutineScope {
println("收集数据")
sample1().collect {
println(it)
}
println("再次收集数据")
sample1().collect {
println(it)
}
}
fun sample1() = flow {
repeat(3) {
emit(it)
}
}
代码运行如下
收集数据
0
1
2
再次收集数据
0
1
2
Process finished with exit code 0
从运行结果可以看到,调用了collect,flow的emit才会调用。
流的超时取消
Flow提供了withTimeoutOrNull来在超时的情况下取消并停止执行其代码的。
代码如下:
fun main() = runBlocking {
withTimeoutOrNull(300) {
sample2().collect {
println(it)
}
}
println("完成")
}
fun sample2() = flow {
repeat(3) {
kotlinx.coroutines.delay(100)
emit(it)
}
}
运行结果如下:
0
1
完成
Process finished with exit code 0
可以看到3没有打印出来
流构造器
Flow的构造器除了flow{}这种还有:
1.asFlow ,扩展函数,将相关的集合或者序列转换为流
2.flowOf(...),定义固定发射的数据流
代码如下:
suspend fun main() = coroutineScope {
listOf(1, 2, 3).asFlow().collect {
println(it)
}
flowOf(4, 5, 6).collect {
println(it)
}
}
运行结果如下:
1
2
3
4
5
6
Process finished with exit code 0
过度操作符
Flow中的过度操作符有Map和Filter两种,其中Map是把Flow中的数据转换为另外一种数据类型发射出来。Filter则是将符合条件的数据发送出来。
Map代码如下:
suspend fun main()= coroutineScope {
listOf(1, 2, 3).asFlow().map { it ->
waitAWhile(it)
}.collect {
println(it)
}
}
suspend fun waitAWhile(int: Int): String {
delay(100)
return "等了一会$int"
}
代码将Int类型转换为String类型的数据发送出来
运行结果如下:
等了一会1
等了一会2
等了一会3
Fliter操作符代码如下:
suspend fun main()= coroutineScope {
listOf(1, 2, 3).asFlow().filter {
it>1
}.collect {
println(it)
}
}
代码中只发射大于1的数据,也就是2,3
运行结果如下:
2
3
转换操作符
Flow中的转换操作符主要是TransForm,可以实现更复杂的变换(时间上map和filter都是基于Transform实现的),代码如下:
listOf(1, 2, 3).asFlow().transform {
emit("${it}发射")
emit("${it}再发射")
}.collect {
println(it)
}
代码中将发射一次转换为发射两次,运行代码如下:
1发射
1再发射
2发射
2再发射
3发射
3再发射
Process finished with exit code 0
限长操作符
Flow中的限长操作符为Take,在流触及相应限制的时候会将它的执行取消(与协程一致都是通过抛出异常的方式进行取消,该操作符已经处理了异常抛出)。
代码如下:
listOf(1, 2, 3).asFlow().take(2).collect {
println(it)
}
代码中设置了限长为2,标识只会接受两个发射的数据
运行结果如下:
1
2
末端操作符
末端操作符是在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符
toList代码如下:
suspend fun main() {
val stringList = mutableListOf<String>()
//转换为list
flowOf(1,2,3).map {
"haha$it"
}.toList(stringList).forEach {
println(it)
}
}
运行结果如下:
haha1
haha2
haha3
Process finished with exit code 0
toSet代码如下:
flowOf(1, 2, 3).map {
"haha$it"
}.toSet(mutableSetOf()).forEach {
println(it)
}
运行结果如下:
haha1
haha2
haha3
Process finished with exit code 0
toCollection代码如下:
//转换为collection
flowOf(1, 2, 3).map {
"haha$it"
}.toCollection(mutableSetOf()).forEach {
println(it)
}
运行结果如下:
haha1
haha2
haha3
Process finished with exit code 0
first:获取第一个发射的数据,其余的抛弃
代码如下:
val first = flowOf(1, 2, 3).map {
"haha$it"
}.first()
println(first)
运行结果如下:
haha1
Process finished with exit code 0
first可以通过lambda函数作为选择条件,返回满足条件的第一个值,代码如下
//first 返回第一个满足条件的元素
val first = flowOf(1, 2, 3).map {
"haha$it"
}.first {
it.contains("2")
}
println(first)
运行结果如下:
haha2
Process finished with exit code 0
single也是返回一个发送的数据,但,他要求至多有一个数据发送,发送多个数据会报错
代码如下:
//single返回单个值
val single = flowOf(1).map {
"haha$it"
}.single()
println(single)
运行结果如下:
haha1
Process finished with exit code 0
singleOrNull 单个发射返回数值,否则返回null
//singleOrNull 单个发射返回数值,否则返回null
val singleOrNull = mutableListOf<Int>().asFlow().map {
"haha$it"
}.singleOrNull()
println(singleOrNull)
运行结果如下:
null
Process finished with exit code 0
reduce 对集合进行计算操作
val reduce = flowOf(1, 2, 3)
.reduce { accumulator, value ->
accumulator + value
}
println(reduce)
运行结果如下:
6
Process finished with exit code 0
fold带初始值的reduce
//带初始值的reduce
val fold = flowOf(1, 2, 3)
.fold(100) { acc, value ->
acc + value
}
println(fold)
运行结果如下
106
Process finished with exit code 0
流是连续的
流的每次单独收集都是按顺序执行的,除非进行特殊操作的操作符使用多个流(Zip操作符)。该收集过程直接在协程中运行,该协程调用末端操作符。 默认情况下不启动新协程。 从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
简单来说就是一个发射数据执行完所有的操作符(collect除外),下一个数据才能发射并执行相关的操作符
代码如下:
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
运行结果如下:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
流的上下文切换
流的收集总是在调用协程的上下文中发生。如无特殊指定,则默认与所在协程处于同一个上下文中。
suspend fun main() {
println(Thread.currentThread().name)
flow {
println(Thread.currentThread().name)
emit(1)
emit(2)
emit(3)
}.collect {
println(Thread.currentThread().name)
println(it)
}
}
运行结果如下
main
main
main
1
main
2
main
3
Process finished with exit code 0
但是如果发射器与接收器不处于同一个上下文时则会报错
//发射逻辑与接收逻辑不在同一个线程中,会发生报错
val flow = flow {
withContext(Dispatchers.Default) {
println("发射时的线程:${Thread.currentThread().name}")
emit(1)
emit(2)
emit(3)
}
}
flow.collect {
println(it)
}
运行如下:
···
接收时的线程:main
发射时的线程:DefaultDispatcher-worker-1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
···
为了实现更高发射器的上下文,可以使用flowOn手动切换所处线程
val flowOn = flow {
println("发射时的线程:${Thread.currentThread().name}")
emit(1)
emit(2)
emit(3)
}.flowOn(Dispatchers.Default)
withContext(Dispatchers.IO) {
flowOn.collect {
println("接收时的线程:${Thread.currentThread().name}")
println(it)
}
}
运行结果如下:
发射时的线程:DefaultDispatcher-worker-3
接收时的线程:DefaultDispatcher-worker-1
1
接收时的线程:DefaultDispatcher-worker-1
2
接收时的线程:DefaultDispatcher-worker-1
3
Process finished with exit code 0
缓冲操作符
从收集流所花费的时间来看,将流的不同部分运行在不同的协程中将会很有帮助,特别是当涉及到长时间运行的异步操作时。例如,考虑一种情况, 一个 发射器 流的发射很慢,它每花费 100 毫秒才产生一个元素;而收集器也非常慢, 需要花费 200 毫秒来处理元素。让我们看看从该流收集三个数字要花费多长时间。
val measureTimeMillis = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}.collect {
delay(200)
println(it)
}
}
println(measureTimeMillis)
运行结果如下:
0
1
2
1031
Process finished with exit code 0
那么如何加快该过程呢,答案是使用Buffer操作符
val measureTimeMillis = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}.buffer().collect {
delay(200)
println(it)
}
}
println(measureTimeMillis)
运行结果如下:
0
1
2
985
它产生了相同的数字,只是更快了,由于我们高效地创建了处理流水线, 仅仅需要等待第一个数字产生的 100 毫秒以及处理每个数字各需花费的 200 毫秒。(PS:上边的flowOn也用到了类似的缓存机制,所以不需要再次调用buffer操作符)。
合并操作符
当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的那个。在本示例中,当收集器处理它们太慢的时候, conflate 操作符可以用于跳过中间值。构建前面的示例:
suspend fun main() {
flow {
repeat(3) {
delay(100)
emit(it)
}
}.conflate().collect {
delay(500)
println(it)
}
}
运行结果如下:
0
2
Process finished with exit code 0
没有打印中间的1
当发射器和收集器都很慢的时候,合并是加快处理速度的一种方式。它通过删除发射值来实现。 另一种方式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。有一组与 xxx
操作符执行相同基本逻辑的 xxxLatest
操作符,但是在新值产生的时候取消执行其块中的代码。让我们在先前的示例中尝试更换 conflate 为 collectLatest:
suspend fun main() {
flow {
repeat(3) {
kotlinx.coroutines.delay(100)
emit(it)
}
}.collectLatest {
println(it)
delay(500)
println(it)
}
}
运行结果如下
2
Process finished with exit code 0
只打印出最后的2,没有执行0,1的打印
组合操作符
就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样, 流拥有一个 zip 操作符用于组合两个流中的相关值(多余数据直接丢弃)
/**
* zip 组合两个流中的相关值,多余的进行丢弃
*/
suspend fun main() {
val flow1 = flowOf(1, 2, 3, 4)
val flow2 = flowOf("one", "two", "three")
flow1.zip(flow2) { i: Int, s: String ->
"$s:$i"
}.collect {
println(it)
}
}
运行结果如下
one:1
two:2
three:3
Process finished with exit code 0
当流表示一个变量或操作的最新值时(请参阅相关小节 conflation),可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。这种相应的操作符家族称为 combine。
suspend fun main() {
val flow1 = flowOf(1, 2, 3, 4).onEach { delay(300) }
val flow2 = flowOf("one", "two", "three").onEach { delay(400) }
flow1.combine(flow2) { i: Int, s: String ->
"$s:$i"
}.collect {
println(it)
}
}
运行结果如下:
one:1
one:2
two:2
two:3
three:3
three:4
Process finished with exit code 0
展开操作符
流表示异步接收的值序列,所以很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。比如说,我们可以拥有下面这样一个返回间隔 500 毫秒的两个字符串流的函数:
fun reqeustFlow(it: Int) = flow {
emit("${it}:first")
kotlinx.coroutines.delay(500)
emit("${it}:second")
}
现在,如果我们有一个包含三个整数的流,并为每个整数调用 requestFlow
val map = flowOf(1, 2, 3).map {
reqeustFlow(it)
}
然后我们得到了一个包含流的流(Flow<Flow<String>>
),需要将其进行展平为单个流以进行下一步处理。集合与序列都拥有 flatten 与 flatMap 操作符来做这件事。然而,由于流具有异步的性质,因此需要不同的展平模式。
flatMapConcat
顺序的将发射器发射的数据转变为一个新的流
flowOf(1, 2, 3).onEach { delay(100) }.flatMapConcat {
reqeustFlow(it)
}.collect {
println(it)
}
运行结果如下:
1:first
1:second
2:first
2:second
3:first
3:second
Process finished with exit code 0
flatMapMerge
并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。需要注意的是flatMapMerge 会顺序调用代码块(本示例中的 { requestFlow(it) }
),但是并发收集结果流,相当于执行顺序是首先执行 map { requestFlow(it) }
然后在其返回结果上调用 flattenMerge。从现象上来说就是:先获取所有的发射项,然后每个发射项逐次调用flatMapMerge中的代码(flatMapMerge有a,b代码,所有发射项先执行a,执行完后所有发射项再执行b)。
flowOf(1, 2, 3).onEach { delay(100) }.flatMapMerge {
reqeustFlow(it)
}.collect {
println(it)
}
运行代码如下:
1:first
2:first
3:first
1:second
2:second
3:second
Process finished with exit code 0
flatMapLatest
在发出新流后立即取消先前流的收集
flowOf(1, 2, 3).onEach { delay(100) }.flatMapLatest {
reqeustFlow(it)
}.collect {
println(it)
}
运行结果如下:
1:first
2:first
3:first
3:second
Process finished with exit code 0
try-catch
flow在收集器代码块使用try-catch的话,会接收到flow所有的异常抛出。我们首先看发射器发生错误的时候
val fl = flow {
emit(1)
emit(2)
emit(3)
}.map {
check(it <3) {
"上游发生错误"
}
it
}
try {
fl.collect {
println(it)
}
} catch (e: Exception) {
println(e.message)
}
运行结果如下:
1
2
上游发生错误
Process finished with exit code 0
这里需要注意的是check操作符,如果不符合条件才会执行大括号内的代码,并将返回值作为异常的消息。
如果下游发生异常的话,是否能捕获到异常呢?,看如下代码
val flowOf = flowOf(1, 2, 3)
try {
flowOf.collect {
check(it <= 2) {
"下游发生错误"
}
println(it)
}
} catch (e: Exception) {
println(e.message)
}
运行结果如下:
1
2
下游发生错误
Process finished with exit code 0
可以看到如果用try-catch包围收集器的话是可以捕获到上下游的异常的,需要注意的是流必须对异常透明,即在 flow { ... }
构建器内部的 try/catch
块中发射值是违反异常透明性的。这样可以保证收集器抛出的一个异常能被像先前示例中那样的 try/catch
块捕获。那么Flow有没有专门的捕获异常的操作符呢,答案是有的,他就是catch操作符。
flowOf(1, 2, 3).map {
check(it <= 2) {
"上游出现错误"
}
it
}.catch {
println(it.message)
}.collect {
println(it)
}
运行结果如下:
1
2
上游出现错误
Process finished with exit code 0
但是这样是不能获取到接收器发生的错误的。如果想捕获发射器收集器的异常,可采取如下的方法
flowOf(1, 2, 3).map {
check(it < 3) {
"上游发生了错误"
}
it
}.onEach {
//放开该注释,注释掉上面的check代码,则能捕获下游发生的错误
// check(it < 2) {
// "下游发生错误"
// }
println(it)
}.catch {
println(it.message)
}.collect()
运行结果如下:
1
2
上游发生了错误
Process finished with exit code 0
至于你采用try-catch还是catch操作符,那就取决于你的习惯了。
流完成
流的完成也有finally和onCompletion操作符两种。先看finally这种吧
val map = flowOf(1, 2, 3).map {
check(it < 3) {
"上游发生错误"
}
it
}
try {
map.collect {
println(it)
}
} catch (e: Exception) {
println(e.message)
}finally {
println("结束了")
}
运行结果如下
1
2
上游发生错误
结束了
Process finished with exit code 0
onCompletion操作符如下:
flowOf(1, 2, 3).map {
check(it <= 2) {
"上游发生错误"
}
it
}.catch {
println(it.message)
}.onCompletion {
if (it != null) {
println("发生了异常")
} else {
println("没有异常")
}
}.collect {
println(it)
}
运行结果
1
2
上游发生错误
没有异常
Process finished with exit code 0
这里需要注意的是onComplettion是可以捕获到相关异常的,但他不能处理异常,异常仍会向下游流动。本文中的代码因为在onCompletion之前调用了catch,捕获了异常,所以
onCompletion不会收到相关异常信息。
启动流
使用流表示来自一些源的异步事件是很简单的。 在这个案例中,我们需要一个类似 addEventListener
的函数,该函数注册一段响应的代码处理即将到来的事件,并继续进行进一步的处理。onEach 操作符可以担任该角色。 然而,onEach
是一个过渡操作符。我们也需要一个末端操作符来收集流。 否则仅调用 onEach
是无效的。
如果我们在 onEach
之后使用 collect 末端操作符,那么后面的代码会一直等待直至流被收集:
// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- 等待流收集
println("Done")
}
运行结果
Event: 1
Event: 2
Event: 3
Done
launchIn 末端操作符可以在这里派上用场。使用 launchIn
替换 collect
我们可以在单独的协程中启动流的收集,这样就可以立即继续进一步执行代码:
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执行流
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
launchIn
必要的参数 CoroutineScope 指定了用哪一个协程来启动流的收集。在先前的示例中这个作用域来自 runBlocking 协程构建器,在这个流运行的时候,runBlocking 作用域等待它的子协程执行完毕并防止 main 函数返回并终止此示例。
在实际的应用中,作用域来自于一个寿命有限的实体。在该实体的寿命终止后,相应的作用域就会被取消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope)
工作方式就像 addEventListener
一样。而且,这不需要相应的 removeEventListener
函数, 因为取消与结构化并发可以达成这个目的。