一、概念
RxJava是一个基于事件流、实现异步操作的库。
二、特点
由于RxJava是基于事件流的链式调用,因此具有逻辑简洁、实现优雅、使用简单等特点。
三、原理
RxJava是基于一种扩展的观察者模式,Rxjava的扩展观察者模式中有4个角色:
被观察者(Observable):产生事件。
观察者(Observer):接收事件,并给出响应动作。
订阅(Subscribe):连接被观察者与观察者。
事件(Event):被观察者与观察者沟通的载体。
被观察者(Observable)通过 订阅(Subscribe)按顺序发送事件 给观察者(Observer), 观察者(Observer)按顺序接收事件并作出对应的响应动作。
四、使用
添加依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
// 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
1.基本使用
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// RxJava的流式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创建被观察者 & 生产事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
}
方法调用顺序:观察者.onSubscribe() > 被观察者.subscribe() > 观察者.onNext() > 观察者.onComplete()。
2.函数式接口
RxJava 2.x 提供了多个函数式接口 ,用于实现简便式的观察者模式。
以Consumer为例:
Observable.just("hello").subscribe(new Consumer<String>() {
// 每次接收到Observable的事件都会调用Consumer.accept()
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
3.Observer#subscribe
观察者Observer的subscribe()方法具备多个重载的方法。
定义如下:
public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件及Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件及Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件及onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
4.Disposable#dispose
Disposable的dispose()方法可切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件。
例子:
// 主要在观察者 Observer中 实现
Observer<Integer> observer = new Observer<Integer>() {
// 1. 定义Disposable类变量
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
// 2. 对Disposable类变量赋值
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
if (value == 2) {
// 设置在接收到第二个事件后切断观察者和被观察者的连接
mDisposable.dispose();
Log.d(TAG, "已经切断了连接:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
五、操作符
1.创建操作符
1.1 基本创建
完整的创建被观察者对象。
create
完整创建1个被观察者对象(Observable)。
例子:
// 1. 通过creat()创建被观察者对象
Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在复写的subscribe()里定义需要发送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
} // 至此,一个被观察者对象(Observable)就创建完毕
}).subscribe(new Observer<Integer>() {
// 3. 通过通过订阅(subscribe)连接观察者和被观察者
// 4. 创建观察者 & 定义响应事件的行为
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
1.2 快速创建 & 发送事件
快速的创建被观察者对象。
just
快速创建被观察者对象(Observable) & 发送10个以下事件。
例子:
// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3,4)
// 至此,一个Observable对象创建完毕
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
fromArray
快速创建被观察者对象(Observable) & 发送10个以上事件(数组形式)。
例子:
/*
* 数组遍历
*/
// 1. 设置需要传入的数组
Integer[] items = { 0, 1, 2, 3, 4 };
// 2. 创建被观察者对象(Observable)时传入数组
// 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "数组遍历");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "数组中的元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "遍历结束");
}
});
fromIterable
快速创建被观察者对象(Observable) & 发送10个以上事件(集合形式)。
例子:
/*
* 集合遍历
*/
// 1. 设置一个集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "集合遍历");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "集合中的数据元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "遍历结束");
}
});
其它方法
// 下列方法一般用于测试使用
<-- empty() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即观察者接收后会直接调用onCompleted()
<-- error() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()
<-- never() -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
1.3 延迟创建
定时操作:经过x秒后,需要自动执行y操作;
周期性操作:每隔x秒后,需要自动执行y操作。
defer
动态创建被观察者对象(Observable) & 获取最新的Observable对象数据。
例子:
<-- 1. 第1次对i赋值 ->>
Integer i = 10;
// 2. 通过defer 定义被观察者对象
// 注:此时被观察者对象还没创建
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
<-- 2. 第2次对i赋值 ->>
i = 15;
<-- 3. 观察者开始订阅 ->>
// 注:此时,才会调用defer()创建被观察者对象(Observable)
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整数是"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
timer
延迟指定事件,发送一个0,一般用于检测。
例子:
// 该例子 = 延迟2s后,发送一个long类型数值
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
// 注:timer操作符默认运行在一个新线程上
// 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
interval
每隔指定时间就发送事件,发送的事件序列是从0开始、无限递增1的整数序列。
例子:
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3,1,TimeUnit.SECONDS)
// 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
// 注:interval默认在computation调度器上执行
// 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
intervalRange
每隔指定时间就发送事件,发送的事件序列是从0开始、无限递增1的整数序列,可指定发送的数据的数量。
例子:
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
// 该例子发送的事件序列特点:
// 1. 从3开始,一共发送10个事件;
// 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
range
连续发送1个事件序列,可指定范围。
例子:
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.range(3,10)
// 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
rangeLong
类似于range方法,区别在于该方法支持Long数据类型。
2.变换操作符
对事件序列中的 事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的 事件 / 整个事件序列。
Map
对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件,即将被观察者发送的事件转换为任意的类型事件。
例子:
// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 被观察者发送事件 = 参数为整型 = 1、2、3
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
}
}).subscribe(new Consumer<String>() {
// 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
FlatMap
将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
例子:
// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 采用flatMap()变换操作符
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
// 最终合并,再发送给被观察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
ConcatMap
类似FlatMap操作符,区别在于新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。
例子:
// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 采用concatMap()变换操作符
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通过concatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
// 最终合并,再发送给被观察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
Buffer
定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送。
例子:
// 被观察者 需要发送5个数字
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 1) // 设置缓存区大小 & 步长
// 缓存区大小 = 每次从被观察者中获取的事件数量
// 步长 = 每次获取新事件的数量
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> stringList) {
//
Log.d(TAG, " 缓存区里的事件数量 = " + stringList.size());
for (Integer value : stringList) {
Log.d(TAG, " 事件 = " + value);
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应" );
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3.组合 / 合并操作符
3.1 组合多个被观察者
concat / concatArray
组合多个被观察者一起发送数据,合并后按发送顺序串行执行,二者区别:组合被观察者的数量,即concat组合被观察者数量≤4个,而concatArray组合被观察者数量>4个。
// concat():组合多个被观察者(≤4个)一起发送数据
// 注:串行执行
Observable.concat(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
// concatArray():组合多个被观察者一起发送数据(可>4个)
// 注:串行执行
Observable.concatArray(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12),
Observable.just(13, 14, 15))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
merge / mergeArray
组合多个被观察者一起发送数据,合并后按时间线并行执行,二者区别:组合被观察者的数量,即merge组合被观察者数量≤4个,而mergeArray组合被观察者数量>4个。与concat操作符区别:同样是组合多个被观察者一起发送数据,但concat操作符合并后是按发送顺序串行执行。
例子:
// merge():组合多个被观察者(<4个)一起发送数据
// 注:合并后按照时间线并行执行
Observable.merge(
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
// mergeArray() = 组合4个以上的被观察者一起发送数据,类似concatArray()
concatDelayError / mergeDelayError
使用contact或merge操作符时,若其中一个被观察者发出onError事件,则会马上终止其它被观察者继续发送事件,需要使用concatDelayError或mergeDelayError操作符。
无使用concatDelayError的情况:
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
使用concatDelayError的情况:
<-- 使用了concatDelayError()的情况 -->
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
mergeDelayError操作符使用方法同理。
3.2 合并多个事件
zip
合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。事件组合方式为严格按照原先事件序列进行对位合并,最终合并的事件数量等于多个被观察者(Observable)中数量最少的数量。
例子:
<-- 创建第1个被观察者 -->
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "被观察者1发送了事件1");
emitter.onNext(1);
// 为了方便展示效果,所以在发送事件后加入2s的延迟
Thread.sleep(1000);
Log.d(TAG, "被观察者1发送了事件2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "被观察者1发送了事件3");
emitter.onNext(3);
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作
<-- 创建第2个被观察者 -->
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "被观察者2发送了事件A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "被观察者2发送了事件B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "被观察者2发送了事件C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "被观察者2发送了事件D");
emitter.onNext("D");
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
// 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送
<-- 使用zip变换操作符进行事件合并 -->
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "最终接收到的事件 = " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
尽管被观察者2的事件D没有事件与其合并,但还是会继续发送,若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,则被观察者2的事件D也不会发送。
combineLatest
当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。与zip的区别:zip按个数合并,即1对1合并;而CombineLatest按时间合并,即在同一个时间点上合并。
例子:
Observable.combineLatest(
Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1个Observable发送的最新(最后)1个数据
// o2 = 第2个Observable发送的每1个数据
Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
return o1 + o2;
// 合并的逻辑 = 相加
// 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.e(TAG, "合并的结果是: "+s);
}
});
combineLatestDelayError
类似于concatDelayError / mergeDelayError,即错误处理。
reduce
把被观察者需要发送的事件聚合成1个事件 & 发送,聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推。
例子:
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
// 在该复写方法中复写聚合的逻辑
@Override
public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
return s1 * s2;
// 本次聚合的逻辑是:全部数据相乘起来
// 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
Log.e(TAG, "最终计算的结果是: "+s);
}
});
collect
将被观察者Observable发送的数据事件收集到一个数据结构里。
例子:
Observable.just(1, 2, 3 ,4, 5, 6)
.collect(
// 1. 创建数据结构(容器),用于收集被观察者发送的数据
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer)
throws Exception {
// 参数说明:list = 容器,integer = 后者数据
list.add(integer);
// 对发送的数据进行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(@NonNull ArrayList<Integer> s) throws Exception {
Log.e(TAG, "本次发送的数据是: "+s);
}
});
3.3 发送事件前追加发送事件
startWith / startWithArray
在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者。
例子:
<-- 在一个被观察者发送事件前,追加发送一些数据 -->
// 注:追加数据顺序 = 后调用先追加
Observable.just(4, 5, 6)
.startWith(0) // 追加单个数据 = startWith()
.startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 在一个被观察者发送事件前,追加发送被观察者 & 发送数据 -->
// 注:追加数据顺序 = 后调用先追加
Observable.just(4, 5, 6)
.startWith(Observable.just(1, 2, 3))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3.4 统计发送事件数量
count
统计被观察者发送事件的数量。
// 注:返回结果 = Long类型
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "发送的事件数量 = "+aLong);
}
});
4.功能性操作符
4.1 连接被观察者 & 观察者
subscribe
使得被观察者 & 观察者形成订阅关系。
例子:
observable.subscribe(observer);
// 前者 = 被观察者(observable);后者 = 观察者(observer 或 subscriber)
<-- 1. 分步骤的完整调用 -->
// 步骤1: 创建被观察者 Observable 对象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步骤2:创建观察者 Observer 并 定义响应事件行为
Observer<Integer> observer = new Observer<Integer>() {
// 通过复写对应方法来 响应 被观察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
<-- 2. 基于事件流的链式调用 -->
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创建被观察者 & 生产事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
内部实现:
<-- Observable.subscribe(Subscriber) 的内部实现 -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 在观察者 subscriber抽象类复写的方法 onSubscribe.call(subscriber),用于初始化工作
// 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
// 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
}
4.2 线程调度
方便快速指定及控制被观察者 & 观察者的工作线程。对于一般的需求场景,需要在子线程中实现耗时的操作,然后回到主线程实现 UI操作,即被观察者(Observable)在子线程中生产事件(如实现耗时操作等),观察者(Observer)在主线程接收 & 响应事件(即实现UI操作)。
RxJava内置了多种用于调度的线程类型,内部使用线程池来维护这些线程,所以线程的调度效率非常高。
线程类型如下:
subscribeOn / observeOn
指定被观察者(Observable) / 观察者(Observer)的工作线程类型。
例子:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步骤1:创建被观察者 Observable & 发送事件
// 在主线程创建被观察者 Observable 对象
// 所以生产事件的线程是:主线程
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, " 被观察者 Observable的工作线程是: " + Thread.currentThread().getName());
// 打印验证
emitter.onNext(1);
emitter.onComplete();
}
});
// 步骤2:创建观察者 Observer 并 定义响应事件行为
// 在主线程创建观察者 Observer 对象
// 所以接收 & 响应事件的线程是:主线程
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
Log.d(TAG, " 观察者 Observer的工作线程是: " + Thread.currentThread().getName());
// 打印验证
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
<-- 使用说明 -->
// Observable.subscribeOn(Schedulers.Thread):指定被观察者 发送事件的线程(传入RxJava内置的线程类型)
// Observable.observeOn(Schedulers.Thread):指定观察者 接收 & 响应事件的线程(传入RxJava内置的线程类型)
<-- 实例使用 -->
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribeOn(Schedulers.newThread()) // 1. 指定被观察者 生产事件的线程
.observeOn(AndroidSchedulers.mainThread()) // 2. 指定观察者 接收 & 响应事件的线程
.subscribe(observer); // 3. 最后再通过订阅(subscribe)连接观察者和被观察者
}
}
情况1:若Observable.subscribeOn()多次指定被观察者生产事件的线程,则只有第一次指定有效,其余的指定线程无效。
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribeOn(Schedulers.newThread()) // 第一次指定被观察者线程 = 新线程
.subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定被观察者线程 = 主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
情况2:若Observable.observeOn()多次指定观察者接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换。
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // 第一次指定观察者线程 = 主线程
.doOnNext(new Consumer<Integer>() { // 生产事件
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "第一次观察者Observer的工作线程是: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.newThread()) // 第二次指定观察者线程 = 新的工作线程
.subscribe(observer); // 生产事件
// 注:
// 1. 整体方法调用顺序:观察者.onSubscribe()> 被观察者.subscribe()> 观察者.doOnNext()>观察者.onNext()>观察者.onComplete()
// 2. 观察者.onSubscribe()固定在主线程进行
4.3 延迟操作
delay
使得被观察者延迟一段时间再发送事件。
方法介绍:
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)
// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延迟时间 & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
例子:
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延迟3s再发送
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
4.4 在事件的生命周期中操作
在事件发送 & 接收的整个生命周期过程中进行操作,如发送事件前的初始化、发送事件后的回调请求等。
do
在某个事件的生命周期中调用。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("发生错误了"));
}
})
// 1. 当Observable每发送1次数据事件就会调用1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 执行Next事件前调用
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 执行Next事件后调用
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常发送事件完毕后调用
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable发送错误事件时调用
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 观察者订阅时调用
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最后执行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
4.5 错误处理
发送事件过程中,遇到错误时的处理机制。
onErrorReturn
遇到错误时,发送1个特殊事件 & 正常终止,可捕获在它之前发生的异常。
例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("发生错误了"));
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(@NonNull Throwable throwable) throws Exception {
// 捕捉错误异常
Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );
return 666;
// 发生错误事件后,发送一个"666"事件,最终正常结束
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
onErrorResumeNext
遇到错误时,发送1个新的Observable。
onErrorResumeNext()拦截的错误为Throwable,若需拦截Exception请用onExceptionResumeNext();若onErrorResumeNext()拦截的错误为Exception,则会将错误传递给观察者的onError方法。
例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("发生错误了"));
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
// 1. 捕捉错误异常
Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );
// 2. 发生错误事件后,发送一个新的被观察者 & 发送事件序列
return Observable.just(11,22);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
onExceptionResumeNext
遇到错误时,发送1个新的Observable。
onExceptionResumeNext()拦截的错误为Exception;若需拦截Throwable请用onErrorResumeNext();若onExceptionResumeNext()拦截的错误为Throwable,则会将错误传递给观察者的onError方法。
例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
retry
当出现错误时,让被观察者(Observable)重新发射数据。
接收到onError()时,重新订阅 & 发送事件,Throwable和Exception都可拦截。
方法介绍:
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送
<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制)
// 参数 = 重试次数
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制)
// 参数 = 设置重试次数 & 判断逻辑
例子:
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
.retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制)
// 参数 = 重试次数
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
.retry(3) // 设置重试次数 = 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "retry错误: "+throwable.toString());
//返回false = 不重新重新发送数据 & 调用观察者的onError结束
//返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试)
// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "异常错误 = "+throwable.toString());
// 获取当前重试次数
Log.e(TAG, "当前重试次数 = "+integer);
//返回false = 不重新重新发送数据 & 调用观察者的onError结束
//返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制)
// 参数 = 设置重试次数 & 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "retry错误: "+throwable.toString());
//返回false = 不重新重新发送数据 & 调用观察者的onError()结束
//返回true = 重新发送请求(最多重新发送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
retryUntil
出现错误后,判断是否需要重新发送数据,若需要重新发送 & 持续遇到错误,则持续重试,具体使用类似于retry(Predicate predicate),唯一区别:返回true则不重新发送数据事件。
retryWhen
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件。
例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 遇到error事件才会回调
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
// 返回Observable<?> = 新的被观察者 Observable(任意类型)
// 此处有两种情况:
// 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么原始Observable则不重新发送事件:
// 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
// 该异常错误信息可在观察者中的onError()中获得
return Observable.error(new Throwable("retryWhen终止啦"));
// 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
// return Observable.just(1);
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应" + e.toString());
// 获取异常错误信息
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
4.6 重复发送
重复不断地发送被观察者事件。
repeat
无条件地、重复发送被观察者事件,具备重载方法,可设置重复创建次数。
例子:
// 不传入参数 = 重复发送次数 = 无限次
repeat();
// 传入参数 = 重复发送次数有限
repeatWhen(Integer int );
// 注:
// 1. 接收到.onCompleted()事件后,触发重新订阅 & 发送
// 2. 默认运行在一个新的线程上
// 具体使用
Observable.just(1, 2, 3, 4)
.repeat(3) // 重复创建次数 =- 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
repeatWhen
将原始Observable停止发送事件的标识(Complete() / Error())转换成1个Object类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的Observable。若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable;若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable。
例子:
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable
// 此处有2种情况:
// 1. 若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
// 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 情况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
return Observable.empty();
// Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
// return Observable.error(new Throwable("不再重新订阅事件"));
// 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
// 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
// return Observable.just(1);
// 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});