转载自:Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解
背压介绍
当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
如果上下游处在同一个线程中,则不会出现背压的问题。因为下游处理完时间后,上游才会发射。
Flowable
大量数据处理需要用Flowable,而小数据则使用Observable即可
由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable慢得多。
由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题。
所以,如果能够确定:
- 上下游运行在同一个线程中,
- 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
- 上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用Flowable,以免影响性能。
Flowable的使用
下例使用了Flowable
来发射事件,大体与Observable
类似,只是有几点区别:
- Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter
- create方法中多了一个
BackpressureStrategy
类型的参数,该参数负责当BackPress产生的时候,对应的Emitter的处理策略是什么样的 -
onSubscribe
中接收的不是Dispose
,而是Subscription
对象,并且调用了s?.request(10)
Flowable.create<Int>({ emitter ->
emitter.onNext(1)
emitter.onComplete()
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription?) { s?.request(10)}
override fun onNext(t: Int?) = System.out.println("onNext...$t")
override fun onComplete() = System.out.println("onComplete")
override fun onError(t: Throwable?) = System.out.println("onError...$t")
})
BackpressureStrategy背压策略
BackPress策略有这几种:
- MISSING
- ERROR
- BUFFER
- DROP
- LATEST
当上游发送数据的速度快于下游接收数据的速度,且运行在不同的线程中时,Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限为128
条。
当缓存池的容量超过128
条时,就会触发Backpress的应对策略。
BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池缓存数据的策略。在FlowableCreate
类中看到,在设置完BackpressureStrategy
之后,对应的Strategy会根据映射生成不同Emitter:
-
MISSING ----> MissingEmitter
:
在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。 -
ERROR ----> ErrorAsyncEmitter
:
在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。 -
DROP ----> DropAsyncEmitter
:
如果Flowable的异步缓存池满了,会丢掉上游发送的数据。由于Emitter都是继承自AutomicLong
或者其他的原子数据,所以通过get()
得到的就是缓存池数据剩下的数量,如果为0,代表缓存池已经满了。 -
LATEST ----> LatestAsyncEmitter
:
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据 -
BUFFER ----> BufferAsyncEmitter
:
默认的策略。如果Flowable默认的异步缓存池满了,会通过该Emitter中新增的缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM
背压操作符
Backpress操作符一共有这些:
- onBackpressureBuffer
- onBackpressureDrop
- onBackpressureLatest
主要的作用就是,当Flowable
不是通过create
创建时,没有传入BackpressStrategy,则可以通过这些操作符来指定BackpressStrategy。例如:
Flowable.range(0, 500).onBackpressureDrop()
Flowable的响应式拉取
Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。
如果不显示调用request
则默认下游的需求量为零,上游Flowable发射的数据不会交给下游Subscriber处理。而多次调用则会将该数累加:
Flowable.create<Int>({ emitter ->
repeat(3) {
Log.e(TAG, "emitter.request:${emitter.requested()}")
emitter.onNext(it)
}
emitter.onComplete()
}, BackpressureStrategy.ERROR).onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription?) {
// 累加到2
s?.request(1)
s?.request(1)
}
override fun onNext(t: Int?) = System.out.println("onNext...$t")
override fun onComplete() = System.out.println("onComplete")
override fun onError(t: Throwable?) = System.out.println("onError...$t")
})
就会输出:
onNext...0
onNext...1