一.buffer操作符
buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。需要注意的是,一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。
//定义邮件内容
final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
//每隔1秒就随机发布一封邮件
Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (subscriber.isUnsubscribed()) return;
Random random = new Random();
while (true) {
String mail = mails[random.nextInt(mails.length)];
subscriber.onNext(mail);
Thread.sleep(1000); }
} catch (Exception ex) {
subscriber.onError(ex);
}
}
}).subscribeOn(Schedulers.io());
//把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
@Override
public void call(List<String> list) {
System.out.println(String.format("You've got %d new messages! Here they are!", list.size()));
for (int i = 0; i < list.size(); i++)
System.out.println(list.get(i).toString());
}
});
打印出的结果:
You’ve got 3 new messages! Here they are!(after 3s)
Here is an email!
Another email!
Another email!
You’ve got 3 new messages! Here they are!(after 6s)
Here is an email!
Another email!
Here is an email!
……
二.flatMap操作符
flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。
private Observable<File> listFiles(File f){
if(f.isDirectory()){
return Observable.from(f.listFiles())
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return listFiles(f);
}
});
} else {
return Observable.just(f);
}
}
@Override
public void onClick(View v) {
Observable.just(getApplicationContext().getExternalCacheDir())
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
//参数file是just操作符产生的结果,这里判断file是不是目录文件,如果是目录文件,则递归查找其子文件flatMap操作符神奇的地方在于,
//返回的结果还是一个Observable,而这个Observable其实是包含多个文件的Observable的,输出应该是ExternalCacheDir下的所有文件
return listFiles(file); }
}) .subscribe(new Action1<File>() {
@Override
public void call(File file) {
System.out.println(file.getAbsolutePath());
}
});
}
三.concatMap操作符
cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。
四.switchMap操作符
switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!
//flatMap操作符的运行结果
Observable.just(10, 20, 30)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10) delay = 180;
return Observable.from(new Integer[]{integer, integer / 2})
.delay(delay, TimeUnit.MILLISECONDS);
} })
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("flatMap Next:" + integer);
} });
//concatMap操作符的运行结果
Observable.just(10, 20, 30)
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10) delay = 180;
return Observable.from(new Integer[]{integer, integer / 2})
.delay(delay, TimeUnit.MILLISECONDS); } })
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("concatMap Next:" + integer);
} });
//switchMap操作符的运行结果
Observable.just(10, 20, 30)
.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10) delay = 180;
return Observable.from(new Integer[]{integer, integer / 2})
.delay(delay, TimeUnit.MILLISECONDS);
} })
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("switchMap Next:" + integer); } });
运行结果如下:
flatMap Next:20
flatMap Next:10
flatMap Next:30
flatMap Next:15
flatMap Next:10
flatMap Next:5
switchMap Next:30
switchMap Next:15
concatMap Next:10
concatMap Next:5
concatMap Next:20
concatMap Next:10
concatMap Next:30
concatMap Next:15
五.groupBy操作符
groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。