Kotlin Flow

一、Flow

1.Flow 是什么?

  • Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步流,类似 RxJava 的 Observable、Flowable 等等,所以很多人都用 Flow 与 RxJava 做对比。
  • Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性。

2.如何创建 Flow?

  • 使用 flow 创建,生产者和消费者的通信是同步非阻塞的。

    private fun testFlow() {
        CoroutineScope(Job()).launch {
            Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
            flow {
                for (i in 1..3) {
                    delay(500)
                    Log.d(TAG, "zwm, emit: $i thread: ${Thread.currentThread().name}")
                    emit(i)
                }
            }.collect {
                Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}")
            }
        }
    }
    
    //日志打印
    2022-07-16 20:46:39.545 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:40.050 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 1 thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:40.051 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:40.553 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 2 thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:40.553 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:41.055 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, emit: 3 thread: DefaultDispatcher-worker-1
    2022-07-16 20:46:41.055 11250-11358/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-1
    
  • 使用 channelFlow 创建,生产者和消费者的通信是异步非阻塞的。

    @ExperimentalCoroutinesApi
    private fun testFlow() {
        CoroutineScope(Job()).launch {
            Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
            channelFlow {
                for (i in 1..3) {
                    delay(500)
                    Log.d(TAG, "zwm, send: $i thread: ${Thread.currentThread().name}")
                    send(i)
                }
            }.collect {
                Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}")
            }
        }
    }
    
    //日志打印
    2022-07-16 20:37:37.327 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-2
    2022-07-16 20:37:37.839 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, send: 1 thread: DefaultDispatcher-worker-1
    2022-07-16 20:37:37.841 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-2
    2022-07-16 20:37:38.342 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, send: 2 thread: DefaultDispatcher-worker-2
    2022-07-16 20:37:38.343 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-1
    2022-07-16 20:37:38.844 9531-9604/com.tomorrow.target30 D/KtActivity: zwm, send: 3 thread: DefaultDispatcher-worker-1
    2022-07-16 20:37:38.845 9531-9605/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-2
    

3.如何切换线程?

