RxJava2介绍
RxJava是通过事件传递,并且在传递过程中对事件内部数据进行修改,最终发送给接收者的响应式框架。
借助某个同学的一张图可以更直观的了解:
上图只是在同个线程中,可以让事件携带数据按顺序从上层流转到下层。而在事件流转的过程中,RxJava提供了很多操作符可以对源头事件进行处理再往下传递。
RxJava2的优势
- 书写简便,没有层层回调
- 流式调用,可以简单的看出来整个过程
- 操作符非常强大,可以在事件流转中间进行各种处理
- 可以保证多线程之间事件的顺序(不过数据同步需要自己保证)
RxJava2的配置
在Module中引入即可:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
RxJava2中基本类介绍
Observable
:一个可被subscribe
的对象,也可以理解成被监听的对象,而该对象中保存着一个名为ObservableEmitter
的对象,ObservableEmitter
对象就是上面提到的发送事件的对象。
Observer
:接收Observable
发送事件的对象。
Consumer
: 只接收onNext
事件的对象
本质RxJava就是一套非常强大的Observer框架
Observable与Observer基本使用
在Observable
对象中调用onNext
发射(Emitter)了1
,2
,以及Error
、Complete
这四个事件。
而在Observer
对应的几个响应函数中打印日志(为了方便,把Log.e
替换成了System.out.println
)。
Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
Log.e(TAG, "Emitter onError...${Thread.currentThread().name}")
emitter.onError(NullPointerException())
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
}.subscribe(object : Observer<Int> {
override fun onComplete() = System.out.println("onComplete...${Thread.currentThread().name}")
override fun onSubscribe(d: Disposable?) = System.out.println("onSubscribe...${Thread.currentThread().name}...Disposable:$d")
override fun onNext(value: Int?) = System.out.println("onNext...${Thread.currentThread().name}...Value:$value")
override fun onError(e: Throwable?) = System.out.println("onError...${Thread.currentThread().name}...Throwable:$e")
})
而在同一个线程中,输出结果如下:
E/SelectImageActivity: onSubscribe...main...Disposable:null
E/SelectImageActivity: Emitter onNext1...main
E/SelectImageActivity: onNext...main...Value:1
E/SelectImageActivity: Emitter onNext2...main
E/SelectImageActivity: onNext...main...Value:2
E/SelectImageActivity: Emitter onError...main
E/SelectImageActivity: onError...main...Throwable:java.lang.NullPointerException
E/SelectImageActivity: Emitter onComplete...main
可以发现:
1. 在同一个线程中,发送一个事件,就会接收一个事件,再发送下一个事件
2. 在发送完onError
事件后,即使发送了onComplete
事件,也无法接收
3. 在发送完onComplete
事件后,再发送了onError
事件,则会将该Throwable
对象抛出,出现crash
4. 在发送完onComplete
与onError
事件后,再发送onNext
事件,则无法接收
事件的消费者Consumer
在大多数情况下,我们只用关心onNext
或者onError
单独的事件,而对于其他的事件均不关心,这种情况下,我们就可以使用Consumer
对象
对于subscribe
函数的重载函数有这些:
public final void subscribe(Observer<? super T> observer)
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
举例,我们只关心onNext
事件,则可以这样来表示:
Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
Log.e(TAG, "Emitter onNext3...${Thread.currentThread().name}")
emitter.onNext(3)
}.subscribe { data ->
Log.e(TAG, "onNext...$data")
}
在接收端,仅仅只接收了onNext
事件。
E/SelectImageActivity: Emitter onNext1...main
E/SelectImageActivity: onNext...1
E/SelectImageActivity: Emitter onNext2...main
E/SelectImageActivity: onNext...2
E/SelectImageActivity: Emitter onComplete...main
E/SelectImageActivity: Emitter onNext3...main