Kotlin协程之Flow-异步流

如何表示多个值
  • 挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢?
异步返回多个值的方案
  • 集合
  • 序列
  • 挂起函数
  • Flow
/*suspend*/ fun simpleFlow() = flow {
    for(i in 1..3){
        delay(1000)
        emit(i)
    }
}
Flow与其他方式区别
  • 名为flow的Flow类型构建器函数
  • flow{...}构建块中的代码可以挂起
  • 函数simpleFlow不再标有suspend修饰符
  • 流使用emit函数发射值
  • 流使用collect函数收集值
Flow应用
  • 在Android当中,文件下载是Flow的一个非常典型的应用


    20211222-103630@2x.png
冷流
  • Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行
fun simpleFlow() = flow {
    println("Flow started")
    for(i in 1..3){
        delay(1000)
        emit(i)
    }
}

fun testFlowIsCode() = runBlocking {
    val flow = simpleFlow()
    println("Flow Collect")
    flow.collect { println(it) }
    println("Flow Collect again")
    flow.collect { println(it) }
}

返回结果

Flow Collect
Flow started
1
2
3
Flow Collect again
Flow started
1
2
3

Process finished with exit code 0

根据以上返回结果可以看出代码执行val flow = simpleFlow()的时候没有执行flow{...}构建块中的代码,只有调用collect的时候才执行,这就是冷流

流的连续性
  • 流的每次单独收集都是按照顺序执行的,除非使用特殊操作符
  • 从上游到下游每个过度操作符都会处理每个发射出的值,然后再交给末端操作符
fun testFlowContinuation() = runBlocking {
    (1..5).asFlow().filter {
        it%2 == 0
    }.map {
        "string $it"
    }.collect {
        println(it)
    }
}
流构建器
  • flowOf构建器定义了一个发射固定值集的流
  • 使用.asFlow()扩展函数,可是将各种集合与序列转换为流
fun testFlowOf() = runBlocking {
    flowOf("one", "two", "three")
        .onEach {
            delay(1000)//每隔1s发射一个元素 
        }.collect{
            println(it)
        }
}
流上下文
  • 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存
  • flow{...}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)
fun simpleFlow() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun testFlowContext() = runBlocking {
    simpleFlow().collect { value ->
        println1("$value ${Thread.currentThread().name}")
    }
}

看以上代码两个方法都存在于主线程,那如果flow{...}改变一下线程如下

fun simpleFlow() = flow {
   withContext(Dispatchers.IO){
       println1("Flow started ${Thread.currentThread().name}")
       for(i in 1..3) {
           delay(1000)
           emit(i)
       }
   }
}

执行一下,就会报错

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@599ad64a, BlockingEventLoop@2cbb850d],
        but emission happened in [DispatchedCoroutine{Active}@42689e0c, Dispatchers.IO].
        Please refer to 'flow' documentation or use 'flowOn' instead

这就证明必须遵循上下文保存属性,并且不允许从其他上下文中发射

  • flowOn操作符,该函数用于更改流发射的上下文
fun simpleFlow() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
    }
}.flowOn(Dispatchers.IO)
启动流
  • 使用launchIn替换collect我们可以在单独的协程中启动流的收集
//事件源
fun event() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)

fun testEventLaunch() = runBlocking {
    event().onEach {event->
        println1("Event: $event  ${Thread.currentThread().name}")
    }.launchIn(CoroutineScope(Dispatchers.IO)).join()
}
流的取消
  • 流采用与协程同样的协作取消。像往常一样,流的收集可以是当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消
fun simpleFlow1() = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1..3) {
        delay(1000)
        emit(i)
        println1("Emitting $i")
    }
}.flowOn(Dispatchers.IO)

fun testCancelFlow() = runBlocking {
    withTimeoutOrNull(2500) {
        simpleFlow1().collect { value -> println1(value) }
    }
    println1("Done")
}
流的取消监测
  • 为方便起见,流构建器对每个发射值执行附加的 ensureActive 监测以进行取消,这意味着从flow{...}发出繁忙循环是可以取消的
  • 出于性能原因,大多数其他流操作不会自行执行其他取消监测,在协程出于繁忙循环的情况下,必须明确监测是否取消
  • 通过cancellable操作符来执行此操作
