前言
关于RxJava介绍的文章已经很多,但是关于实战的教程却不尽人意,今天就从代码的角度分析一下RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.1.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
Observable.create(ObservableOnSubscribe<T> )
最常见的操作符,用于生产一个发射对象,在ObservableOnSubscribe的接口方法中有一个ObservableEmitter<T>对象用于发送数据,泛型T代表发送数据的类型
ObservableEmitter继承与Emitter
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
Observable可以用subscribe(Observer<? super T> observer)方法订阅一个或多个观察者,subscribe返回一个Disposable 对象
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(@NonNull
ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();//onComplete继续发送就接收不到了
e.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
i++;
if (i == 2) {
//isposable可以做到切断的操作,让Observer观察者不再接收上游事件
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Observer有4个方法,onSubscribe,被订阅的时候调用,回传一个Disposable对象,可用于终止接受信号,onNext是在ObservableEmitter调用onNext时候调用,同理onComplete也是
注意:subscribe可以直接订阅Consumer,只有在上游onNext方法是会调用
subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integers) throws Exception {
}
});
Observable.just(T t1,Tt2 ....)
依次发送数据
Observable.just("1", "2", "3")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
});
Observable.zip( ObservableSource<? extends T1> ob1,ObservableSource<? extends T2> ob2, BiFunction<? super T1, ? super T2, ? extends R> bif)
注:Observable<T> implements ObservableSource<T>
合并事件专用,分别从两个上游事件中各取出一个组合,一个事件只能被使用一次,顺序严格按照事件发送的顺序,最终下游事件收到的是和上游事件最少的数目相同(必须两两配对,多余的舍弃)
在zip的第三个参数中,BiFunction对象有三个泛型,第一个泛型是ob1的泛型参数,第二个表示ob2的泛型参数,第三个表示返回值的类型
Observable ob1=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
e.onNext("B");
e.onNext("C");
}
}
});
Observable ob2=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
e.onNext(2);
e.onNext(3);
//4,5会被舍弃
e.onNext(4);
e.onNext(5);
}
}
});
Observable.zip(ob1, ob2, new BiFunction<String, Integer, String>(){
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
});
Observable<R> map(Function<? super T, ? extends R> mapper)
对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化,对参数进行加工,相当于添加了一个中间层
Function有两个泛型,第一个是要加工参数类型,第二个是apply返回类型
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws
Exception{
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
});
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
FlatMap将一个发送事件的上游Observable变换成多个发送事件的Observables,然后将它们发射的时间合并后放进一个单独的Observable里
flatMap和map相似,参数也是一个Function对象,不同的是Function对象的第二个泛型参数不一样,flatmap的第二个泛型参数是一个ObservableSource<T> ,即在apply方法中,返回一Observable<T>对象
可用于实现多个网络请求依次依赖,数据打平
Observable.create(new ObservableOnSubscribe<ArrayList<String>>(){
@Override
public void subscribe(@NonNull ObservableEmitter<ArrayList<String>> e) throws Exception {
List<String> list=new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " +i);
}
}
}).flatMap(new Function<ArrayList<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull ArrayList<String> list) throws Exception{
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception{
}
});
注意:flatMap并不保证事件的顺序,与此类似的操作符还有concatMap,concatMap作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序
Observable<T> doOnNext(Consumer<? super T> onNext)
Observable.just(1, 2, 3, 4)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("do someThing");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
doOnNext和map有些类似,都会改变下游数据,但是map可以改变下游数据的类型,而doOnNext不能
Observable<T> filter(Predicate<? super T> predicate)
用于过滤数据
Observable.just(1, 20, 65, -5, 7, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >= 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
fliter传入一个Predicate对象,泛型参数类型为test方法的类型,test方法返回值为固定的布尔类型,返回true才会向下游传递
Observable<T> skip(long count)
接受一个long型参数,代表跳过多少个数目的事件再开始接收
Observable.just(1,2,3,4,5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
Observable<T> take(long count)
指定订阅者最多收到多少数据
Observable.fromArray(1,2,3,4,5)
.take(2)
.subscribe(new Consumer<Integer>(){
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
Observable<Long> timer(long delay, TimeUnit unit)
timer 操作符可以延迟执行一段逻辑,默认执行在一个新线程上。
Observable.timer(2, TimeUnit.SECONDS).create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
}).observeOn(AndroidSchedulers.mainThread()) // 由于timer默认在新线程,所以我们应该切回主线程.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
间隔执行操作,默认在新线程
Observable.interval(3, 2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
}
});
Single和SingleObserver
SingleObserver与普通的Observer相同,但是只接受上游的一个参数,并且没有onComplete方法
Single.just(new Random().nextInt())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
});
Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
连接操作符,可接受Observable的可变参数,或者Observable的集合,注意两个 Observable 的泛型应当保持一致
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}});
可用于多个连续的操作,例如三级缓存,先从内存缓存区,没有再从磁盘缓存取,最后请求网络。
Observable<T> distinct()
去重操作符,其实就是简单的去重
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
Observable<List<T>> buffer(int count, int skip)
将 observable 中的数据按 skip(步长)分成最长不超过 count 的 buffer,然后生成一个 observable
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
Log.e(TAG, "buffer value : " );
for (Integer i : integers) {
Log.e(TAG, i + "");
}
Log.e(TAG, "\n");
}
});
这里有没有很熟悉,我们之前用flatMap可以把一个数组打平,而这里则是将数据生成一个数组,最后都是生成一个Observer,所以buffer可以看作是flatMap的反操作
Observable<T> debounce(long timeout, TimeUnit unit)
过滤掉发射速率过快的数据项
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
次订阅都会创建一个新的Observable,并且如果该Observable没有被订阅,就不会生成新的Observable
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1, 2, 3);
}
});
Single<T> last(T defaultItem)
取出最后一个值,参数是没有值的时候的默认值
Observable.just(1, 2, 3)
.last(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
需要注意last返回Single对象,所以在subscribe方法如果订阅一个Observer只能是SingleObserver
Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2 ...)
多个Observable合起来,接受可变参数,也支持使用迭代器集合
Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).delay(500,TimeUnit.MILLISECONDS);
Observable observable2=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(4);
e.onNext(5);
e.onNext(6);
}
});
Observable.merge(observable1,observable2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull final Integer integer) throws Exception {
Log.e(TAG, "accept: merge :" + integer + "\n" );
}
});
我们之前学过一个相似的效果concat操作符,也是合并多个Observable对象,merge和concat有什么区别吗?重点在于是否按照顺序执行,concat会按照参数的顺序执行,即使你设置了delay,它也会先等待后执行第一个Observable,而merge不会,例如上面代码会先打印456,另外delay是另外开了一个新的线程,如果我们要操作UI,需要切换回主线程
Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
就是一次用一个方法处理一个值,可以有一个seed作为初始值
Observable.just(1, 2, 3)
.reduce(1,new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer,
@NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept: reduce : " + integer + "\n");
}
});
1+1+2+3,所以最后打印是7
Observable<R> scan(final R initialValue, BiFunction<R, ? super T, R> accumulator)
和上面的reduce差不多,区别在于reduce()只输出结果,而scan()会将过程中每一个结果输出
Observable.just(1, 2, 3)
.scan(1,new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception{
return integer + integer2;
}
}).subscribe(new Consumer<Integer>(){
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept: scan " + integer + "\n");
}
});
Observable<Observable<T>> window(long timespan, TimeUnit unit)
按照时间划分窗口,将数据发送给不同的Observable
Observable.interval(1, TimeUnit.SECONDS).create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
e.onNext(1l);
}
}).take(15) // 最多接收15个
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>(){
@Override
public void accept(@NonNull Observable<Long> longObservable) throws Exception {
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
}
});
}
});
PublishSubject
onNext() 会通知每个观察者,仅此而已
PublishSubject<Integer> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
publishSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
publishSubject.onNext(1);
publishSubject.onNext(2);
publishSubject.onNext(3);
会通知订阅的每一个观察者而不会产生覆盖
AsyncSubject
在 调用 onComplete() 之前,除了 subscribe() 其它的操作都会被缓存, 在调用 onComplete() 之后只有最后一个 onNext() 会生效
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
asyncSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onNext(3);
asyncSubject.onComplete();
只有3会被发送
BehaviorSubject
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.onNext(3);
behaviorSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
behaviorSubject.onNext(4);
behaviorSubject.onNext(5);
behaviorSubject.onComplete();
3会被缓存,在订阅新的观察者时候,会被发送到新的观察者,并且behaviorSubject也可以订阅多个观察者,即4,5会发送给两个观察者
Completable
只关心结果,也就是说 Completable 是没有 onNext 的,要么成功要么出错,不关心过程,在 subscribe 后的某个时间点返回结果
Completable.timer(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(@NonNull Throwable e) {
}
});