RxJava中提供了大量不同种类,不同场景的Operators(操作符),RxJava的强大性就来自于它所定义的操作符。主要分类:
创建操作:用于创建Observable的操作符;
变换操作:这些操作符可用于对Observable发射的数据进行变换;
过滤操作:这些操作符用于从Observable发射的数据中进行选择;
组合操作:组合操作符用于将多个Observable组合成一个单一的Observable;
异常处理:这些操作符用于从错误通知中恢复;
辅助操作:一组用于处理Observable的操作符;
其实我们创建Observable的create(),from(),just()等方法,都属于创建操作符。那么,让我们通过代码,来看看各种操作符的实现。
1.创建操作
defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
Observable.defer(newFunc0>() {@OverridepublicObservablecall(){//创建并返回新的Observable,returnnull; }});
repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
Observable.just(1).repeat(3).subscribe(newAction1() {@Overridepublicvoidcall(Integer integer){ Log.d("RxJava", String.valueOf(integer)); }});
结果:将数字1,重复发射3次
11-06 15:05:39.610 7636-7636/? D/RxJava: 111-06 15:05:39.611 7636-7636/? D/RxJava: 111-06 15:05:39.611 7636-7636/? D/RxJava: 1
range( ) — 创建一个发射指定范围的整数序列的Observable
Observable.range(1,4).subscribe(newAction1() {@Overridepublicvoidcall(Integer integer){ Log.d("RxJava", String.valueOf(integer)); }});
结果:发射1-4的整数
11-06 15:12:02.076 19930-19930/keye.com.rxjavaobserver D/RxJava: 1
11-06 15:12:02.076 19930-19930/keye.com.rxjavaobserver D/RxJava: 2
11-06 15:12:02.076 19930-19930/keye.com.rxjavaobserver D/RxJava: 3
11-06 15:12:02.076 19930-19930/keye.com.rxjavaobserver D/RxJava: 4
interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
Observable.interval(1, TimeUnit.SECONDS).subscribe(newAction1() {@Overridepublicvoidcall(Long i){ Log.d("RxJava", String.valueOf(i)); }});
结果:每个1秒,发射一个整数序列中的数
11-06 15:16:58.648 29492-29524/keye.com.rxjavaobserver D/RxJava: 111-06 15:16:59.684 29492-29524/keye.com.rxjavaobserver D/RxJava: 211-06 15:17:00.688 29492-29524/keye.com.rxjavaobserver D/RxJava: 311-06 15:17:01.683 29492-29524/keye.com.rxjavaobserver D/RxJava: 411-06 15:17:02.689 29492-29524/keye.com.rxjavaobserver D/RxJava: 511-06 15:17:03.688 29492-29524/keye.com.rxjavaobserver D/RxJava: 611-06 15:17:04.685 29492-29524/keye.com.rxjavaobserver D/RxJava: 711-06 15:17:05.683 29492-29524/keye.com.rxjavaobserver D/RxJava: 811-06 15:17:06.683 29492-29524/keye.com.rxjavaobserver D/RxJava: 9
timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
Observable.timer(3, TimeUnit.SECONDS).subscribe(newAction1() {@Overridepublicvoidcall(Long i){ Log.d("RxJava", i); }});
结果:3秒后,发射了一个包含数字0的Observable
11-06 15:22:57 /keye.com.rxjavaobserver D/RxJava: 0
empty( ) — 创建一个什么都不做直接通知完成的Observable
Observable.empty().subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){ Log.d("RxJava","发送了一个空的的Observable"); }@OverridepublicvoidonError(Throwable e){ }@OverridepublicvoidonNext(Object o){ }});
结果:发送了一个空的Observable,直接调用了onCompleted()方法
11-06 15:22:57 D/RxJava: 发送了一个空的的Observable
error( ) — 创建一个什么都不做直接通知错误的Observable
Observable.empty().subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){ }@OverridepublicvoidonError(Throwable e){ Log.d("RxJava","发送了一个空的的Observable"); }@OverridepublicvoidonNext(Object o){ }});
结果:发送了一个空的Observable,直接调用了onError()方法
11-06 15:22:57 D/RxJava: 发送了一个空的的Observable