原理
Rx是Reactive Extensions的缩写的简写,可以使用可观察数据流对编程接口进行异步编程,它结合了观察者模式,迭代器模式和函数式的精华。
Rxjava是一种异步数据处理库,也是一种观察者模式。最早是Netflix公司用于重构当前架构时减少REST调用的次数,参考了Microsoft公司的响应式编程,把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。
它的特点主要有以下:
- 支持Java 8 Lambda。
- 支持异步和同步。
- 单一依赖关系。
- 简洁,优雅。
RxAndroid
在开发项目的时候,开发者在使用Rxjava时会搭配RxAndroid,他是针对Rxjava在Android平台使用的一个响应式扩展组件。使用RxAndroid的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。
观察者模式的四大要素
- Observable 被观察者
- Observer
- 观察者 subscribe 订阅
- 事件
观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。
扩展的观察者模式
onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。
使用
依赖
//在Project的gradle下添加maven仓库
maven { url "https://oss.jfrog.org/libs-snapshot" }
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Hello World
//1.创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world");
emitter.onComplete();
}
});
//2.创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe():");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext():" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError():" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete():");
}
};
//3.订阅事件
observable.subscribe(observer);
注意:onError()和onComplete()只会回调一个。
操作符
Creating Observables(创建 Observable)
Create
//链式写法
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello world");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe():"+d.toString());
}
@Override
public void onNext(String o) {
System.out.println("onNext():" + o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError():" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete():");
}
});
Just
使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。
Observable.just("hello world").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
From
将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add("Hello" + i);
}
Observable.fromArray(list).subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
System.out.println(strings);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Defer
当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。 以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。
value = "2020/12/13";
Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> get() throws Throwable {
return Observable.just(value);
}
});
value = "12345";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Empty/Never/Throw
Empty是创建一个不发射任何数据但是正常终止的Observable。 Never是创建一个不发射数据也不终止的Observable。 Throw是创建一个不发射数据以一个错误终止的Observable。 这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。
Observable.defer(new Supplier<ObservableSource<?>>() {
@Override
public ObservableSource<?> get() throws Throwable {
return Observable.error(new Throwable("你写了个bug"));
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
Interval
创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。
//TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。
Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Long aLong) {
System.out.println(aLong);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Repeat
创建一个Observable,该Observable的事件可以重复调用。
Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Start
返回一个Observable,它发射一个类似于函数声明的值。
Timer
创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println(aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Transforming Observables(转换 Observable)
Map
Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
return s.toString();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
flatMap对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> apply(Integer integer) throws Exception {
return Observable.just(integer.toString());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
GroupBy
根据规则对数据进行分组。
Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer % 2==0?"偶数":"奇数";
}
}).subscribe(new Observer<GroupedObservable<String, Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
arg0.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(arg0.getKey() + "-------" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Buffer
定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
System.out.println(integers);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Scan
将数据进行累加。
Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Window
window和buffer相似,它返回的是一个Observable对象,它根据一系列任务规则把数据聚集到一个列表。
// window第一个参数count:每个窗口应发射前的最大大小;第二个:在启动新窗口之前需要跳过多少项
Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(final Observable<Integer> arg0) {
System.out.println(arg0);
arg0.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("---"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Filtering Observables(过滤 Observable)
Debounce
操作间隔一定时间内没有做任何操作,数据才会发送到观察者。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(2000);
arg0.onNext(i);
}
arg0.onComplete();
}
}).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Distinct
去掉重复数据的操作符。
Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
ElementAt
取出指定位置的数据。
Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Filter
对数据进行指定规则的过滤。
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
First
取数据中的第一个数据。
//first参数:defaultItem: 当前Observable不发射任何内容时发出的默认项
Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
});
IgnoreElements
忽略所有的数据,不向观察者发送数据,直接回调onError或onComplete()。
Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
Last
列表数据最后指定的数位项数据。 SingleObserver只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。
Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
}).last("4").subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
});
Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
}).last("4").subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
});
Sample
对数据源进行样本采集,发送给观察者。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg0.onNext(i);
}
arg0.onComplete();
}
}).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Skip
跳过指定列表项数据的指定项数据。
Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
SkipLast
跳过列表数据的最后几位数据。
Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Take
只取列表数据的前几项。
Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
TakeLast
取列表数据项的最后几项数据。 Consumer是简易版的Observer,他有多重重载,可以自定义你需要处理的信息,他只提供一个回调接口accept,由于没有onError和onCompete,无法再 接受到onError或者onCompete之后,实现函数回调。
Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
Combining Observables(组合 Observable)
Zip
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。当其中一个Observable发送数据结束或异常,另外一个也停止发送。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Merge
合并多个Observables的发射物。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
StartWith
在数据序列的开头插入一条指定的项。
Observable<Integer> observable = Observable.just(10, 20, 30);
Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
CombineLatest
当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
Observable<Integer> observable = Observable.just(1, 3, 5);
Observable<Integer> observable1 = Observable.just(2, 4, 6);
Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Throwable {
System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Join
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
String[] args1 = new String[]{"张欣1", "张欣2", "张欣3", "张欣4", "张欣5"};
String[] args2 = new String[]{"春晓1", "春晓2", "春晓3", "春晓4"};
Observable<String> o1 = Observable.fromArray(args1);
Observable<String> o2 = Observable.fromArray(args2);
//相同的数组可以进行合并
o2.join(o1, new Function<String, Observable<Long>>() {
@Override
public Observable<Long> apply(String s) throws Exception {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Function<String, Observable<Long>>() {
@Override
public Observable<Long> apply(String s) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + "-&--" + s2;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
SwitchOnNext
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。
final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
Thread.sleep(1000);
// 此时发射一个新的observable2,将会取消订阅observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 创建发射含有Error通知的Observable序列的Observable
Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
// emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
// emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
Thread.sleep(1000);
// 此时发射一个新的observable2,将会取消订阅observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
// 可选参数 bufferSize: 缓存数据项大小
// 接受一个发射Observable序列的Observable类型的sources,
// 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
Disposable subscribe = Observable.switchOnNext(sources)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("--------------------------------------------------------------------");
// 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
// 可选参数 prefetch: 与读取数据项大小
// 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
// 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
Observable.switchOnNextDelayError(sourcesError)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long t) {
System.out.println("--> onNext(2): " + t);
}
@Override
public void onError(Throwable e) {
// 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(2): " + exceptions);
} else {
System.out.println("--> onError(2): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
Error Handling Operators(处理错误)
Catch
Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。 还有一个叫onErrorResumeNext的操作符,它的行为与Catch相似。 RxJava将Catch实现为三个不同的操作符:
- onErrorReturn 让Observable遇到错误时发射一个特殊的项并且正常终止。
- onErrorResumeNext 让Observable在遇到错误时开始发射第二个Observable的数据序。
- onExceptionResumeNext 让Observable在遇到错误时继续发射后面的数据项。
Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Throwable {
return null;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
Retry
如果原始Observable遇到错误,重新订阅它期望它能正常终止。 retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。 retryWhen默认在trampoline调度器上执行,你可以通过参数指定其它的调度器。
场景:网络请求失败重试操作。
final AtomicInteger atomicInteger = new AtomicInteger(3);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext(String.valueOf(System.currentTimeMillis()));
emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
return throwableObservable;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
});
更多相关文章
Android如何进阶:http://docs.qq.com/doc/DWHFqVHBMVEJPWUx
Android面试题汇总:http://docs.qq.com/doc/DWGZIRFh5VEtYWE1D
Android音视频需要学习哪些:http://docs.qq.com/doc/DWFFWZHNPTHZVdHFX
Android常有的开源框架有哪些框:docs.qq.com/doc/DWHlGYUdseVhsSUda
Android车载应需要学习哪些:docs.qq.com/doc/DWEl0blBabXVvU2Nw Android
Framework怎么学:docs.qq.com/doc/DWFdlc2JocEtNbEJ1
Schedulers(调度器)
它是RxJava以一种及其简单的方式解决多线程问题的机制。
种类
io() 用于I/O操作。 computation() 计算,计算工作默认的调度器,与I/O操作无关。 immediate() 立即执行,允许立即在当前线程执行你指定的工作。 newThread() 新线程,为指定任务创建新线程。 trampoline() 顺序处理,按需处理队列,并运行队列的每一个任务。
AndroidSchedulers
RxAndroid提供在Android平台的调度器(指定观察者在主线程)。
- SubscribeOn 方法用于每个Observable对象
- ObserveOn 方法用于每个Subscriber(Observer)对象
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(new HttpErrorHandler<T>())
.subscribe(observer);
使用场景
与Retrofit结合使用
retrofitBuilder = new Retrofit.Builder();
retrofitBuilder.client(okHttpClient)
.addConverterFactory(ScalarsConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create());
public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {
if (observable == null || httpCallBack == null) {
throw new IllegalArgumentException("observable或HttpCallBack为空");
}
//观察者_网络请求状态
BaseObserver<T> observer = new BaseObserver<T>() {
@Override
public void onNext(T t) {
try {
if (t != null) {
httpCallBack.onSuccess(t);
}
} catch (Exception e) {
e.printStackTrace();
httpCallBack.onFailure(e);
}
}
@Override
public void onError(Throwable e) {
httpCallBack.onFailure(e);
}
};
if (owner == null) {
//被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.subscribe(observer);
} else {
//被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
.subscribe(observer);
}
}
与RxPermission结合使用
RxPermission是基于RxJava的Android动态权限申请框架。
使用(简单封装)
public void initPermissions(String[] permissions, PermissionResult permissionResult) {
if (rxPermissions == null) {
rxPermissions = new RxPermissions(this);
}
rxPermissions.requestEachCombined(permissions)
.subscribe(permission -> {
if (permission.granted) {
permissionResult.onSuccess();
} else if (permission.shouldShowRequestPermissionRationale) {
permissionResult.onFailure();
} else {
permissionResult.onFailureWithNeverAsk();
}
});
}
代替EventBus
EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。更多相关请参考Android事件总线之EventBus。 RxJava也可以实现事件总线,因为它们都依据于观察者模式。我们使用RxJava替换EventBus,可以减少App的体积。
使用
private static volatile RxBus instance;
private PublishSubject<Object> mRxtBus;
public static RxBus getDefault() {
if (instance == null) {
synchronized (RxBus.class) {
instance = new RxBus();
}
}
return instance;
}
private RxBus() {
mRxtBus = PublishSubject.create();
}
public void post(String tag, Object event) {
Message msg = new Message(tag, event);
mRxtBus.onNext(msg);
}
public <T> Observable<T> toEvent(Class<T> eventType) {
return mRxtBus.ofType(eventType);
}
}
发送
RxBus.getDefault().post("payValue",code);
接收
subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
@Override
public void accept(RxBus.Message message) throws Throwable {
if ("payValue".equals(message.getTag())) {
Log.e("yhj", "accept: " + message.getEvent().toString());
}
}
});
解绑
if (subscribe != null && !subscribe.isDisposed()) {
subscribe.dispose();
}
Rxjava内存泄漏的处理
Rxjava的使用不当会导致内存泄漏,使用AutoDispose可以解决这个问题,它是一个随Android生命周期事件自动解绑Rxjava订阅的方便工具。
使用
结合JetPack的LifeCycle(生命周期感知型组件),根据生命周期取消订阅。
//被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
observable.subscribeOn(Schedulers.newThread())
.unsubscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(getAppErrorHandler())
.onErrorResumeNext(new HttpErrorHandler<T>())
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
.subscribe(observer);
总结
本文主要是对RxJava使用及Android常见使用场景进行总结,掌握这些还远远不够,RxJava还有许多强大的功能,诸如从磁盘/内存中获取缓存数据,背压策略,联想搜索优化等等。后面在项目开发中遇到相关场景再进行总结,更新。本文若有不当之处,请批评指正。