前言
2017年,刚毕业的我在项目组里打下手,组里其他人决定在新项目里使用RxJava,那时网络库用的是他们自己再次封装了的noHttp,加上了RxJava后,以前简单的网络请求多写了很多代码,这是我对RxJava的第一感受。
如今的项目基本上都是MVP+Retrofit2+Rxjava2的框架,但往往只是把Rxjava2和Retrofit2搭配进行网络请求而已,我也不例外。也曾看过好几次RxJava2的入门教程、操作符总结等,过段时间就忘了,而且不知在什么样的情景下适用恰当,所以写下这篇文章,打算带着问题出发,重新整理一下。
问题:
1、RxJava是什么,它优势是什么?
2、RxJava基础知识。
3、RxJava2操作符。
4、背压策略。
5、具体的使用场景。
1、RxJava是什么,它优势是什么?
RxJava官方介绍——一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
Rxjava本质就是异步,它的优势是简洁。简洁是指相比于AsyncTask 、Handler、runOnUiThread等异步操作,RxJava的逻辑更简洁。一方面它是链式的,逻辑代码直观,二是它提供了很多强大的操作符。
2、RxJava基础知识
RxJava的基础知识,首先要知道它是用了可拓展的观察者模式,然后再了解这些概念,Observable、Observer、subscribe、处理事件、Scheduler。
1)什么是观察者模式?
观察者(Observer)不时刻盯着被观察者(Observable),而是通过注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者我需要你的某某状态,你要在它变化的时候通知我。
举例1:
A(观察者)和B(被观察者),A不需要每过 1s 就检查一次 B 的状态,而是B状态发生改变时,去通知A。
举例2:
View的点击事件,View(被观察者)->OnClickLisetener(观察者)->setOnClickListener(订阅)->OnClick(事件)。
举例3:
小偷在偷钱时告诉警察它要偷钱。小偷(被观察者)、警察(观察者)、小偷偷钱是某种状态、警察把小偷抓起来(事件)。
2)RxJava的观察者模式
Observable(被观察者)、Observer(观察者)、 subscribe(订阅)、事件。
看代码清楚明了,ObservableEmitter是事件发射器,在onSubscribe方法里可得到Disposable,可用于取消订阅。在观察者的方法体里有4个事件,解释看注释。如果发送了onError() 或 onComplete()事件,就取消了这个订阅。
一般用于网络请求时,都会写一个继承Observer的类来做处理,比如在onSubscribe()里做加载框显示,在onComplete()做某些共性判断,在onError()做统一失败处理等。所以Observer的4个方法肯定是要清楚的。
/**
* 打印结果: 只会打印1和2,3不打印
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//ObservableEmitter类,事件发射器,作用是定义需要发送的事件 & 向观察者发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
emitter.onNext(3);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable; //用于取消订阅
/**
* 订阅成功就执行的方法
* @param d 可切断操作
*/
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
/**
* 接收事件
* @param integer
*/
@Override
public void onNext(Integer integer) {
KLog.d(TAG, "onNext: " + integer + "\n");
}
/**
* 失败,不再接收事件
* @param e
*/
@Override
public void onError(Throwable e) {
}
/**
* 完成,不再接收事件
*/
@Override
public void onComplete() {
}
});
3)线程控制Scheduler
Scheduler是线程调度,RxJava通过它来指定每一段代码运行在什么样的线程。例如网络请求用io流线程,更新UI就用mainThread,最常用也是这两个。
Schedulers.newThread():新线程。
Schedulers.io():在io()内部有线程池,可以使用空闲的线程,多数情况下比newThread()效率高。
Schedulers.computation():计算线程(没用过)。
AndroidSchedulers.mainThread():主线程。
4)取消订阅
RxJava2的取消订阅用Disposable.cancel(),如果订阅事件过多,要写管理类来管理。如果不取消订阅,就会造成内存泄漏,网上有RxLifecycle第三方库,可以很方便做取消订阅,它绑定了Activity or Fragment的生命周期。
3、RxJava2操作符
这里重新整理了一下当初学习的操作符,原本打算整理全部的操作符的,然后发现实在是太多了,就放弃了,下面的操作符绝大多数都来自于Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程,想详细学习的可看这篇文章。网上也有很多文章有对操作符做简述,这篇文章比较完整Android拾萃 - RxJava2操作符汇总。最好结合代码学习,下载地址在本文最下面。
1)创建操作符
包括完整&快速创建被观察者,定时操作,周期性操作,数据/集合遍历
操作符 | 简述 |
---|---|
基本创建 | |
create() | 普通创建,可以定义要发送的事件 |
快速创建 | |
just() | 快速创建一个Observable,最多发送10个参数 |
fromArray() | 和just()类似,区别是传的是数组 |
fromIterable() | 和fromArray()类似,区别是传入list集合 |
never() | 不发送事件 |
empty() | 仅发送complete事件 |
error() | 仅发送empty事件 |
延迟创建 | |
defer() | 直到订阅才创建Observable |
timer() | 延迟一段时间再才发送 |
interval() | 周期性发送,值从0开始递增。 |
intervalRange() | 指定范围,周期性发送。可用于做验证码倒计时 |
range() | 连续发送一个时间序列,可指定范围 |
2)变换操作符
操作符 | 简述 |
---|---|
map() | 将被观察者发送的事件转换为任意的类型事件。返回的是结果集,适用于一对一转换。(数据类型转换) |
flatMap() | 将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。返回的是包含结果集的Observable,适用于一对多,多对多得场景。 |
concatMap() | 和flatMap()类似,区别是flatMap是无序的,concatMap是有序的,它的新事件序列和旧序列顺序一致。具体可以在flatMap生成事件的逻辑里加个延迟看到差异。 |
buffer() | 定期从被观察者需要发送的事件中获取一定数量的事件,放到缓存区中,最终发送。两个参数,count是缓存区大小,skip是步长。 |
switchMap() | 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据 |
scan() | 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值 |
groupBy() | 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 |
3)组合/合并转换符
操作符 | 简述 |
---|---|
组合多个被观察者 | |
concat() / concatArray() | 组合多个被观察者,按发送顺序串行执行。有Array是指传数组 |
merge()/mergeArray() | 组合多个被观察者,按时间线并行执行 |
concatDelayError()/mergeDelayError() | onError事件推迟到其他被观察者发送事件结束后才触发 |
合并多个事件 | |
zip() | 按数量合并,最终事件的数量=多个观察者中事件最少的数量。比如被观察者1有4个事件,被观察者2有5个事件,则组合起来事件数量是4。 |
combineLatest() | 按时间合并,和最新事件合并。当两个Observable中的任何一个发送了事件后,将先发送了数据的Obervable的最新一个事件与另一个Observable发送的每个事件结合,最终基于该函数的结果发送事件。和zip的区别是,zip()按个数合并,1对1合并;combineLastest 按时间合并,在同一时间点上合并。 |
reduce() | 把被观察者需要发送的事件聚合成1个事件 |
collect() | 将被观察者Observable发送的数据事件收集到一个数据结构里 |
其他 | |
startWith() | 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者 |
count() | 统计被观察者发送事件的数量 |
4)功能性操作符
操作符 | 简述 |
---|---|
subscribe() | 订阅,即连接观察者 & 被观察者 |
delay() | 使得被观察者延迟一段时间再发送事件 |
doXXX() | 各个事件操作符,如 doOnEach、doOnNext、doComplete等。 |
retry() | 重试,即当出现错误时,让被观察者(Observable)重新发射数据 |
retryUntil() | Observable遇到错误时,在retryUntil()的方法里决定,是否让Observable重新订阅 |
retryWhen() | retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,由这个Observable来决定是否要重新订阅原Observable。 |
repeat() | 无条件地、重复发送 被观察者事件 |
repeatWhen( int ) | 传入参数 = 重复发送次数有限 |
5)过滤操作符
操作符 | 简述 |
---|---|
filter() | 过滤 特定条件的事件 |
ofType() | 过滤 特定数据类型的数据 |
skip( int ) | 跳过n个事件 |
skipLast( int ) | 跳过最后的n个事件 |
distinct() | 过滤事件序列中重复的事件 |
distinctUntilChanged() | 只确保相邻元素不重复出现 |
take( int ) | 指定只接收n个事件 |
takeLast( int ) | 指定只接收最后发送的n个事件 |
6)条件/布尔操作符
操作符 | 简述 |
---|---|
all() | 判断发送的每项数据是否都满足设置的函数条件,若满足,返回 true;否则,返回 false。 |
takeWhile() | 判断发送的每项数据是否满足设置函数条件,为true就发送,为false不发送,且终止发送事件(后面的事件也不发送了)。 |
skipWhile() | 判断发送的每项数据是否满足设置函数条件,为true就不发送,为false发送,且终止发送事件。 |
takeUntil() | 接收第一个Observable(调用takUtil的Observable)发送的数据,当第二个Observable(takUtil参数中的Observable)发送数据时,两个Obserable会同时取消订阅。 |
skipUntil() | 与takeUtil()正好相反,不接收第一个Observable发送的数据,直到第二个Observable发送数据时才接收第一个Observable的数据,此时第二个Observable会取消订阅。 |
sequenceEqual() | 判定两个Observables需要发送的数据是否相同,相同返回 true,不相同返回 false |
contains() | 判断发送的数据中是否包含指定数据。 |
isEmpty() | 判断发送的数据是否为空 |
amb() | 当需要发送多个 Observable时,只发送最先发送的Observable的数据,而其余Observable则被丢弃。 |
4.背压策略。
当在异步订阅中,通过Observable发射、处理、响应数据流时,如果事件产生的速度远远快于事件消费的速度,这些没来得及处理的数据就会越积越多,这些数据不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,缓存池的数据一直得不到处理,最终会导致OOM等异常。这就是响应式编程中的背压问题。总结一下就是: 事件产生的速度大于事件消费的速度,数据堆积,最终造成OOM等异常。
RxJava2把对背压问题的处理逻辑从Observable中抽取出来产生了新的可观察对象Flowable,它是在Observable基础上做了优化,所以Observable能做的,它都能做,但是加了背压支持和其他的逻辑处理,它的效率比Observable慢得多,所以在需要用到背压的时候再用Flowable,其他时候还是正常使用Observable。网上找了两张图,可以很直观地看出RxJava1和2对背压的改动。
简单举个例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
//异步订阅时,代表的是 异步缓存池中可放入数据的数量,一开始是128,当产生10个事件而没有消费时,此时这个值是128-10=118。
KLog.d(TAG, "异步缓存池中可放入数据的数量 = " + emitter.requested());
// 一共发送4个事件
KLog.d(TAG, "发送事件 1");
emitter.onNext(1);
KLog.d(TAG, "发送事件 2");
emitter.onNext(2);
KLog.d(TAG, "发送事件 3");
emitter.onNext(3);
KLog.d(TAG, "发送事件 4");
emitter.onNext(4);
KLog.d(TAG, "发送事件 onComplete()");
emitter.onComplete();
KLog.d(TAG, "异步缓存池中可放入数据的数量 = " + emitter.requested());
// //模拟缓存超过128
// for (int i = 0;i< 129; i++) {
// Log.d(TAG, "发送了事件" + i);
// emitter.onNext(i);
// }
// emitter.onComplete();
//
}
}, BackpressureStrategy.ERROR) //缓存区超过128,直接抛出异常
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 在异步订阅情况下,一定要调用request,否则下流不接收事件
// 只接收多少个事件
s.request(3);
}
@Override
public void onNext(Integer integer) {
KLog.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
KLog.d(TAG, "onError:", t);
}
@Override
public void onComplete() {
KLog.d(TAG, "onComplete");
}
});
可以看到Flowable和Subscriber的使用方式和之前的Observable和Observer极其类似,不同点如下:
1、create方法中多了一个BackpressureStrategy类型的参数。
2、Flowable发射数据时,使用FlowableEmitter,而Observable用的是ObservableEmitter。
3、Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription。
下面对这三点进行说明。
BackpressureStrategy
缓存策略,也既是当缓存区存满、上流仍然继续发送下事件时,该如何处理的策略。默认缓存区大小是128(与Flowable的buffersize大小有关)。具体有哪些策略如下:
BackpressureStrategy.ERROR: 直接抛异常。
BackpressureStrategy.MISSING: 友好提示:缓存区满了。
BackpressureStrategy.BUFFER: 将缓存区设为无限大。
BackpressureStrategy.DROP: 超过缓存区大小128的事件丢弃。
BackpressureStrategy.LATEST: 只保存最后事件,超过缓存区大小128的事件丢弃。
Subscription
Subscription比Disposable多了一个方法request()。它的作用是告诉上游,下游需要多少数据,如果不设置request默认是0。比如设置为3,那么超过范围之外的数据就不接收了。如果多次调用request(),会累加。
FlowableEmitter
它比ObservableEmitter多了一个方法requested(),这个方法返回的是异步缓存池中可放入数据的数量,比如一开始是128,当产生10个事件而没有消费时,此时这个值是128-10=118。
Tips:不管Subscription.request(xx)设置了什么值,FlowableEmitter都会发送事件的,发送了不接收的就放入缓存里。背压在异步订阅中才有用,如果是同步订阅,是不会有缓存池的。
5.具体的使用场景
实际项目中,我只用来做过验证码倒计时、网络请求,RxBus。虽然了解了一些操作符,但网上的教程讲解这些操作符的时候都是以简单的例子来讲解的,具体的应用场景还是比较玄乎。
1)RxBus
EventBus是一个基于发布/订阅的事件总线,它简化了组件之间的通信操作。而这些RxBus都能做,所以用RxBus替换EventBus,在RxJava1就已经提出来了,如果自己的事件发送的要求不高,可以自己封装一个RxBus使用。17年的时候找的一篇文章 Android 用RxJava模拟一个EventBus ———RxBus,我在这基础上稍微改动了一下,但是没有做粘性事件。
原理简单来说就是找个容器装所有的观察者,当有某个事件产生时,找到所有需要这个事件的观察者,向它们发送事件。这个是否发送事件的依据,其实就是看观察者要什么样的class,比如订阅String类型的,那么Integer类型的事件就不会发给它。
两个新知识:
1)CompositeDisposable
一个disposable的容器,可以容纳多个disposable
2)Subject
Subject可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。
Subject 类别 | 简述 |
---|---|
AsyncSubject | 只有当 Subject 调用 onComplete 方法时,才会将 Subject 中的最后一个事件传递给所有的 Observer。 |
BehaviorSubject | 当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。Rxlifecycle2库用的就是这个。 |
PublishSubject | 不会改变事件的发送顺序,在已经发送了一部分事件之后注册的 Observer 不会收到之前发送的事件。RxBus用的这个。 |
ReplaySubject | 无论什么时候注册 Observer 都可以接收到任何时候通过该 Observable 发射的事件。 RxBus粘性事件可用这个。 |
UnicastSubject | 只允许一个 Observer 进行监听,在该 Observer 注册之前会将发射的所有的事件放进一个队列中, 并在 Observer 注册的时候一起通知给它。 |
public class RxBus {
/**
* CompositeDisposable定义:
* 一个disposable的容器,可以容纳多个disposable,添加和去除的复杂度为O(1)
*
* 此处使用目的:
* 是为了一个订阅者能够对应多个Disposable,在需要的时候调用 Disposable 的 dispose()取消订阅。
*
* 举个例子:
* XXActivity,订阅了事件A和事件B,关闭时要取消订阅。
* 那么只需在mSubscriptionMap里找到key为XXActivity的Value(CompositeDisposable),再取出Disposable
* 取消订阅即可。
*
* 如果不用的话,就要为每一个Activity写一个容器去保存它的订阅的事件了。
* */
private HashMap<String, CompositeDisposable> mSubscriptionMap;
private static volatile RxBus mRxBus;
private final Subject<Object> mSubject;
public static RxBus getIntanceBus(){
if (mRxBus==null){
synchronized (RxBus.class){
if(mRxBus==null){
mRxBus = new RxBus();
}
}
}
return mRxBus;
}
/**
* 正常订阅可用PublishSubject、黏性事件可用ReplaySubject。
*/
private RxBus(){
mSubject = PublishSubject.create().toSerialized();
}
/**
* 一个默认的订阅方法
* @param <T>
* @param type 过滤,只返回特定数据类型的数据
* @param next next()事件,正常接收的事件
* @param error error()事件,错误接收的事件
* @return
*/
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
/**
* 返回指定类型的带背压的Flowable实例
* 这里选定背压模式是 BackpressureStrategy.BUFFER,缓存区无限大
* @param <T>
* @param type 过滤,只返回特定数据类型的数据
* @return
*/
public <T> Flowable<T> getObservable(Class<T> type){
return mSubject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
/**
* 发送事件
* @param o 事件
*/
public void post(Object o){
mSubject.onNext(o);
}
/**
* 判断是否已有观察者订阅
*
* @return
*/
public boolean hasObservers() {
return mSubject.hasObservers();
}
/**
* 保存订阅后的disposable,取消订阅的时候要用
* @param o 订阅的目标
* @param disposable
*/
public void addSubscription(Object o, Disposable disposable) {
if (mSubscriptionMap == null) {
mSubscriptionMap = new HashMap<>();
}
//这里key值取订阅目标的实体名称(com.xx.xx.xxx),不是底层类检称
String key = o.getClass().getName();
//disposable 放到对应的 CompositeDisposable 里
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).add(disposable);
} else {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
mSubscriptionMap.put(key, disposables);
}
}
/**
* 取消订阅
* @param o
*/
public void unSubscribe(Object o) {
if (mSubscriptionMap == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscriptionMap.containsKey(key)){
return;
}
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).dispose();
}
mSubscriptionMap.remove(key);
}
/********************************* 为RxEventBean 封装 **********************************/
/**
* 使用EventBus时,为了方便查找,一般都会封装 EventBean(int code,Object content)
* 但是所有的订阅者都是订阅的这个类型,所以要自己做判断做类型转换。
*
* 这里写死事件是RxEventBean,且错误处理一般不处理,只用处理onNext即可。
* @param context
* @param action
* */
public void register(Context context, Consumer<RxEventBean> action) {
Disposable disposable = RxBus.getIntanceBus().doSubscribe(RxEventBean.class, action,
throwable -> KLog.e("RxEventBean onError()", throwable.toString()));
RxBus.getIntanceBus().addSubscription(context,disposable);
}
/**
* 发送RxEventBean 事件
* @param code code,用于判断
* @param content 内容,接收后做类型转换
*/
public void post(int code, Object content){
RxEventBean<Object> event = new RxEventBean<>();
event.code = code;
event.content = content;
post(event);
}
}
2)验证码倒计时
其实关键代码就是用intervalRange()操作符。
public class RxCodeHelper {
private Context mContext;
private TextView codeBt;
private Disposable mDisposable;
/**
* 构造函数
* @param mContext 上下文
* @param codeBt 验证码的button
*/
public RxCodeHelper(Context mContext, TextView codeBt) {
this.mContext = mContext;
this.codeBt = codeBt;
}
/**
* 开启倒计时
*/
public void start(){
codeBt.setEnabled(false);
//从0开始,走60个数,延迟是0s,周期为1次。
Observable.intervalRange(0,60,0,1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Long aLong) {
codeBt.setText((60 - aLong) + "s后可重发");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//倒计时完毕置为可点击状态
codeBt.setEnabled(true);
codeBt.setText("获取验证码");
}
});
}
/**
* 请求失败时,重置状态
* 取消倒计时的订阅事件
*/
public void reset(){
if(mDisposable!=null){
mDisposable.dispose();
}
codeBt.setEnabled(true);
codeBt.setText("获取验证码");
}
/**
* 界面销毁时,取消倒计时订阅事件
*/
public void stop(){
if(mDisposable!=null){
mDisposable.dispose();
}
}
}
3)RxLifecycle
目的:解决RxJava的内存泄漏问题。
用法:
bindUntilEvent(@NonNull ActivityEvent event)——绑定指定的生命周期,在指定生命周期时取消订阅。
bindToLifecycle() ——绑定生命周期,取消订阅策略可看源码。以Activity为例,可看RxLifecycleAndroid 下的 ACTIVITY_LIFECYCLE, 实际上在onCreate()订阅的,在onDestroy()取消订阅;在onResume()订阅的,在onPause()取消订阅。
知识点:
compose():
将一种类型的Observable转换成另一种类型的Observable,保证调用的链式结构。
LifecycleTransformer:
LifecycleTransformer实现了各种Transformer接口,能够将一个Observable/Flowable/Single/Completable/Maybe对象转换成另一个 Observable/Flowable/Single/Completable/Maybe对象。正好配合上文的compose操作符,使用在链式调用中。
举例在网络请求时绑定生命周期,mvp模式。
Activity继承了RxFragmentActivity后,BaseView接口里新增两个方法,其他View层接口继承它。
public interface BaseView {
//为了让 IView 可以调用 RxLifeCycle的生命周期绑定
<T> LifecycleTransformer<T> bindToLifecycle();
<T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event);
}
在P层使用,此处模拟网络请求,3s后回调
Observable.timer(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io()) //订阅在io线程
.observeOn(AndroidSchedulers.mainThread()) //回调在主线程
.compose(mView.bindUntilEvent(ActivityEvent.DESTROY)) //指定在onDestroy销毁
// .compose(mView.bindToLifecycle()) //取消订阅交由RxLifeCycle来判断
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
mView.showLoading();
}
@Override
public void onNext(Long aLong) {
KLog.d("onTest1Success()回调成功");
mView.onTest1Success("onTest1 请求成功");
}
@Override
public void onError(Throwable e) {
mView.dismissLoading();
}
@Override
public void onComplete() {
mView.dismissLoading();
}
});
一般使用,可以用compose封装一下,如下:
/**
* 统一线程处理,且绑定生命周期
* 用法: xxx .compose(RxUtil.rxSchedulerHelper(mView))
* @param view
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T,T> rxSchedulerHelper(BaseView view){
return new ObservableTransformer<T,T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io()) //订阅在io线程
.unsubscribeOn(Schedulers.io()) //取消订阅在io线程,为啥要这个,不太清楚
.observeOn(AndroidSchedulers.mainThread()) //回调在主线程
.compose(view.bindToLifecycle()); //绑定生命周期
}
};
}
结尾
在网上找了一些操作符的简单例子和应用场景,写了一个Demo,看操作符概述的时候,结合代码运行结果来看更容易理解,有兴趣的可以看看。后续会在这个项目里继续更新操作符和应用场景。
Github地址:
https://github.com/Dengszzzzz/DRxJavaSummary
参考
给 Android 开发者的 RxJava 详解
这可能是最好的RxJava 2.x 入门教程
Android拾萃 - RxJava2操作符汇总
Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解
Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程
实际应用场景
Android 用RxJava模拟一个EventBus ———RxBus
RxLifecycle详细解析
...