过滤操作符
过滤操作符是过滤和选择Observable发射的数据序列,让Observable只返回满足条件的数据。
- filter
filter 操作符是对原Observable产生的结果自定义规划进行过滤,只有满足条件才会提交给订阅者
Observable.just(1, 2, 3, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
// 限制条件
return integer > 2;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " filter =" + integer);
}
});
E/zpan: filter =3
E/zpan: filter =5
- elementAt
elementAt 操作符用来返回指定位置的数据。
和它类似的有elementAtOrDefault(int,T),它可以有默认值
Observable.just(1, 2, 3, 4)
.elementAt(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " element =" + integer);
}
});
E/zpan: element =4
- distinct
distinct 操作符用来去重,其只允许还没有发射过的数据项通过。
和它类似的还有 distinctUntilChanged操作符,其用来去掉连续重复的数据。
Observable.just(1, 4, 8, 5, 5, 1, 5, 6, 8)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " distinct=" + integer);
}
});
E/zpan: distinct=1
distinct=4
distinct=8
distinct=5
distinct=6
- skip
skip 操作符将原Observable发射的数据过滤掉前n项。 - skipLast
skipLast 操作符将原Observable发射的数据从后面进行过滤操作
Observable.just(1, 2, 3, 4, 5)
.skip(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " skip = " + integer);
}
});
E/zpan: skip = 3
skip = 4
skip = 5
- take
take 操作符将原Observable发射的数据只取前n项 - takeLast
takeLast 操作符将原Observable发射的数据从后面进行过滤操作
Observable.just(1, 2, 3, 4, 5)
.take(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " take = " + integer);
}
});
E/zpan: take = 1
take = 2
- ignoreElements
ignoreElements 操作符忽略所有原Observable产生的结果,只把Observable的OnCompleted和onError事件通知给订阅者
Observable.just(1, 2, 3, 4)
.ignoreElements()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e("zpan", "==onCompleted=====");
}
@Override
public void onError(Throwable e) {
Log.e("zpan", "==onError=====");
}
@Override
public void onNext(Integer integer) {
Log.e("zpan", "==onNext=====");
}
});
E/zpan: ==onCompleted=====
- ofType
过滤指定类型的数据
Observable.just(1, 2, "3", "4")
.ofType(String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("zpan", "ofType = " + s);
}
});
E/zpan: ofType = 3
ofType = 4
- first/firstOrDefault
只发射满足条件的第一项,后者可以设置默认值。 - last/lastOrDefault
只发射满足条件的最后一项,后者可以设置默认值。
Observable.just(1, 2, 3, 4)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan"," first =" + integer);
}
});
E/zpan: first =3
- throttleFirst
throttleFirst 操作符会定期发射这个时间段里原Observable发射的第一个数据,throttleFirst 操作符默认在computation调度器上执行。与throttleFirst类似的有sample操作符,它会定时的发射源Observable最近发射的数据,其他都会被过滤掉。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onCompleted();
}
}
})
// 每隔100ms发射一个数据,throttleFirst设定的时间是200ms,所以它会发射每个200ms内的第一个数据
.throttleFirst(200, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " throttleFirst=" + integer);
}
});
组合操作符
组合操作符可以同时处理多个Observable来创建我们所需要的Observable。
- startWith
startWith 操作符会在原Observable发射的数据前面插上一些数据。
Observable.just(4,5,6)
.startWith(1,2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan"," startWith = " + integer);
}
});
E/zpan: startWith = 1
startWith = 2
startWith = 4
startWith = 5
startWith = 6
- merge
merge 操作符将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射的数据交错。
Observable<Integer> o1 = Observable.just(1, 2, 3);
Observable<String> o2 = Observable.just("7", "8", "9");
Observable.merge(o1,o2)
.subscribe(new Action1<Serializable>() {
@Override
public void call(Serializable serializable) {
Log.e("zpan","merge = " + serializable.toString());
}
});
E/zpan: merge = 1
merge = 2
merge = 3
merge = 7
merge = 8
merge = 9
- concat
类似于merge,只是concat严格按照顺序发射数据。
Observable<Integer> o1 = Observable.just(1, 2);
Observable<Integer> o2 = Observable.just(6, 7);
Observable.concat(o1, o2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("zpan", " concat = " + integer);
}
});
E/zpan: concat = 1
concat = 2
concat = 6
concat = 7
- zip
zip操作符合并两个或多个Observable发射出的数据,根据指定的函数变换它们,并发射一个新的值。
Observable<Integer> o1 = Observable.just(1, 2);
Observable<String> o2 = Observable.just("- A", "- B");
Observable.zip(o1, o2, new Func2<Integer, String, Object>() {
@Override
public Object call(Integer integer, String s) {
return integer + s;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.e("zpan", " zip = " + o);
}
});
E/zpan: zip = 1- A
E/zpan: zip = 2- B
- CombineLatest
combineLatest 操作符和zip操作符类似。
将两个Observale最近发射的数据已经Func2函数的规则进展组合。
Observable<Integer> o1 = Observable.just(1, 2);
Observable<String> o2 = Observable.just("- A", "- B");
Observable.combineLatest(o1, o2, new Func2<Integer, String, Object>() {
@Override
public Object call(Integer integer, String s) {
return integer + s;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.e("zpan", " combineLatest= " + o);
}
});
E/zpan: combineLatest = 2- A
E/zpan: combineLatest = 2- B
- join
结构:
onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)
join操作符的效果类似于排列组合,把第一个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配;
Observable<Integer> o1 = Observable.just(1, 2);
Observable<String> o2 = Observable.just("- c", "- d");
o1.join(o2, new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(1, TimeUnit.SECONDS);
}
}, new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(0, TimeUnit.SECONDS);
}
}, new Func2<Integer, String, Object>() {
@Override
public Object call(Integer integer, String s) {
return integer + s;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.e("zpan", " join= " + o);
}
});
E/zpan: join= 2- c
join= 1- c
join= 2- d
join= 1- d