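1. RxJava2: What is the observer pattern
2. RxJava2: Creation operators (not time-based)
3. RxJava2: Creation operators (time-based)
4. RxJava2: Transformation operators
5. RxJava2: Conditional operators
6. RxJava2: Filtering operators
7. RxJava2: Combining operators
8. RxJava2: do operators
9. RxJava2: Error handling
10. RxJava2: Retry
11. RxJava2: Thread switching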
api | use |
---|---|
all | {{Observable.all}} |
contains | {{Observable.contains}} |
isEmpty | {{Observable.isEmpty}} |
defaultIfEmpty | {{Observable.defaultIfEmpty}} |
takeWhile / takeUntil | {{Observable.takeWhile}} |
all
- Checks whether every emitted item satisfies the predicate. The result is true only when all items pass.
Disposable subscribe = Observable.just(1, 2, 3)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 2;
            }
        })
        .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
log
02-11 16:33:45.936 10199-10199/... D/SplashActivity: aBoolean:false
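The behaviour of `all` can be modelled without RxJava at all. The following is a minimal plain-Java sketch, not the library's implementation: `AllSketch` and its `all` helper are hypothetical names introduced only for illustration, and a finite `List` stands in for the source. It shows that the first failing item decides the result, and that a source with no items yields true.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

final class AllSketch {
    // Hypothetical helper: models the value all(predicate) reports for a finite source.
    public static <T> boolean all(List<T> items, Predicate<T> predicate) {
        for (T item : items) {
            if (!predicate.test(item)) {
                return false; // the first failing item decides the result
            }
        }
        return true; // also reached when the source has no items
    }

    public static void main(String[] args) {
        System.out.println(all(Arrays.asList(1, 2, 3), i -> i > 2)); // false, matching the log above
        System.out.println(all(Arrays.asList(3, 4, 5), i -> i > 2)); // true
        System.out.println(all(Collections.<Integer>emptyList(), i -> i > 2)); // true
    }
}
```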
contains
- Checks whether the source emits the given item.
Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
        .contains(1)
        .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
log
02-11 17:28:06.486 15538-15538/... D/SplashActivity: aBoolean:true
isEmpty
- Checks whether the source is empty, that is, whether it completes without emitting any item.
Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
        .isEmpty()
        .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
log
02-11 17:31:04.306 15894-15894/... D/SplashActivity: aBoolean:false
defaultIfEmpty
- When the source completes without emitting anything, a default value is emitted in its place. If the source only sends onError, the default value has no effect.
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onComplete();
            }
        })
        .defaultIfEmpty(1)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
log
02-11 17:35:40.806 16324-16324/... D/SplashActivity: integer:1
02-11 17:35:40.806 16324-16324/... D/SplashActivity: onComplete
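The three cases described for `defaultIfEmpty` (empty completion, normal emission, error only) can be compared side by side with a small model. This is a plain-Java sketch under stated assumptions, not RxJava code: `DefaultIfEmptySketch` and its helper are hypothetical, a `List` stands in for the emitted items, and a non-null `Throwable` stands in for a source that ends with onError. The returned strings are just labels for the signals a downstream observer would see.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DefaultIfEmptySketch {
    // Hypothetical model: lists the signals seen downstream of defaultIfEmpty(defaultValue).
    // error == null means the source finished with onComplete.
    public static List<String> defaultIfEmpty(List<Integer> items, Throwable error, int defaultValue) {
        List<String> signals = new ArrayList<>();
        for (Integer item : items) {
            signals.add("onNext(" + item + ")");
        }
        if (error != null) {
            signals.add("onError"); // an error passes through; the default is not used
            return signals;
        }
        if (items.isEmpty()) {
            signals.add("onNext(" + defaultValue + ")"); // empty completion: the default is substituted
        }
        signals.add("onComplete");
        return signals;
    }

    public static void main(String[] args) {
        List<Integer> none = Collections.emptyList();
        System.out.println(defaultIfEmpty(none, null, 1));                   // [onNext(1), onComplete]
        System.out.println(defaultIfEmpty(Arrays.asList(7, 8), null, 1));    // [onNext(7), onNext(8), onComplete]
        System.out.println(defaultIfEmpty(none, new RuntimeException(), 1)); // [onError]
    }
}
```

The first call corresponds to the example and log above; the other two are illustrative inputs, not taken from the original run.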
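takeWhile / takeUntil
- takeWhile: emits items while the predicate returns true. As soon as one item fails the predicate, that item and everything after it are dropped, and the stream ends with onComplete.
- takeUntil: emits items until one satisfies the predicate. The matching item is still emitted, and then the stream ends with onComplete.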
// takeWhile
Observable.just(1, 2, 3, 4, 5)
        .takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer != 4;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
log
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:1
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:2
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:3
02-11 16:37:18.896 10620-10620/... D/SplashActivity: onComplete
// takeUntil
Observable.just(1, 2, 3, 4, 5, 6)
        .takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer == 4;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
log
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:1
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:2
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:3
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:4
02-11 17:03:11.396 12703-12703/... D/SplashActivity: onComplete
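The difference between the two operators comes down to whether the predicate is checked before or after an item is passed on. The sketch below models that ordering in plain Java, with a `List` standing in for the source. `TakeSketch` and its two helpers are hypothetical names for illustration, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class TakeSketch {
    // Models takeWhile: test first, then emit. The first failing item is not emitted.
    public static <T> List<T> takeWhile(List<T> items, Predicate<T> keepGoing) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!keepGoing.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Models takeUntil: emit first, then test. The matching item is still emitted.
    public static <T> List<T> takeUntil(List<T> items, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            out.add(item);
            if (stop.test(item)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeWhile(Arrays.asList(1, 2, 3, 4, 5), i -> i != 4));    // [1, 2, 3]
        System.out.println(takeUntil(Arrays.asList(1, 2, 3, 4, 5, 6), i -> i == 4)); // [1, 2, 3, 4]
    }
}
```

Both calls use the same inputs and predicates as the examples above, and the resulting lists match the items in the two logs.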