IV. Utility Operators
1. delay
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "=======================onComplete");
}
});
Output:
=======================onSubscribe
=======================onNext 1
=======================onNext 2
=======================onNext 3
=======================onComplete
Note: As the output shows, onNext is not called until 2 seconds after the onSubscribe callback.
2. doOnEach
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
// e.onError(new NumberFormatException());
e.onComplete();
}
})
.doOnEach(new Consumer < Notification < Integer >> () {
@Override
public void accept(Notification < Integer > integerNotification) throws Exception {
Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete
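Note:
This callback runs before each event the Observable sends, terminal events included. The output shows doOnEach firing before every event: for onNext events getValue() returns the emitted value, and for the final onComplete event it returns null.
3. doOnNext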
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doOnNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete
Note: This callback runs before each onNext() the Observable sends.
4. doAfterNext
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doAfterNext(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================doAfterNext " + integer);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete
Note: This callback runs after each onNext() has been delivered to the observer.
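The relative order of doOnNext, the observer's onNext, and doAfterNext can be modelled without RxJava. The following is a minimal plain-Java sketch of that call order, not RxJava's implementation; the class HookOrder and its methods are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the call order; not RxJava's implementation.
final class HookOrder {
    // Delivers each value as: before-hook, observer callback, after-hook.
    static <T> void emit(List<T> values, Consumer<T> doOnNext,
                         Consumer<T> onNext, Consumer<T> doAfterNext) {
        for (T value : values) {
            doOnNext.accept(value);    // runs before the observer sees the value
            onNext.accept(value);      // the observer's own callback
            doAfterNext.accept(value); // runs after the observer has returned
        }
    }

    // Records the order in which the three callbacks fire for the values 1 and 2.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        emit(Arrays.asList(1, 2),
                v -> log.add("doOnNext " + v),
                v -> log.add("onNext " + v),
                v -> log.add("doAfterNext " + v));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In a real chain the two hooks are separate operators, so only the one that was actually applied takes part in this ordering.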
5. doOnComplete
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnComplete ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete
==================onComplete
Note: This callback runs just before onComplete() is delivered.
6. doOnError()
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.doOnError(new Consumer < Throwable > () {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "==================doOnError " + throwable);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError
Note: This callback runs just before onError() is delivered.
7. doOnSubscribe
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnSubscribe(new Consumer < Disposable > () {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "==================doOnSubscribe ");
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================doOnSubscribe
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
Note: This callback runs just before the observer's onSubscribe() is called.
8. doOnDispose
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose ");
}
})
.subscribe(new Observer < Integer > () {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================doOnDispose
Note: This callback runs when dispose() is called on the Disposable.
9. doOnLifecycle
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "==================doOnLifecycle accept");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnLifecycle Action");
}
})
.doOnDispose(
new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose Action");
}
})
.subscribe(new Observer<Integer>() {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================doOnLifecycle accept
==================onSubscribe
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action
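Note:
The callback passed as the first argument runs before onSubscribe. It receives the Disposable, so it can be used to decide whether to cancel the subscription.
The callback passed as the second argument does the same job as doOnDispose().
As the output shows, disposing inside onNext() triggers both the doOnDispose() action and the doOnLifecycle() action.
If the subscription is instead disposed from the first doOnLifecycle callback:
Output:
==================doOnLifecycle accept
==================onSubscribe
Note:
Neither the doOnDispose action nor the doOnLifecycle action is called.
10. doOnTerminate() & doAfterTerminate()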
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnTerminate ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnTerminate
==================onComplete
Note: doOnTerminate runs just before onError or onComplete is delivered, while doAfterTerminate runs just after. doAfterTerminate is used the same way, so it is not shown separately here.
11. doFinally
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doFinally ");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doOnDispose ");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "==================doAfterTerminate ");
}
})
.subscribe(new Observer<Integer>() {
private Disposable d;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
this.d = d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
d.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================doOnDispose
==================doFinally
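Note: doFinally() runs once the event sequence has ended. How does it differ from doAfterTerminate()? The difference is disposal: once the subscription is disposed, doAfterTerminate() is not called, whereas doFinally() runs however the sequence ends, and it runs last. The output above shows that after dispose() is called, doAfterTerminate() never fires.
Now comment out the dispose() call and run it again.
Output: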
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
==================doAfterTerminate
==================doFinally
Note: doAfterTerminate() now fires, and doFinally() still comes last in the sequence.
12. onErrorReturn
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.d(TAG, "==================onErrorReturn " + throwable);
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete
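Note: This callback runs when an onError() event is received. Its return value is delivered through onNext(), and the sequence then ends normally with onComplete().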
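The substitution described above can be sketched in plain Java. This is a simplified model of what the observer ends up seeing, not RxJava's implementation; ErrorReturnModel and run are hypothetical names, and the source is assumed to emit all its values and then fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of onErrorReturn semantics; not RxJava's implementation.
final class ErrorReturnModel {
    // Events the observer sees when the source emits `values` and then fails with `error`.
    static List<String> run(List<Integer> values, Throwable error,
                            Function<Throwable, Integer> fallback) {
        List<String> seen = new ArrayList<>();
        for (Integer v : values) {
            seen.add("onNext " + v);
        }
        // The error is not forwarded: the fallback value arrives as one more onNext,
        // and the sequence ends with onComplete instead of onError.
        seen.add("onNext " + fallback.apply(error));
        seen.add("onComplete");
        return seen;
    }

    public static void main(String[] args) {
        run(Arrays.asList(1, 2, 3), new NullPointerException(), t -> 404)
                .forEach(System.out::println);
    }
}
```

The observer's onError is never reached in this model, which matches the log above.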
13. onErrorResumeNext
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.d(TAG, "==================onErrorResumeNext " + throwable);
return Observable.just(4, 5, 6);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete
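Note: When an onError() event is received, the function returns a new Observable; the observer then receives that Observable's events and the sequence ends normally.
14. onExceptionResumeNext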
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(333);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 333
==================onComplete
Note: Works much like onErrorResumeNext(), but it only intercepts Exception. Any other Throwable, such as an Error, is passed straight to onError().
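The difference between the two operators comes down to a type check on the Throwable. The sketch below models only that decision in plain Java; it is not RxJava's implementation, and ResumeModel and its method names are hypothetical.

```java
// Plain-Java model of the Exception-only check; not RxJava's implementation.
final class ResumeModel {
    // onErrorResumeNext-style handling: every Throwable switches to the fallback.
    static String onErrorResumeNext(Throwable t) {
        return "fallback";
    }

    // onExceptionResumeNext-style handling: only an Exception switches to the
    // fallback; anything else (for example an Error) goes to onError.
    static String onExceptionResumeNext(Throwable t) {
        return (t instanceof Exception) ? "fallback" : "onError";
    }

    public static void main(String[] args) {
        System.out.println(onExceptionResumeNext(new Exception("404")));
        System.out.println(onExceptionResumeNext(new Error("fatal")));
        System.out.println(onErrorResumeNext(new Error("fatal")));
    }
}
```

NullPointerException is a subclass of Exception, so under this model it would also be routed to the fallback.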
15. retry
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
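Note: When an error event occurs, the source is resubscribed and the whole event sequence is sent again. The times argument is the number of retries, so retry(2) above runs the sequence three times in total before onError is delivered.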
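The counting can be checked with a small plain-Java model. It assumes a source that emits 1, 2, 3 and then always fails, as in the example above; it is a sketch of the semantics, not RxJava's implementation, and RetryModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of retry(times); not RxJava's implementation.
final class RetryModel {
    // Events an observer sees when a source that emits 1, 2, 3 and then
    // always fails is wrapped in retry(times). Assumes times >= 0.
    static List<String> run(int times) {
        List<String> seen = new ArrayList<>();
        int retriesUsed = 0;
        while (true) {
            for (int v = 1; v <= 3; v++) {
                seen.add("onNext " + v);
            }
            // The source fails at this point.
            if (retriesUsed == times) {
                seen.add("onError"); // retries exhausted: the error reaches the observer
                return seen;
            }
            retriesUsed++; // resubscribe and replay the sequence
        }
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

With times = 2 the model produces nine onNext entries followed by a single onError, the same shape as the log above.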
16. retryUntil
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
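.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
// i is assumed to be an int field of the enclosing class, initially 0;
// onNext() below adds each value to it. Returning true stops retrying.
return i == 6;
}
})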
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
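Note: After an error event, this method decides whether to keep retrying: returning true stops retrying and delivers onError, while returning false resubscribes. Here i reaches 6 after the first pass (1 + 2 + 3), so no retry takes place.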
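The stop condition can be sketched in plain Java. The model below uses java.util.function.BooleanSupplier as a stand-in for RxJava's own BooleanSupplier, and a static field sum in place of the field i; RetryUntilModel, sum, and the maxRounds safety limit are all assumptions made for this illustration, not part of RxJava.

```java
import java.util.function.BooleanSupplier;

// Plain-Java model of retryUntil; not RxJava's implementation.
final class RetryUntilModel {
    static int sum; // plays the role of the field i in the example above

    // Returns how many times the source is subscribed before retrying stops.
    // The source emits 1, 2, 3 and then fails; maxRounds is only a guard so
    // that the model cannot loop forever.
    static int subscriptions(BooleanSupplier stop, int maxRounds) {
        sum = 0;
        int rounds = 0;
        while (rounds < maxRounds) {
            rounds++;
            for (int v = 1; v <= 3; v++) {
                sum += v; // what the observer's onNext does
            }
            // The source fails here; the supplier is asked whether to stop.
            if (stop.getAsBoolean()) {
                return rounds; // true: stop retrying, onError is delivered
            }
            // false: resubscribe and run another round
        }
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(subscriptions(() -> sum == 6, 10));
    }
}
```

A condition such as sum >= 12 would let one retry through before stopping, since the sum only reaches 12 after the second pass.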
17. retryWhen
Observable.create(new ObservableOnSubscribe < String > () {
@Override
public void subscribe(ObservableEmitter < String > e) throws Exception {
e.onNext("chan");
e.onNext("ze");
e.onNext("de");
e.onError(new Exception("404"));
e.onNext("haha"); // never delivered: onError above has already terminated the sequence
}
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Throwable throwable) throws Exception {
if(!throwable.toString().equals("java.lang.Exception: 404")) {
return Observable.just("可以忽略的异常");
} else {
return Observable.error(new Throwable("终止啦"));
}
}
});
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "==================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext chan
==================onNext ze
==================onNext de
==================onError java.lang.Throwable: 终止啦
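Note: The function passed to retryWhen receives an Observable that emits each error from the source, and it returns a new Observable. If the returned Observable signals an error, the source stops sending events and that error reaches the observer. If it emits a normal item, the source is resubscribed and keeps retrying.
Change onError(new Exception("404")) to onError(new Exception("303")).
Output: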
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
Note: The events are resent indefinitely.
18. repeat
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.repeat(2)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
Output:
===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete
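Note: Resubscribes to the source so that its events are sent repeatedly; times is the total number of times the sequence is sent. Here the events were sent twice, followed by a single onComplete.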
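A plain-Java model makes the counting explicit. It assumes a source that emits 1, 2, 3 and completes, as above; it sketches the semantics only and is not RxJava's implementation. RepeatModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of repeat(times); not RxJava's implementation.
final class RepeatModel {
    // Events an observer sees when a source that emits 1, 2, 3 and completes
    // is wrapped in repeat(times). Assumes times >= 0.
    static List<String> run(int times) {
        List<String> seen = new ArrayList<>();
        for (int round = 0; round < times; round++) {
            for (int v = 1; v <= 3; v++) {
                seen.add("onNext " + v);
            }
            // The source's own onComplete is not forwarded between rounds.
        }
        seen.add("onComplete"); // delivered once, after the last round
        return seen;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

Note the contrast with retry: in this model times counts total passes, whereas retry's argument counts additional attempts after the first.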
19. repeatWhen
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {
return Observable.empty();
// return Observable.error(new Exception("404"));
// return Observable.just(4);
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
Output:
===================onSubscribe
===================onComplete
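Note: The function returns a new Observable whose events decide whether the source's events are sent. There are three cases. If the new Observable signals onComplete or onError, the source sends no events. If it emits any other event, the source is subscribed and sends its events.
The output for the onError case and for the other-event case is shown below. With onError:
===================onSubscribe
===================onError
Output when another event is emitted: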
===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete
20. subscribeOn
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
//.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
});
Output:
======================onSubscribe
=========================currentThread name: main
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
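Note: The output above is from the code as written, without subscribeOn.
Output with the subscribeOn(Schedulers.newThread()) line uncommented: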
======================onSubscribe
=========================currentThread name: RxNewThreadScheduler-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
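Note: With subscribeOn(Schedulers.newThread()), the body of subscribe() runs on a new thread.
Now look at what happens when subscribeOn is called more than once: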
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
});
Output:
======================onSubscribe
=========================currentThread name: RxComputationThreadPool-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
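Note: The second call, subscribeOn(Schedulers.newThread()), has no visible effect. Only the first subscribeOn, the one closest to the source, decides the thread on which the source emits.
21. observeOn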
Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
@Override
public ObservableSource < String > apply(Integer integer) throws Exception {
Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
return Observable.just("chan" + integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
Log.d(TAG, "======================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
});
Output:
======================onSubscribe
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
======================onComplete
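Note: observeOn sets the thread for the downstream operators and the observer, and every call takes effect. The output shows that observeOn switched threads as expected.
Summary of the schedulers in RxJava:
Schedulers.computation( )
For computational work, such as event loops and callback processing.
Schedulers.immediate( )
Runs work on the current thread. (This is the RxJava 1 name. RxJava 2 has no Schedulers.immediate(); Schedulers.trampoline() is the closest equivalent.)
Schedulers.io( )
For IO-bound work, such as running blocking IO operations asynchronously.
Schedulers.newThread( )
Creates a new thread for each unit of work.
AndroidSchedulers.mainThread()
The Android UI thread, for work that touches the UI.
V. Filtering Operators
1. filter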
Observable.just(1, 2, 3)
.filter(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 2;
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onComplete
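Note: Filters the source's events with a predicate: an event is passed on if the predicate returns true and dropped otherwise. In the code above, only values less than 2 are passed on.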
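The predicate rule can be illustrated on a plain list. This is a minimal sketch of the keep-or-drop decision using java.util.function.Predicate in place of RxJava's Predicate; it is not RxJava's implementation, and FilterModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter; not RxJava's implementation.
final class FilterModel {
    // Keeps the values for which the predicate returns true, in their original order.
    static <T> List<T> filter(List<T> source, Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            if (keep.test(value)) {
                out.add(value); // passed on to the observer
            }
            // values failing the test are dropped silently
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(filter(Arrays.asList(1, 2, 3), v -> v < 2));
    }
}
```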
2. ofType
Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
Note: Drops events that are not of the given type.
3. skip
Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 3
==================onComplete
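Note: Skips events from the start of the sequence; count is the number of events to skip. skipLast() also skips events, but from the end of the sequence, so it is not covered separately here.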
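Since skipLast() is not demonstrated above, here is a plain-Java sketch that puts the two side by side on a finite list. It models the result only, not RxJava's implementation, and assumes count is not negative; SkipModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of skip and skipLast; not RxJava's implementation.
final class SkipModel {
    // Drops the first `count` values. Assumes count >= 0.
    static <T> List<T> skip(List<T> source, int count) {
        int from = Math.min(count, source.size());
        return new ArrayList<>(source.subList(from, source.size()));
    }

    // Drops the last `count` values. Assumes count >= 0.
    static <T> List<T> skipLast(List<T> source, int count) {
        int to = Math.max(source.size() - count, 0);
        return new ArrayList<>(source.subList(0, to));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        System.out.println(skip(source, 2));
        System.out.println(skipLast(source, 2));
    }
}
```

Skipping more values than the source contains simply leaves nothing to emit in this model.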
4. distinct
Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
Note: Filters out duplicate events from the sequence.
5. distinctUntilChanged
Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 2
==================onNext 1
==================onComplete
Note: Filters out consecutive duplicates. The value 3 appears twice in a row, so the second 3 is not emitted.
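The difference between distinct() and distinctUntilChanged() is easiest to see on the same input. The sketch below models both on a plain list using equals() for comparison; it is not RxJava's implementation, and DistinctModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

// Plain-Java model of distinct and distinctUntilChanged; not RxJava's implementation.
final class DistinctModel {
    // Keeps the first occurrence of every value, in encounter order.
    static <T> List<T> distinct(List<T> source) {
        return new ArrayList<>(new LinkedHashSet<>(source));
    }

    // Drops a value only when it equals the value immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T value : source) {
            if (first || !Objects.equals(previous, value)) {
                out.add(value);
            }
            previous = value;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 3, 2, 1);
        System.out.println(distinct(source));
        System.out.println(distinctUntilChanged(source));
    }
}
```

distinct has to remember every value seen so far, while distinctUntilChanged only remembers the previous one.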
6. take
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
Output:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
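Note: Limits the number of events the observer receives. takeLast() limits the observer to the last few events of the sequence; it is not covered here, so try it yourself.
7. debounce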
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
Thread.sleep(900);
e.onNext(2);
}
})
.debounce(1, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
Output:
===================onSubscribe
===================onNext 2
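Note: If the interval between two events is shorter than the configured timeout, the earlier event is not delivered to the observer.
Event 1 was not delivered. Now change the sleep to 1000 ms.
Output: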
===================onSubscribe
===================onNext 1
===================onNext 2
throttleWithTimeout() does the same thing as this method, so it is not covered separately.
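The timing rule can be modelled on a list of timestamps instead of real threads. This is a simplified plain-Java sketch, not RxJava's implementation: it assumes a source that never completes, and it treats a gap exactly equal to the timeout as long enough, which matches the output reported above but in a real run depends on thread timing. DebounceModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of debounce on timestamps; not RxJava's implementation.
final class DebounceModel {
    // timesMs[i] is when values[i] is emitted, in ascending order.
    // A value survives only if no newer value arrives before its timeout elapses.
    static List<Integer> debounce(long[] timesMs, int[] values, long timeoutMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            // Assumption: a gap equal to the timeout counts as "long enough".
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(debounce(new long[]{0, 900}, new int[]{1, 2}, 1000));
        System.out.println(debounce(new long[]{0, 1000}, new int[]{1, 2}, 1000));
    }
}
```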
8. firstElement() & lastElement()
Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "====================firstElement " + integer);
}
});
Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "====================lastElement " + integer);
}
});
Output:
====================firstElement 1
====================lastElement 4
Note: firstElement() takes the first element of the sequence, and lastElement() takes the last.
9. elementAt() & elementAtOrError()
Observable.just(1, 2, 3, 4)
.elementAt(0)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "====================accept " + integer);
}
});
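Output:
====================accept 1
Note: elementAt() returns the event at the given zero-based index. If the index is beyond the end of the sequence, nothing is emitted. If you want an error in that case, use elementAtOrError().
Change the elementAt() argument to 5 and nothing is printed, because the sequence has no element at that index.
Replace elementAt() with elementAtOrError(), as follows: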
Observable.just(1, 2, 3, 4)
.elementAtOrError(5)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "====================accept " + integer);
}
});
Output:
io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread.-wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
Caused by: java.util.NoSuchElementException
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread.-wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
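Note: the sequence has no element at index 5, so elementAtOrError() signals a NoSuchElementException. Because this subscriber supplies no error handler, RxJava wraps it in an OnErrorNotImplementedException, which is why the stack trace starts with that exception and lists NoSuchElementException as the cause.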
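The difference between the two operators can be modelled in plain Java as "empty result" versus "exception". This is only an analogy and does not use RxJava; ElementAtModel and its method names are hypothetical, with Optional standing in for the empty case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical model: an out-of-range index yields "nothing" or an error.
class ElementAtModel {
    // Like elementAt(): an index past the end produces no value and no error.
    static <T> Optional<T> elementAt(List<T> items, int index) {
        return index < items.size() ? Optional.of(items.get(index)) : Optional.empty();
    }

    // Like elementAtOrError(): an index past the end is reported as an error.
    static <T> T elementAtOrError(List<T> items, int index) {
        if (index >= items.size()) {
            throw new NoSuchElementException();
        }
        return items.get(index);
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4);
        System.out.println(elementAt(items, 0).isPresent()); // true
        System.out.println(elementAt(items, 5).isPresent()); // false
        try {
            elementAtOrError(items, 5);
        } catch (NoSuchElementException e) {
            System.out.println("no element at index 5");
        }
    }
}
```

In the RxJava code above, the equivalent of the catch block would be an error callback passed to subscribe(); without one, the error surfaces as the crash shown in the stack trace.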
VI. Conditional Operators
1. all
Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
})
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "==================aBoolean " + aBoolean);
}
});
Output:
==================aBoolean true
Note: all() checks whether every event in the sequence satisfies the given condition. It returns true if they all do, and false otherwise.
2. takeWhile
Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "========================integer " + integer);
}
});
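Output:
========================integer 1
========================integer 2
Note: takeWhile() delivers events for as long as each one satisfies the condition. At the first event that fails it, the sequence completes, and later events are not delivered even if they would satisfy the condition.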
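It is easy to confuse takeWhile() with a plain filter. The sketch below uses java.util.stream (Java 9 or later), whose Stream.takeWhile has the same stop-at-first-failure behaviour, to show the difference. It is an analogy in plain Java and not RxJava code; the class and method names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical comparison of takeWhile and filter on an ordinary list.
class TakeWhileVsFilter {
    // Stops at the first element that is not < 3.
    static List<Integer> takeWhileLessThan3(List<Integer> items) {
        return items.stream().takeWhile(x -> x < 3).collect(Collectors.toList());
    }

    // Tests every element independently and keeps all that are < 3.
    static List<Integer> filterLessThan3(List<Integer> items) {
        return items.stream().filter(x -> x < 3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 1, 2);
        System.out.println(takeWhileLessThan3(items)); // [1, 2]
        System.out.println(filterLessThan3(items));    // [1, 2, 1, 2]
    }
}
```

The trailing 1 and 2 satisfy the condition but are still cut off by takeWhile, because the 3 before them already ended the sequence.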
3. skipWhile
Observable.just(1, 2, 3, 4)
.skipWhile(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "========================integer " + integer);
}
});
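Output:
========================integer 3
========================integer 4
Note: skipWhile() discards events for as long as each one satisfies the condition. Starting with the first event that fails it, that event and every later one are delivered.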
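Once skipping stops it never resumes. A minimal plain-Java sketch of that rule, assuming Java 9 or later for Stream.dropWhile, which behaves analogously. The class name SkipWhileModel is hypothetical and no RxJava is involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of skipWhile using Stream.dropWhile.
class SkipWhileModel {
    // Drops the leading run of elements < 3, then keeps everything that follows.
    static List<Integer> skipWhileLessThan3(List<Integer> items) {
        return items.stream().dropWhile(x -> x < 3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The 1 after the 3 is kept even though it is < 3.
        System.out.println(skipWhileLessThan3(Arrays.asList(1, 2, 3, 1, 4))); // [3, 1, 4]
    }
}
```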
4. takeUntil
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 3;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "========================integer " + integer);
}
});
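Output:
========================integer 1
========================integer 2
========================integer 3
========================integer 4
Note: takeUntil() delivers events until one satisfies the condition. That event is still delivered (4 in this example), and then the sequence completes, so no later events are sent.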
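The detail that the matching event itself is still delivered comes from the order of the two steps: deliver first, check the stop condition second. A plain-Java loop makes that order explicit. TakeUntilModel is a made-up name and this is a model of the semantics, not RxJava's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of takeUntil(Predicate) semantics.
class TakeUntilModel {
    static List<Integer> takeUntil(List<Integer> items, IntPredicate stop) {
        List<Integer> delivered = new ArrayList<>();
        for (int x : items) {
            delivered.add(x);      // the item is delivered first
            if (stop.test(x)) {    // only then is the stop condition checked
                break;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(takeUntil(Arrays.asList(1, 2, 3, 4, 5, 6), x -> x > 3)); // [1, 2, 3, 4]
    }
}
```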
5. skipUntil
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "========================onSubscribe ");
}
@Override
public void onNext(Long along) {
Log.d(TAG, "========================onNext " + along);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "========================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "========================onComplete ");
}
});
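Output:
========================onSubscribe
========================onNext 4
========================onNext 5
========================onComplete
Note: the source Observable's events reach the observer only after the Observable passed to skipUntil() has emitted its first event. Here that happens after 3 seconds, so 1, 2, and 3 are skipped. The events of the Observable passed to skipUntil() are never delivered to the observer.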
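The timing in this example can be checked with a small plain-Java model in which the other Observable acts as a gate that opens at a given time. SkipUntilModel and its parameters are hypothetical. The model assumes that a source event emitted at exactly the moment the gate opens is delivered, which matches the output above but is a race between two timers in real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of skipUntil timing; it does not use RxJava.
class SkipUntilModel {
    // sourceTimesSec[i] is when sourceValues[i] is emitted.
    // gateOpensAtSec is when the other Observable emits its first event.
    static List<Long> skipUntil(long[] sourceTimesSec, long[] sourceValues, long gateOpensAtSec) {
        List<Long> delivered = new ArrayList<>();
        for (int i = 0; i < sourceValues.length; i++) {
            // Assumption: an event at exactly the gate time is delivered.
            if (sourceTimesSec[i] >= gateOpensAtSec) {
                delivered.add(sourceValues[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // intervalRange(1, 5, 0, 1, SECONDS) emits 1..5 at seconds 0..4;
        // the gate Observable has an initial delay of 3 seconds.
        long[] times = {0, 1, 2, 3, 4};
        long[] values = {1, 2, 3, 4, 5};
        System.out.println(skipUntil(times, values, 3)); // [4, 5]
    }
}
```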
6. sequenceEqual
Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "========================onNext " + aBoolean);
}
});
Output:
========================onNext true
Note: sequenceEqual() checks whether two Observables emit the same events in the same order.
7. contains
Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "========================onNext " + aBoolean);
}
});
Output:
========================onNext true
Note: contains() checks whether the event sequence includes the given element. It returns true if it does, and false otherwise.
8. isEmpty
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onComplete();
}
})
.isEmpty()
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "========================onNext " + aBoolean);
}
});
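Output:
========================onNext true
Note: isEmpty() checks whether the event sequence is empty, that is, whether it completes without emitting any event.
9. amb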
ArrayList < Observable < Long >> list = new ArrayList < > ();
list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Consumer < Long > () {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "========================aLong " + aLong);
}
});
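Output:
========================aLong 6
========================aLong 7
========================aLong 8
========================aLong 9
========================aLong 10
Note: amb() takes a collection of Observables and relays only the events of whichever Observable emits first; the other Observables are discarded. Here the second Observable has no initial delay, so it wins.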
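The selection rule can be sketched as "pick the source whose first event arrives earliest". The plain-Java model below reduces each Observable to its initial delay and its values. AmbModel and its parameters are hypothetical and nothing here calls RxJava.

```java
import java.util.Arrays;

// Hypothetical model of amb(): the source that emits first wins outright.
class AmbModel {
    // firstEmissionDelaysSec[i] is the delay before source i emits its first event.
    // Requires at least one source; on a tie this model picks the earlier index.
    static long[] amb(long[] firstEmissionDelaysSec, long[][] valuesPerSource) {
        int winner = 0;
        for (int i = 1; i < firstEmissionDelaysSec.length; i++) {
            if (firstEmissionDelaysSec[i] < firstEmissionDelaysSec[winner]) {
                winner = i;
            }
        }
        // Only the winner's events are relayed; the other sources are ignored.
        return valuesPerSource[winner];
    }

    public static void main(String[] args) {
        long[] delays = {2, 0}; // initial delays from the example above
        long[][] values = {{1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}};
        System.out.println(Arrays.toString(amb(delays, values))); // [6, 7, 8, 9, 10]
    }
}
```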
10. defaultIfEmpty
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "========================onNext " + integer);
}
});
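Output:
========================onNext 666
Note: if the Observable completes without emitting any event (here it only calls onComplete()), defaultIfEmpty() emits the given default value instead.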
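The default is used only when the sequence is empty; a non-empty sequence passes through unchanged. A minimal plain-Java model of that rule follows, with the hypothetical name DefaultIfEmptyModel and a list standing in for the event sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of defaultIfEmpty semantics on an ordinary list.
class DefaultIfEmptyModel {
    static <T> List<T> defaultIfEmpty(List<T> items, T fallback) {
        // The fallback replaces an empty sequence; it is never appended to a non-empty one.
        return items.isEmpty() ? Collections.singletonList(fallback) : items;
    }

    public static void main(String[] args) {
        System.out.println(defaultIfEmpty(new ArrayList<Integer>(), 666)); // [666]
        System.out.println(defaultIfEmpty(Arrays.asList(1, 2), 666));      // [1, 2]
    }
}
```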