一、简介
关于RxJava的介绍与使用,在网上已经有很多的相关的教程,无奈自身水平有限,绝大多数讲解的原理噼里啪啦一堆,说的一堆高大上的名词把自己绕的云里雾里,领悟不透;刚好最近公司要求做分享,自己就做了一份关于RxJava的介绍。
这里我将用河水的上游和下游代替被观察者和观察者, 用通俗易懂的话把它们的关系解释清楚, 在这里我将从事件流向这个角度来说明RxJava的基本工作原理。
二、简单原理分析
首先,我们来看一张图:
上图中河流的上游产生了三个事件,而下游就会接收到三个事件,在这里上游按照顺序发送了1,2,3三个事件,而下游在收到的事件顺序也是先1,后2,再3的顺序;简单来说,这里的上游和下游就分别对应着RxJava中的Observable和Observer,它们之间的连接就对应着subscribe()。下面用简单的代码来说明这种关系。
三、简单使用
3.1 上游Observable的创建
//创建一个上游 即Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter)
throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
//e.onError(new IOException());
}
});
在创建上游Observable的时候有涉及到的三个方法说明:
** 1. onNext(T t):**
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
2. onComplete():
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
3. onError(Exception ex):
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
注:以上三个方法都是在被ObservableEmitter这个方法所调用的,关于这个方法又是做什么的呢?
3.2 关于上游Observable中的ObservableEmitter()方法说明
Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
然而,这并不意味着我们就可以随意的发送乱七八糟的事件,我们需要满足以下的规则才行:
1. 上游可以发送无限个onNext, 下游也可以接收无限个onNext;
2. 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件;
3. 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件;
4. 上游可以不发送onComplete或onError;
5. 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
说了这么多,那么我们的下游到底要如何接收处理呢,下面我们来看一下下游的创建。
3.3 下游Observer的创建
//创建一个下游 即Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext" + integer);
}
@Override public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override public void onComplete() {
Log.d(TAG, "onComplete");
}
};
在创建下游的时候我么发现,在下游除了接收了onNext、onError、onComplete三个方法之外,还有一个onSubscribe方法,那么这个方法是做啥的?这里我简单介绍下:
关于onSubscribe(Disposable d)方法的说明:
onSubscribe是在Observer中执行的第一个方法,在onNext方法之前执行,关于它传入的参数Disposable,可以将其理解成一个上游和下游之间的机关,当我们调用其对应的dispose()方法的时候,他就会将上游和下游之间切断,从而导致下游不会接收到新的事件。
注:调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件。
上面说了这么多,那么onNext、onError、onCompleted方法到底有何区别。
3.4 onNext、onComplete、onError三个方法的区别
我们通过以下三幅图来了解关于这三个方法的区别。
3.6 上游Observable与下游Observer的关联
我们现在已经创建好了上游和下游,该如何关联起来呢,很简单,我们可以用subscribe()方法进行关联:
//建立连接
observable.subscribe(observer);
关于observable()方法,它其实有很多的重载方法,具体如下:
下面我简单的对说明一下这些重载方法:
- 不带任何参数的 subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
- 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
打印结果如下:
3.7 关于Observable的其他创建方法
除了我们使用Observable.create()方法创建Observable之外还有:
Observable.just():
注:just()方法里存放的其实就是onNext()发射的内容,其存放的个数最多为10个,超过10个则会报错,这是因为just()方法重载了10次。
Observable.fromArray():
注:fromArray()方法里存放的同样也是onNext()发射的内容,与just()方法区别的是fromArray()里面的参数是可变数组,存放的个数没有限制。
Observable.fromIterable():
注: fromIterable()方法里传入的参数为一个集合,集合中每个条目对应的就是一个onNext()发射的内容。
下面一一举例说明:
//Observable.just()创建方法
Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
//Observable.fromArray()创建方法
Observable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
//Observable.fromIterable()创建方法
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(4);
Observable.fromIterable(list).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
3.8 RxJava的线程控制
正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.,例如我们在Activity的按钮点击事件中做如下操作:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable所在的线程:" + Thread.currentThread().getName());
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "Observer所在的线程:" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
});
上面这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI,该如何做?
案例分析说明RxJava中的线程切换:
需求:从网上获取云南电信所有频道的信息,将频道中的频道名显示到UI界面上。
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//网络请求频道的数据
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
}).subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
上面的方法在按钮点击事件里面执行的,即在主线程执行的;然而我们的Observable方法执行的是网络请求,一个耗时操作,应该在子线程执行才对,当我们执行上面代码会出现以下错误:
该如何将Observable切换到子线程呢?
RxJava中有一个subscribeOn()方法,可以指定Observable的线程,因此,我们将之前的代码修改如下:
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//网络获取频道数据
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
})
.subscribeOn(Schedulers.io()) //切换上游线程操作
.subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
继续运行,发现还是报错了,什么原因?那是因为我们开始的时候将Observable切换到了子线程,根据之前说的,默认Observer里面执行的线程跟Observable线程是一致的,而这里Observer涉及到更新UI操作,应该在主线程执行才对。
该如何将Observer切换到子线程呢?
RxJava中有一个subscribeOn()方法,可以指定Observer的线程,因此,我们将之前的代码修改如下:
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//网络获取频道数据
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
})
.subscribeOn(Schedulers.io()) //切换上游线程操作到io线程
.observeOn(AndroidSchedulers.mainThread()) //切换下游线程到主线程
.subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
再来开一下运行结果:
以上表明我们的线程切换达到了我们需要的目的。
在RxJava中已经内置了很多线程选项供我们选择, 例如有:
Schedulers.io() : 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation(): 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread(): 代表一个常规的新线程
AndroidSchedulers.mainThread(): 代表Android的主线程
这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.
四、总结
关于RxJava的简单使用就介绍到这里,后期有空我会向大家介绍一下RxJava2的其他实用的方法。