注:只包含标准包中的操作符,用于个人学习及备忘
参考博客:http://blog.csdn.net/maplejaw_/article/details/52396175
本篇将介绍rxjava中的创建操作、合并操作、过滤操作、条件/布尔操作、聚合操作、转换操作以及变换操作,只针对用法不涉及原理,对RxJava不熟悉的可参考:http://gank.io/post/560e15be2dca930e00da1083
创建操作
-
create:使用OnSubscrib直接创建一个Observable
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("item1"); subscriber.onNext("item2"); subscriber.onCompleted(); } });
-
from:将数组或集合拆分成具体对象后,转换成发送这些对象的Observable
String[] arr = {"item1", "item2", "item3"}; Observable.from(arr) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //调用多次,分别打印出item1,item2,item3 } });
-
just:将一个或多个对象转换成发送这些对象的Obserbable
Observable.just("item1","item2","item3") .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //调用多次,分别打印出item1,item2,item3 } });
empty:创建一个直接通知完成的Observable
error:创建一个直接通知错误的Observable
-
never:创建一个什么都不做的Observable
Observable observable1 = Observable.empty(); //直接调用onCompleted()方法 Observable observable2 = Observable.error(new RuntimeException()); //直接调用onError()方法 Observable observable3 = Observable.never(); //onNext(),onCompleted(),onError()均不调用
-
timer:创建一个延时发射数据的Observable
Observable.timer(1000, TimeUnit.MILLISECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //aLong为0 } });
-
interval:创建一个按照给定的时间间隔发射送0开始的整数序列的Obervable
Observable.interval(2, 1, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //等待2秒后开始发射数据,发射的时间间隔为1秒,从0开始计数 } }); Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //等待1秒后开始发射数据,发射的时间间隔为1秒,从0开始计数 //相当于Observable.interval(1, 1, TimeUnit.SECONDS) } });
-
range:创建一个发射指定范围的整数序列的Observable
Observable.range(3, 4) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString());//依次发射3,4,5,6,从3开始发射4个数据 } });
-
defer:观察者订阅时才创建Observable,每次订阅返回一个新的Observable
Observable.defer(new Func0<Observable<String>>() { @Override public Observable<String> call() { return Observable.just("s"); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印s } });
合并操作(用于组合多个Observavle)
-
concat:按顺序连接多个Observable,注:Observable.concat(a,b)等价于a.concatWith(b)
Observable<Integer> observable1 = Observable.just(1, 2, 3); Observable<Integer> observable2 = Observable.just(4, 5, 6); Observable.concat(observable1, observable2) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,4,5,6 } });
-
startWith:在数据序列的开头增加一项数据,内部调用concat
Observable.just(1, 2, 3) .startWith(Observable.just(4, 5)) //添加一个Observable .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } }); Observable.just(1,2,3) .startWith(4,5) //添加多个数据 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } }); List<Integer> integers = new ArrayList<>(); integers.add(4); integers.add(5); Observable.just(1,2,3) .startWith(integers) //添加一个集合 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } });
merge / mergeDelayError:将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(1000);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
SystemClock.sleep(500);
subscriber.onNext(4);
subscriber.onNext(5);
SystemClock.sleep(1000);
subscriber.onNext(6);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.merge(observable1, observable2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,4,5,2,3,6
}
});
- zip:使用一个函数组合多个Observable发射的数据集合,再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4, 5);
Observable<String> observable2 = Observable.just("A", "B", "C", "D");
Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2B,3C,4D
}
});
- combineLatest:当两个Observable中任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(500);
subscriber.onNext(2);
SystemClock.sleep(1000);
subscriber.onNext(3);
SystemClock.sleep(300);
subscriber.onNext(4);
SystemClock.sleep(500);
subscriber.onNext(5);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
SystemClock.sleep(300);
subscriber.onNext("A");
SystemClock.sleep(300);
subscriber.onNext("B");
SystemClock.sleep(500);
subscriber.onNext("C");
subscriber.onNext("D");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2A,2B,2C,2D,3D,4D,5D
}
});
过滤操作
-
filter:过滤数据
Observable.just(1, 2, 3, 4) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; //过滤偶数 } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2,4 } });
-
ofType:过滤指定类型数据
Observable.just(1, "2", 3, "4") .ofType(Integer.class) //过滤整形数据 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,3 } });
-
take:只发射前n项数据或者一定时间内的数据(无需考虑索引越界问题,配合interval操作符可作为定时器使用)
Observable.just(1, 2, 3, 4) .take(2) //只发射前2项 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2 } }); Observable.interval(1, TimeUnit.SECONDS) .take(3, TimeUnit.SECONDS) //只发射3秒内的数据 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印0,1(打印出来的并不是相像中的0,1,2,应该与代码代码执行的时间有关,使用时需要注意!) Log.d("debug", aLong.toString()); } });
-
takeLast:只发射最后的N项数据或者一定时间内的数据(无需考虑索引越界问题)
Observable.just(1, 2, 3, 4) .takeLast(3) //只发射后3项 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2,3,4 } }); Observable.interval(1, TimeUnit.SECONDS) .take(10) //每1秒发射一个数据,发射10秒 .takeLast(3, TimeUnit.SECONDS) //只发射最后3秒的数据 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印6,7,8,9(同样存在些许误差,使用时需注意!) } }); Observable.interval(1, TimeUnit.SECONDS) .take(10) .takeLast(2, 3, TimeUnit.SECONDS) //只发射最后3秒内的最后2个数据 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印8,9 } });
-
takeFirst:只发射满足条件的第一项(其实就是filter+take)
Observable.just(1, 2, 3, 4) .takeFirst(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2 } });
*first / firstOrDefault:只发射第一项或者满足条件的第一项数据,其中firstOrDefault可以指定默认值(建议使用firstOrDefault,找不到对应元素时first会报异常)
Observable.just(1, 2, 3)
.first() //发射第一项
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1
}
});
Observable.just(1, 2, 3, 4)
.first(new Func1<Integer, Boolean>() { //发射大于2的第一项
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3
}
});
Integer[] arr = {};
Observable.from(arr)
.firstOrDefault(2) //发射第一项,没有可发射的数据时,发射默认值2
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2
}
});
-
last / lastOrDefault:只发射最后一项或满足条件的最后一项,其中lastOrDefault可以指定默认值
Observable.just(1, 2, 3) .last() //发射最后一项 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3 } }); Observable.just(1, 2, 3, 4) .last(new Func1<Integer, Boolean>() { //发射大于2的最后一项 @Override public Boolean call(Integer integer) { return integer > 2; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4 } }); Integer[] arr = {}; Observable.from(arr) .lastOrDefault(2) //发射最后一项,没有可发射的数据时,发射默认值2 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2 } });
-
skip:跳过开始的n项数据或者一定时间内的数据(与take类似)
Observable.just(1, 2, 3, 4) .skip(2) //跳过前2项 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3,4 } }); Observable.interval(1, TimeUnit.SECONDS) .take(5) .skip(3, TimeUnit.SECONDS) //跳过前3秒 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印2,3,4,同样存在误差! } });
-
skipLast:跳过最后的n项数据或一定时间内的数据
Observable.just(1, 2, 3, 4) .skipLast(2) //跳过最后2项 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2 } }); Observable.interval(1, TimeUnit.SECONDS) .take(7) .skipLast(3, TimeUnit.SECONDS) //跳过最后3秒 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印0,1,2,同样存在误差! } });
-
elementAt / elementAtOrDefault:发射某一项数据,其中elementAtOrDefault可以指定索引越界时发射的默认值
Observable.just(1, 2, 3, 4) .elementAt(2) //发射索引为2的数据 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3 } }); Observable.just(1, 2, 3) .elementAtOrDefault(4, 5) //发射索引为4的数据,索引越界时发射默认数据5 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印5 } });
ignoreElements:丢弃所有数据,只发射错误或正常终止的通知,即只触发观察者的onError()或onCompleted()方法
-
distinct:过滤重复数据,可指定判定唯一的标准
Observable.just(1, 1, 2, 3, 2, 4) .distinct() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,4 } }); Observable.just(1, 1, 2, 3, 2, 4) //根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个 .distinct(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //奇数对应的key为1,偶数对应的key为2 if (integer % 2 == 0) { return 2; } return 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2 } });
-
distinctUntilChanged:过滤掉连续重复的数据,可指定判定唯一的标准
Observable.just(1, 1, 2, 3, 2, 4) .distinctUntilChanged() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,2,4 } }); Observable.just(1, 1, 2, 3, 2, 4) //根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个 .distinctUntilChanged(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //奇数对应的key为1,偶数对应的key为2 if (integer % 2 == 0) { return 2; } return 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,2 } }); Observable.just(1, 1, 2, 3, 2, 4) //传入比较器的方式 .distinctUntilChanged(new Func2<Integer, Integer, Boolean>() { @Override public Boolean call(Integer integer, Integer integer2) { return integer % 2 == integer2 % 2; //同为奇数或偶数返回true } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,2 } });
-
throttleFirst:定期发射Observable在该时间段发射的第一项数据
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(10) //每500毫秒发射一次数据,发射10次 .throttleFirst(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的第一项数据 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印0,2,5,8(即第一秒发射0,1,第二秒发射2,3,4,第三秒发射5,6,7,第四秒发射8,9),同样存在误差! Log.d("debug", aLong.toString()); } });
-
throttleWithTimeout / debounce(两者使用及效果相同):发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //依次发射1-6,发射间隔不同 subscriber.onNext(1); SystemClock.sleep(500); subscriber.onNext(2); SystemClock.sleep(500); subscriber.onNext(3); SystemClock.sleep(1000); subscriber.onNext(4); SystemClock.sleep(1000); subscriber.onNext(5); SystemClock.sleep(500); subscriber.onNext(6); subscriber.onCompleted(); } }).throttleWithTimeout(700, TimeUnit.MILLISECONDS) //指定最小发射间隔时间为700毫秒 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3,4,6 } });
-
sample / throttleLast(两者使用及效果相同):定期发射Observable在该时间段发射的最后一项数据,与throttleFirst相反
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(10) //每500毫秒发射一次数据,发射10次 .throttleLast(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的最后一项数据 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印1,3,5,7,9(即第一秒发射0,1,第二秒发射2,3,第三秒发射4,5,第四秒发射6,7,第五秒发射8,9) Log.d("debug", aLong.toString()); } });
-
timeout:如果指定时间内没有发射任何数据,就发射一个异常或者使用备用的Observavle
Observable.timer(5, TimeUnit.SECONDS) .timeout(3, TimeUnit.SECONDS) //超时则发射异常 .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d("debug", "onError()"); //抛出异常 } @Override public void onNext(Long aLong) { Log.d("debug", aLong.toString()); } }); Observable.timer(5, TimeUnit.SECONDS) .timeout(3, TimeUnit.SECONDS, Observable.just(2L)) //设置备用Observable .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d("debug", "onError()"); } @Override public void onNext(Long aLong) { Log.d("debug", aLong.toString()); //发射备用Observable,打印2 } });
条件 / 布尔操作
-
all:判断所有数据中是否都满足某个条件
Observable.just(1, 2, 3, 4) .all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 5;//所有项都小于5 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } }); Observable.just(1, 2, 3, 4) .all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 2;//所有项都大于2 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印false } });
-
exists:判断是否存在数据项满足某个条件
Observable.just(1, 2, 3, 4) .exists(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 2; //存在某项大于2 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
contains:判断所有数据中是否包含指定的数据(内部调用exists)
Observable.just(1, 2, 3, 4) .contains(2) //是否包含2 .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
sequenceEqual:判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
isEmpty:用于判断Observable是否没有发射任何数据(发射null返回为false)
Observable.from(new ArrayList<Integer>()) //集合中没有数据 .isEmpty() .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } }); Observable.empty() .isEmpty() .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
amber:指定多个Observable,只允许第一个开始发射数据的Observable发射全部数据,其他Observable将会被会忽略
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { SystemClock.sleep(500); //延迟500毫秒 subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); } }.subscribeOn(Schedulers.computation())); //指定为新的线程 Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("a"); subscriber.onNext("b"); subscriber.onCompleted(); } }); Observable.amb(observable1, observable2) .subscribe(new Action1<Serializable>() { @Override public void call(Serializable serializable) { Log.d("debug", serializable.toString()); //打印a,b } });
-
switchIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就使用备用的Observable
Observable.from(new ArrayList<Integer>()) .switchIfEmpty(Observable.just(1, 2)) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2 } });
-
defaultIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就发射一个默认值(内部调用switchIfEmpty)
Observable.from(new ArrayList<Integer>()) .defaultIfEmpty(1) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1 } });
-
takeUntil:当发射的数据满足某个条件后(包含该数据),或者第二个Observable发射了一项数据或发射了一个终止通知时(观察者接受不到第二个Observable发射的数据),终止第一个Observable发送数据
Observable.just(1, 2, 3, 4) .takeUntil(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer == 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "just" + integer.toString()); //打印1,2,3 } }); Observable.interval(0, 500, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.computation()) .takeUntil(Observable.timer(1200, TimeUnit.MILLISECONDS)) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印0,1,2 } });
-
takeWhile:当发射的数据对应某个条件为false时(不包含该数据),Observable终止发送数据
Observable.just(1, 2, 3, 4) .takeWhile(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer != 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2 } });
-
skipUnit:丢弃Observable发射的数据,直到第二个Observable开始发射数据或者发射一个终止通知时
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(5) .subscribeOn(Schedulers.computation()) .skipUntil(Observable.timer(1200, TimeUnit.MILLISECONDS)) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印3,4 } });
-
skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立(不丢弃条件数据)
Observable.just(1, 2, 3, 4) .skipWhile(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3,4 } });
聚合操作
-
reduce:用一个函数接收Observable发射的数据,将函数的计算结果作为下次计算的参数,最后输出结果。
Observable.just(1, 2, 3, 4) .reduce(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { Log.d("debug", "integer1:" + integer + ",integer2:" + integer2); return integer + integer2; //求和操作 } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "result:" + integer); } }); /** * 日志输出 * integer1:1,integer2:2 * integer1:3,integer2:3 * integer1:6,integer2:4 * result:10 */
-
collect:用于将数据收集到一个可变的数据结构(如List,Map)
Observable.just(1, 2, 3, 4) .collect(new Func0<List<Integer>>() { @Override public List<Integer> call() { return new ArrayList<Integer>(); //创建List用于收集数据 } }, new Action2<List<Integer>, Integer>() { @Override public void call(List<Integer> integers, Integer integer) { integers.add(integer); //将数据添加到List中 } }) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4] } }); Observable.just(1, 2, 3, 4) .collect(new Func0<Map<Integer, String>>() { @Override public Map<Integer, String> call() { return new HashMap<Integer, String>(); //创建Map用于收集数据 } }, new Action2<Map<Integer, String>, Integer>() { @Override public void call(Map<Integer, String> integerStringMap, Integer integer) { integerStringMap.put(integer, "value" + integer); //将数据添加到Map中 } }) .subscribe(new Action1<Map<Integer, String>>() { @Override public void call(Map<Integer, String> integerStringMap) { //打印{4=value4, 1=value1, 3=value3, 2=value2},注:HashMap保存的数据是无序的 Log.d("debug", integerStringMap.toString()); } });
-
count / countLong:计算发射的数量,内部调用的是reduce
Observable.just(1, 2, 3, 4) .count() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "integer:" + integer.toString()); //打印4 } });
转换操作
-
toList:将Observable发射的所有数据收集到一个列表中,返回这个列表
Observable.just(1, 2, 3, 4) .toList() .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4] } });
-
toSortedList:将Observable发射的所有数据收集到一个有序列表中,返回这个列表
Observable.just(3, 2, 5, 4, 1) .toSortedList() //默认升序排序 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4, 5] } }); Observable.just(3, 2, 5, 4, 1) .toSortedList(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer2 - integer; //自定义排序规则(倒序) } }) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[5, 4, 3, 2, 1] } });
-
toMap:将序列数据转换为一个Map,根据数据项生成key和value
Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根据数据项生成key,value为原始数据 @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Integer>>() { @Override public void call(Map<String, Integer> stringIntegerMap) { Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1, key:3=3} } }); Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根据数据项生成key和value @Override public String call(Integer integer) { return "key:" + integer; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { return "value:" + integer; } }) .subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> stringStringMap) { Log.d("debug", stringStringMap.toString()); //打印{key:4=value:4, key:2=value:2, key:1=value:1, key:3=value:3} } }); Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根据数据项生成key和value,创建指定类型的Map @Override public String call(Integer integer) { return "key:" + integer; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { return "value:" + integer; } }, new Func0<Map<String, String>>() { @Override public Map<String, String> call() { return new LinkedHashMap<String, String>(); //LinkedHashMap保证存取顺序相同 } }) .subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> stringStringMap) { Log.d("debug", stringStringMap.toString()); //打印{key:1=value:1, key:2=value:2, key:3=value:3, key:4=value:4} } });
-
toMultiMap:类似toMap,不同的地方在于map的value是一个集合,使一个key可以映射多个value,多用于分组
Observable.just(1, 2, 1, 4) .toMultimap(new Func1<Integer, String>() { //根据数据项生成key,value为原始数据 @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Collection<Integer>>>() { @Override public void call(Map<String, Collection<Integer>> stringCollectionMap) { Log.d("debug", stringCollectionMap.toString()); //打印{key:4=[4], key:2=[2], key:1=[1, 1]} } }); Observable.just(1, 2, 1, 4) .toMap(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Integer>>() { @Override public void call(Map<String, Integer> stringIntegerMap) { Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1} } });
变换操作
-
map:对Observable发射的每一项数据都应用一个函数进行变换
Observable.just(1, 2, 3, 4) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "item:" + integer; } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印item:1,item:2,item:3,item:4 } });
-
cast:在发射之前强制将Observable发射的所有数据转换为指定类型(父类强转为子类)
List list = new ArrayList(); Observable.just(list) .cast(ArrayList.class) //将List强转为ArrayList .subscribe(new Action1<ArrayList>() { @Override public void call(ArrayList arrayList) { } });
-
flatMap:将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部使用marge合并(可用于一对多转换或多对多转换,也可用于网络请求的嵌套)
Observable.just(1, 2, 3) .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(integer * 10); subscriber.onNext(integer * 100); subscriber.onCompleted(); } }); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印10,100,20,200,30,300 } });
-
flatMapIterable:和flatMap作用一样,只不过生成的是Iterable而不是Observable
Observable.just(1, 2, 3) .flatMapIterable(new Func1<Integer, Iterable<Integer>>() { @Override public Iterable<Integer> call(Integer integer) { return Arrays.asList(integer * 10, integer * 100); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印10,100,20,200,30,300 } });
concatMap:类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射
-
switchMap:和flatMap很像,将Observable发射的数据变换为Observables集合,当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
Observable.interval(0, 500, TimeUnit.MILLISECONDS) //每500毫秒发射一次 .take(4) .switchMap(new Func1<Long, Observable<String>>() { @Override public Observable<String> call(Long aLong) { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(aLong + "A"); SystemClock.sleep(800); //延迟800毫秒 subscriber.onNext(aLong + "B"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印0A,1A,2A,3A,3B } });
-
与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
Observable.just(1, 2, 3, 4) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { Log.d("debug", "integer1:" + integer + ",integer2:" + integer2); return integer + integer2; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "result:" + integer); } }); /** * 日志输出 * result:1 * integer1:1,integer2:2 * result:3 * integer1:3,integer2:3 * result:6 * integer1:6,integer2:4 * result:10 */
-
groupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每个Observable发射一组不同的数据(类似于toMultiMap)
Observable.just(1, 2, 3, 4) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //根据数据项生成key return integer % 2 == 0 ? "偶数" : "奇数"; } }) .subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> o) { o.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", o.getKey() + ":" + integer); //打印奇数:1,偶数:2,奇数:3,偶数:4 } }); } }); Observable.just(1, 2, 3, 4) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //根据数据项生成key return integer % 2 == 0 ? "偶数" : "奇数"; } }, new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //根据数据项生成value return integer * 10; } }) .subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> o) { o.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", o.getKey() + ":" + integer); //打印奇数:10,偶数:20,奇数:30,偶数:40 } }); } });
-
buffer:定期从Observable收集数据到一个集合,然后将这些数据集合打包发射
Observable.just(1, 2, 3, 4, 5) .buffer(3, 1) //count:表示从当前指针位置开始打包3个数据项到集合中,skip:表示指针向后移1位, .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", "skip" + integers.toString()); //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5] } }); Observable.just(1, 2, 3, 4, 5) .buffer(3) //每3个打包成一个集合,内部就是.buffer(3,3) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3],[4, 5] } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) .buffer(250, TimeUnit.MILLISECONDS, 2) //将每250毫秒内发射的数据收集到多个集合中,每个集合最多存放2个数据 .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { //打印[0, 1],[2],[3, 4],[] Log.d("debug", "count:" + longs.toString()); } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) .buffer(250, TimeUnit.MILLISECONDS) //将每250毫秒内发射的数据收集到一个集合中,集合不限制大小 .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { Log.d("debug", longs.toString()); //打印[0, 1, 2],[3, 4] } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) //从指定时间节点开始,将该节点后250毫秒内发射的数据收集的一个集合中,初始节点为0,每发射一次集合, //节点的时间增加150毫秒,即下一次收集数据从150毫秒开始,收集150毫秒到400毫秒之间发射的数据 .buffer(250, 150, TimeUnit.MILLISECONDS) .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { Log.d("debug", longs.toString()); //打印[0, 1, 2],[2,3],[4] } });
-
window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项,类似于buffer,buffer发射的是集合,而window发射的是Observable
Observable.just(1, 2, 3, 4, 5) .window(3, 1) .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { integerObservable.toList() //将所有数据搜集成一个集合,便于观察 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5] } }); } }); Observable.just(1, 2, 3, 4, 5) .window(3) //相当于window(3,3) .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { integerObservable.toList() //将所有数据搜集成一个集合,便于观察 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3],[4, 5] } }); } }); //剩下其余重载方法也与buffer基本一样,不重复了