fun testCancelFlowCheck() = runBlocking {
    (1..5).asFlow().cancellable().collect {
        println1(it)
        if(it == 3)cancel()
    }
}
背压

水流收到与流动方向一致的压力叫做背压或者生产者的效率大于消费者的效率

  • buffer(),并发运行流中发射元素的代码,相当于把管道延长,增加缓冲区
  • conflate(),合并发射项,不对每个值进行处理
  • collectLates(),取消并重新发射最后一个值
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓存机制,但是buffer函数显式地请求缓冲而不改变执行上下文
fun simpleFlow2() = flow {
    for(i in 1..3) {
        delay(100)
        emit(i)
        println1("Emitting $i")
    }
}

fun testFlowBackPressure() = runBlocking {
    val time = measureTimeMillis {
        simpleFlow2()
//            .flowOn(Dispatchers.Default)
//            .buffer(50)
//            .conflate()
//            .collectLatest {
//            }
            .collect {
            delay(300)
            println1("$it")
        }
    }
    println1(time)
}
过渡流操作符
  • 可以使用操作符转换流,就像使用集合与序列一样
  • 过渡操作符应用于上游流,并返回下游流
  • 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数
  • 它运行速度很快,返回新的转换流的定义
suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun testTransformFlowOperator() = runBlocking {
    (1..5).asFlow().transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }.collect {
        println1(it)
    }
}
限长操作符

take

fun number() = flow {
    try {
        emit(1)
        emit(2)
        emit(3)
    } finally {
        println1("Finally in numbers")
    }
}

fun testLimitLengthOperator() = runBlocking {
    number().take(2).collect { println1(it) }
}
末端操作符
  • 末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些方便使用的末端操作符
  1. 转化为各种集合,例如toList和toSet
  2. 获取第一个(first)值和确保流发射单个(single)值的操作符
  3. 使用reduce与fold将流规约到单个值
fun testTerminalOperator() = runBlocking {
    val sum = (1..5).asFlow().map { it * it }.reduce { a, b ->
        a+b
    }
    println1(sum)
}
fun testTerminalOperator() = runBlocking {
    val sum = (1..5).asFlow()
    .fold(0, {acc, i -> acc + i })//以0为初始值,求1到5的和
    println1(sum)
}
组合多个流
  • 就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值
fun testZip() = runBlocking {
    val numbers = (1..5).asFlow()
    val strs = flowOf("one", "two", "three")
    numbers.zip(strs) { a, b ->
        "$a -> $b"
    }.collect { println1(it) }
}
展平流
  • 流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平方式,为此,存在一系列的流展平操作符
  1. flatmapConcat 连接模式
  2. flatMapMerge 合并模式
  3. flatMapLatest最新展平模式
fun requestFlow(i: Int) = flow {
    emit("$i first")
    delay(500)
    emit("$i second")
}

fun testFlatMapConcat() = runBlocking {
    (1..3).asFlow().onEach { delay(100) }
//        .map { requestFlow(it) } // 转换后会变成 Flow<Flow<String>>因此需要展平处理
        .flatMapConcat { requestFlow(it) }
//        .flatMapConcat { requestFlow(it) }
//        .flatMapConcat { requestFlow(it) }
        .collect { println1(it) }
}

返回结果对比

flatMapConcat 模式
1 first
1 second
2 first
2 second
3 first
3 second
flatMapMerge 模式
1 first
2 first
3 first
1 second
2 second
3 second
flatMapLatest 模式
1 first
2 first
3 first
3 second
流的异常处理
  • 当运算符中发射器或代码抛出异常时,有几种处理异常的方法
  1. try/catch块
  2. catch函数
fun number() = flow {
    try {
        emit(1)
        emit(2)
        emit(3)
    } finally {
        println1("Finally in numbers")
    }
}.catch { e : Throwable ->
    // 在catch块中 还可以继续发射元素
    emit(4)
}
流的完成
  • 当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
  1. 命令式finally块
  2. onCompletion声明式处理
fun simpleFlow4() = (1..3).asFlow()

fun testFlowCompleteInFinally() = runBlocking {
//    try {
//        simpleFlow4().collect { println1(it) }
//    } finally {
//        println1("Done")
//    }
    simpleFlow4().onCompletion {exception-> //onCompletion还可以打印出上游下游异常信息
        println1("Done ${exception ?: ""}")
    }.collect {
        println1(it)
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容