Kotlin 中的背压有如下三种方式。通过 buffer 操作符来完成。
buffer 操作指的是设置缓冲区。当然缓冲区有大小,如果溢出了会有不同的处理策略。
- 设置缓冲区,如果溢出了,则将当前协程挂起,直到有消费了缓冲区中的数据。
- 设置缓冲区,如果溢出了,丢弃最新的数据。
- 设置缓冲区,如果溢出了,丢弃最老的数据。
缓冲区的大小可以设置为 0,也就是不需要缓冲区。
设置缓冲区,并采用挂起的策略
suspend fun flowBackpressureBuffer(overflow: BufferOverflow) {
fun currTime() = System.currentTimeMillis()
var start: Long = 0
val time = measureTimeMillis {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms)")
}
.buffer(capacity = 2, overflow)
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
上述代码如果采用 BufferOverflow.SUSPEND
策略,也就是挂起策略。输入如下:
Emit 1 (111ms)
Collect 1 starts (113ms)
Emit 2 (216ms)
Emit 3 (319ms)
Emit 4 (424ms)
Collect 1 ends (613ms)
Collect 2 starts (614ms)
Emit 5 (717ms)
Collect 2 ends (1117ms)
Collect 3 starts (1118ms)
Collect 3 ends (1620ms)
Collect 4 starts (1621ms)
Collect 4 ends (2125ms)
Collect 5 starts (2125ms)
Collect 5 ends (2630ms)
Cost 2765 ms
可以看到第 4 发现缓冲区满了,所以就挂起了,当消费第 2 个的时候,此时 3 和 4 存储在了缓冲区内,此时发射第五个。因为 buffer 的容量是从 0 开始计算的。
设置缓冲区,丢弃最新的数据
如果上述代码传入 BufferOverflow.DROP_LATEST
,输出如下:
Emit 1 (107ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (524ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1117ms)
Collect 3 starts (1117ms)
Collect 3 ends (1620ms)
Cost 1728 ms
可以看到虽然发射了五个,但是丢弃了后面的两个数据。
设置缓冲区,丢弃旧数据
如果上述代码传入 BufferOverflow.DROP_OLDEST
,输出如下:
Emit 1 (109ms)
Collect 1 starts (111ms)
Emit 2 (214ms)
Emit 3 (317ms)
Emit 4 (419ms)
Emit 5 (525ms)
Collect 1 ends (613ms)
Collect 4 starts (613ms)
Collect 4 ends (1116ms)
Collect 5 starts (1116ms)
Collect 5 ends (1618ms)
Cost 1716 ms
可以看到虽然发射了五个,但是 2,3 被丢弃了。
还有一个 conflate
confilate
操作符的作用不设缓冲区,丢弃旧数据。
suspend fun flowBackpressureLatest() {
fun curTime() = System.currentTimeMillis()
var start: Long = 0
val time = measureTimeMillis {
(1..5).asFlow()
.onStart { start = curTime() }
.onEach {
delay(100)
println("Emit $it (${curTime() - start}ms)")
}
.conflate()
.collect {
println("Collect $it start (${curTime() - start}ms) ")
delay(500)
println("Collect $it ends ${curTime() - start}ms) ")
}
}
println("Cost $time")
}
输出如下:
Emit 1 (109ms)
Collect 1 start (111ms)
Emit 2 (214ms)
Emit 3 (317ms)
Emit 4 (420ms)
Emit 5 (520ms)
Collect 1 ends 615ms)
Collect 5 start (615ms)
Collect 5 ends 1120ms)
Cost 1226
conflate
操作符是不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST
策略,那么不就等于 buffer(0, BufferOverflow.DROP_OLDEST)
。
我们来看看源码:
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
// desugar CONFLATED capacity to (0, DROP_OLDEST)
var capacity = capacity
var onBufferOverflow = onBufferOverflow
**if (capacity == CONFLATED) {
capacity = 0
onBufferOverflow = BufferOverflow.DROP_OLDEST
}**
// create a flow
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}
}
所以 conflate
是 buffer(0, BufferOverflow.DROP_OLDEST)
的一种快捷方式。
下一篇 StateFlow
与 SharedFlow
敬请期待。