private fun testFlow() {
    CoroutineScope(Job()).launch {
        Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
        flow {
            for (i in 1..3) {
                delay(500)
                Log.d(TAG, "zwm, emit: $i thread: ${Thread.currentThread().name}")
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
                .collect {
                    Log.d(TAG, "zwm, collect: $it thread: ${Thread.currentThread().name}")
                }
    }
}

//日志打印
2022-07-16 20:54:55.376 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-16 20:54:55.889 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, emit: 1 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:55.892 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, collect: 1 thread: DefaultDispatcher-worker-1
2022-07-16 20:54:56.393 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, emit: 2 thread: DefaultDispatcher-worker-1
2022-07-16 20:54:56.395 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, collect: 2 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:56.895 13260-13338/com.tomorrow.target30 D/KtActivity: zwm, emit: 3 thread: DefaultDispatcher-worker-2
2022-07-16 20:54:56.897 13260-13337/com.tomorrow.target30 D/KtActivity: zwm, collect: 3 thread: DefaultDispatcher-worker-1

4.操作符有哪些?

  • map 操作符

  • filter 操作符

  • 末端操作符,如 collect、toList、reduce、fold、onEach 等

  • flowOn 操作符

  • retry 操作符

  • zip 操作符

  • combine 操作符

  • 其它操作符,如 flattenMerge、take、drop、onEach、onStart 等

  • 协程背压

    Kotlin 协程支持背压。Kotlin 协程中所有函数都标有 suspend 修饰符,具有在不阻塞线程的情况下挂起调用程序执行的强大功能。因此,当流的收集器不堪重负时,它可以简单地挂起发射器,并在准备好接受更多元素时稍后将其恢复。

    buffer 操作符:没有固定大小,可以无限制添加数据,不会抛出 MissingBackpressureException 异常,但可能会导致 OOM。

    conflate 操作符:如果缓存池满了,新数据会覆盖老数据。

    collectLatest 操作符:不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。

二、冷流与热流

1.冷流是什么?

在 Kotlin 中,Flow 是冷流。只有订阅者订阅时,才开始执行发射数据流的代码。并且冷流和订阅者只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对冷流而言,有多个订阅者的时候,它们各自的事件是独立的。

2.热流是什么?

在 Kotlin 中,StateFlow、SharedFlow 是热流。无论有没有订阅者订阅,事件始终都会发生。当热流有多个订阅者时,热流与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。

三、StateFlow

1.StateFlow 是什么?

StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。可以通过其 value 属性读取当前状态值,如需更新状态并将其发送到数据流,那么就需要使用 MutableStateFlow。

2.StateFlow 如何使用?

private fun testFlow() {
    val data = MutableStateFlow(100) //需要有初始值
    CoroutineScope(Job()).launch {
        Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
        data.collect {
            Log.d(TAG, "zwm, collect thread: ${Thread.currentThread().name}, data: ${data.value}")
        }
    }

    Handler(Looper.getMainLooper()).postDelayed(Runnable {
        Log.d(TAG, "zwm, postDelayed thread: ${Thread.currentThread().name}")
        CoroutineScope(Job()).launch {
            Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
            Log.d(TAG, "zwm, current data: ${data.value}")
            data.value = 200 //上一个值为 100,更新值为 200,正常接收
//            data.value = 100 //上一个值为 100,更新值为 100,不接收
        }
    }, 3000)
}

//日志打印
2022-07-17 10:35:13.175 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:35:13.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-1, data: 100
2022-07-17 10:35:16.174 3097-3097/com.tomorrow.target30 D/KtActivity: zwm, postDelayed thread: main
2022-07-17 10:35:16.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:35:16.176 3097-3325/com.tomorrow.target30 D/KtActivity: zwm, current data: 100
2022-07-17 10:35:16.178 3097-3326/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200

四、SharedFlow

1.SharedFlow 是什么?

SharedFlow 提供了 SharedFlow 与 MutableSharedFlow 两个版本,平时使用较多的是 MutableSharedFlow。MutableSharedFlow 没有起始值,发送数据时需要调用 emit()、tryEmit() 方法。构造函数如下:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
  • replay:表示当新的订阅者 Collect 时,发送几个已经发送过的数据给它,默认为 0,即默认新订阅者不会获取以前的数据。

  • extraBufferCapacity:表示减去 replay,MutableSharedFlow 还缓存多少数据,默认为 0。

  • onBufferOverflow:表示缓存策略,即缓冲区满了之后 Flow 如何处理,默认为挂起。除此之外,还支持 DROP_OLDEST 和 DROP_LATEST。

2.SharedFlow 如何使用?

private fun testFlow() {
    val data = MutableSharedFlow<Int>() //不需要初始值
    CoroutineScope(Job()).launch {
        Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
        data.collect {
            Log.d(TAG, "zwm, collect thread: ${Thread.currentThread().name}, data: $it")
        }
    }

    Handler(Looper.getMainLooper()).postDelayed(Runnable {
        Log.d(TAG, "zwm, postDelayed thread: ${Thread.currentThread().name}")
        CoroutineScope(Job()).launch {
            Log.d(TAG, "zwm, launch thread: ${Thread.currentThread().name}")
            data.emit(200)
            data.emit(200) //上一个值为 200,更新值为 200,正常接收
        }
    }, 3000)
}

//日志打印
2022-07-17 10:49:00.782 5908-6008/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:49:03.782 5908-5908/com.tomorrow.target30 D/KtActivity: zwm, postDelayed thread: main
2022-07-17 10:49:03.787 5908-6008/com.tomorrow.target30 D/KtActivity: zwm, launch thread: DefaultDispatcher-worker-1
2022-07-17 10:49:03.793 5908-6010/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200
2022-07-17 10:49:03.801 5908-6010/com.tomorrow.target30 D/KtActivity: zwm, collect thread: DefaultDispatcher-worker-2, data: 200

3.StateFlow 与 SharedFlow 的区别是什么?

  • StateFlow 与 SharedFlow 都是热流,都是为了满足流的多个订阅者的使用场景。
  • SharedFlow 配置更为灵活,支持配置重播个数、缓冲区大小等。StateFlow 是 SharedFlow 的特殊化版本,重播个数固定为 1,缓冲区大小默认为 0。
  • StateFlow 当 value 重复时,不会回调 collect 给新的订阅者。SharedFlow 支持发射和接收重复值。
  • StateFlow 只会重播当前最新值,SharedFlow 可配置重播元素个数,默认为 0,即不重播。
  • StateFlow 与 LiveData 类似,支持通过 myFlow.value 获取当前状态,如果有这个需求,必须使用 StateFlow。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容