之前没有了解过Rxjava的童鞋,建议先阅读《Rxjava 从入门到开发》这篇文章,对入门比较有帮助。
StartWith操作符
作用:在数据序列的开头增加一项数据
Observable.just(1,2).startWith(Observable.just(-1)).observeOn(Schedulers.io()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer aLong) {
Logger.i(String.valueOf(aLong));
}
});
输出结果:
例子说明:在Startwith的作用下-1输出在结果的最前面。
CombineLatest操作符
作用:当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
//产生0,5,10数列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3);
//产生0,10,20数列
Observable<Long> observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
System.out.println("aLong: " + aLong+" aLong2: "+aLong2);
return aLong+aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
输出结果:
例子说明:observable1输出的是0,5,10;observable2输出的是0,10,20,输出结果的关系如下图:
数字的间隔就是时间差。
Join操作符
作用:join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并。
join方法有四个参数,具体定义如下:
observableA.join(observableB, observableA产生结果生命周期控制函数, observableB产生结果生命周期控制函数, observableA产生的结果与observableB产生的结果的合并规则)
//产生0,5数列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3);
//产生0,10,20数列
Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(500, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
System.out.println("aLong: " + aLong+" aLong2: "+aLong2);
return aLong + aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
输出结果:
例子说明:其实输出排列方式和combineLatest差不多,主要差别在通过join可以进一步控制observable结果的输出时间周期,另外顺便一提的是如果observable1输出的数据只有两个,那么即使observable2输出3个数据,但最终输出的也仅有两个。
Merge操作符
作用:将多个Observable合并为一个。
//产生0,5数列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(2);
//产生0,10,20数列
Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=Observable.merge(observable1,observable2).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("out: " + aLong);
}
});
输出结果:
例子说明:把observable1,observable2的数据合并输出。
switchOnNext操作符
作用:把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者。
Observable<Observable<Long>> observable = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//每隔200毫秒产生一组数据(0,10,20,30,40)
return Observable.interval(200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
}
}).take(2);
subscription=Observable.switchOnNext(observable).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("Next:" + aLong);
}
});
输出结果:
例子说明:没人Switch的作用下,输出结果为“0,10,20,0,10,20”,但是在Switch的作用下,结果为“0,10,0,10,20”,少了“20”,这就是switch的作用。
zip操作符
作用:把两个observable提交的结果,严格按照顺序进行合并。
Observable<Long> observable1= Observable.interval(1, TimeUnit.SECONDS).take(3);
Observable<Long> observable2= Observable.interval(1, TimeUnit.SECONDS).take(4);
subscription=Observable.zip(observable1, observable2, new Func2<Long, Long, Integer>() {
@Override
public Integer call(Long aLong, Long aLong2) {
System.out.println("aLong:" + aLong+" aLong2: "+aLong2);
return (int) (aLong+aLong);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("result:" + integer);
}
});
输出结果:
例子说明:最终输出为“0,2,4”,两个Observable输出的数据一一按顺序对应,无法对应的数据就抛弃,例子中的observable2本应该输出4个数据take(4)但是由于操作符zip的作用下,最终只输出3个数据。
以上便是结合操作符的主要内容了。rxjava的主要操作符讲了差不多,有啥问题欢迎大家留言交流下😊,欢迎关注。
附录:
文章demo