前言
项目中,往往少不了事件总线,以前一般使用EventBus
,但现在由于Rx
系列的强大,也就投入了RxBus
的怀抱。下面就来说说RxBus
背压策略
在RxJava1.x
版本的时候,可能会遇到MissingBackpressureException
和OOM
这样的问题,这是很多事件不能被正常的背压处理的导致的,在RxJava2.x
版本引入了Flowable
来处理背压的问题,那么,什么是背压呢?
背压策略概念:
RxJava
在异步事件处理并且被观察者处理事件的速度比观察者处理事件的速度快的情况下,通知被观察者降低发送速度的策略
主要的实现原理就是:
Observables
分为两种主要类型:Hot Observables
和Cold Observables
,Cold Observables
是有订阅事件的观察者才会开始发送事件,比如from
,create
,just
,timer
,interval
,range
,zip
等操作符;而Hot Observables
不管是否有订阅者,创建完后就直接发送事件。所以Hot Observable
是不支持背压式,而Cold Observable
是部分操作符支持背压式策略的。而不支持背压式的操作符可以通过sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( )
等等这些节流操作符去控制观察者的发送事件速度。关于这方面的知识,这篇文章关于RxJava最友好的文章——背压(Backpressure)就写的很不错。
发送事件
实现步骤
- 创建一个
FlowableProcessor(Any)
-
post(o:Any)
方法,发送事件函数, 同时传入需要发送的信息 -
toFlowable()
返回FlowableProcessor<Any>
对象
最后得到的代码如下:
object RxBus{
private val bus:FlowableProcessor<Any> = PublishProcessor.create<Any>().toSerialized()
fun post(t:Any){
bus.onNext(t)
}
fun toFlowable(): FlowableProcessor<Any> {
return bus
}
}
接收事件
实现步骤
- 通过
toFlowable()
获取到FlowableProcessor<Any>
对象, - 传入需要接收的信息类型,通过
ofType(classType)
获取 -
subscribe
订阅,获取到信息后进一步的操作。
最后得到的代码如下:
class Bus(val sub: CompositeDisposable){
fun <T : Event> subscribe(classType: Class<T>, onNext:((res: T) ->Unit)?=null,
onError:((e: Throwable) ->Unit)?=null,
onComplete:(() ->Unit)?=null) {
val subscribe = RxBus.toFlowable()
.ofType(classType)
.subscribe({
if(onNext!=null) onNext(it)
},{
if(onError!=null) onError(it)
},{
if(onComplete!=null) onComplete()
})
sub.add(subscribe)
}
}
fun BaseActivity.bus(init: Bus.() -> Unit): Bus {
val bus = Bus(sub)
bus.init()
return bus
}
fun BaseFragment.bus(init: Bus.() -> Unit): Bus {
val bus = Bus(sub)
bus.init()
return bus
}
使用
定义一个Event
open class Event
class LogoutEvent : Event()
然后发送信息
RxBus.post(LogoutEvent())
最后接收信息
bus {
subscribe(LogoutEvent::class.java,{
//进行退出登录操作
})
}
在获取信息的时候传入的CompositeDisposable
是BaseActivity/BaseFragment
的CompositeDisposable
,所以需要在BaseActivity/BaseFragment
的onDestroy()
里面进行clear
override fun onDestroy() {
sub.clear()
super.onDestroy()
}
这样,一个简单的RxBus
就封装好了。