RxJava系列文章目录导读:
一、RxJava create操作符的用法和源码分析
二、RxJava map操作符用法详解
三、RxJava flatMap操作符用法详解
四、RxJava concatMap操作符用法详解
五、RxJava onErrorResumeNext操作符实现app与服务器间token机制
六、RxJava retryWhen操作符实现错误重试机制
七、RxJava 使用debounce操作符优化app搜索功能
八、RxJava concat操作处理多数据源
九、RxJava zip操作符在Android中的实际使用场景
十、RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断
十一、RxJava defer操作符实现代码支持链式调用
十二、combineLatest操作符的高级使用
十三、RxJava导致Fragment Activity内存泄漏问题
十四、interval、takeWhile操作符实现获取验证码功能
一、concat操作符概述
从concatMap操作我们知道,concat操作符肯定也是有序的,而concat操作符是接收若干个Observables,发射数据是有序的,不会交叉。
下面看看官方文档和流程图的说明:
Returns an Observable that emits the items emitted by three Observables,
one after the other, without interleaving them.
[图片上传失败...(image-a2b544-1516513502783)]
二、使用示例
示例一
下面使用concat操作符
来实现个多数据源的例子,比如一个商品详情需要展示商品的信息、艺术家信息、与该商品类似的商品,可能需要访问三个接口。这个时候就可以使用concat操作符
。
卖家信息
private Observable<Object> sellerInfo = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
Seller d = goodsApi.getSeller();
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
商品信息
private Observable<Object> goodsInfo = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
Goods d = goodsApi.getGoodInfo();
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
类似作品推荐
private Observable<Object> relateGoods = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
List<Goods> d = goodsApi.getRelateGoods();
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
Observable.concat(sellerInfo, goodsInfo, relateGoods)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object s) {
printLog(tvLogs, "Getting data from ", s);
//push data to ui
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
printLog(tvLogs, "Error: ", throwable.getMessage());
}
});
为什么泛型去设置成Object呢,因为Observable.concat只接受相同泛型的参数。
也许会有人问后去商品信息的时候没办法传递参数啊(比如商品ID), 在外面包一层方法即可,方法参数为goodID。
示例二
使用 concat操作符
对缓存进行检查,如:内存缓存、本地缓存、网络,那一层有数据立即返回。
使用一个数组来表示三个数据源:
String data[] = {null, null, "network"};
//String data[] = {null, "disk","network"};
//String data[] = {"memory", null,"network"};
//String data[] = {"memory", "disk",null};
//String data[] = {"memory", "disk","network"};
内存缓存
private Observable<String> memorySource = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String d = data[0];
printLog(tvLogs, "", "----start check memory data. value is null? " + (d == null));
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
});
本地缓存
private Observable<String> diskSource = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String d = data[1];
printLog(tvLogs, "", "----start check disk data. value is null? " + (d == null));
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
网络
private Observable<String> networkSource = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String d = data[2];
printLog(tvLogs, "", "----start check network data. value is null? " + (d == null));
if (d != null) {
subscriber.onNext(d);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
从例子1我们知道,concat会三个数据源都会请求的。如何使得哪层有数据就用哪层的,之后就不走后面的逻辑了。
可以配合first()操作符
来实现这样的效果。
Observable.concat(memorySource, diskSource, networkSource)
.first()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
printLog(tvLogs, "Getting data from ", s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
printLog(tvLogs, "Error: ", throwable.getMessage());
}
});
需要注意的是如果memorySource, diskSource, networkSource返回的都null,那么会报一个异常:
java.util.NoSuchElementException: Sequence contains no elements......
可以使用takeFirst操作,即使都没有数据,也不会报异常。
Observable.concat(memorySource, diskSource, networkSource)
//first()-> if no data from observables will cause exception :
//java.util.NoSuchElementException: Sequence contains no elements
//takeFirst -> no exception
.takeFirst(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
printLog(tvLogs, "Getting data from ", s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
printLog(tvLogs, "Error: ", throwable.getMessage());
}
});
除了通过concat操作符来实现缓存逻辑,也可使用switchIfEmpty
,详细可以查看我的博客:
http://blog.csdn.net/johnny901114/article/details/52585912
本文的例子放在github上https://github.com/chiclaim/android-sample/tree/master/rxjava