RxJava——目前最热门的响应式函数编程框架。
本文主要总结了笔者在项目中使用到的RxJava的场景,部分例子参考自网络
[笔者仍为Android初学者。如有解释错误的地方,欢迎评论区指正探讨]
本文主要介绍一些RxJava的使用场景,为了完整介绍,本文不使用Retrofit或者将RxJava进行简单的封装,原汁原味。
当然,如果结合Retrofit,下述的代码会简洁很多。
单个网络请求数据并更新UI
这个比较简单,整个流程大致是:
- 通过
Obsrvable.create
方法,调用OkHttp
网络请求 - 通过
map
方法结合gson
,将response
转换为bean
类 - 通过
onNext
,解析bean
中数据,并进行数据库存储 - 调度线程
- 通过
subscribe
,根据请求成功或异常来更新UI
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, Bean>() {
@Override
public Bean apply(@NonNull Response response) throws Exception {
//Gson
}
}).doOnNext(new Consumer<Bean>() {
@Override
public void accept(@NonNull Bean bean) throws Exception {
//saveData
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bean>() {
@Override
public void accept(@NonNull Bean bean) throws Exception {
//refresh UI
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//get ERROR
}
});
多个网络请求依次依赖
这里主要是依赖于flatMap
关键字,FlatMap
可以将一个发射数据的Observable
变换为多个Observables
,然后将它们发射的数据合并后放进一个单独的Observable
。
利用这个特性,我们可以将Observable
转成另一个Observable
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, FirstBean>() {
@Override
public FirstBean apply(@NonNull Response response) throws Exception {
//Gson
}
}).flatMap(new Function<FirstBean, ObservableSource<Response>>() {
@Override
public ObservableSource<Response> apply(@NonNull FirstBean bean) throws Exception {
final String s = bean.getData();
return Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url/" + s)
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
});
}
}).map(new Function<Response, SecondBean>() {
@Override
public SecondBean apply(@NonNull Response response) throws Exception {
//Gson
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<SecondBean>() {
@Override
public void accept(@NonNull SecondBean secondBean) throws Exception {
//refresh UI
}
});
先读取缓存数据并展示UI再获取网络数据刷新UI
这里需要依赖另一个操作符:Concat
concat
可以做到不交错的发射两个或多个Observable
的发射物,并且只有前一个Observable
终止(onComleted
)才会订阅下一个Obervable
利用这个特性,我们就可以依次的读取缓存数据展示UI,然后再获取网络数据刷新UI
- 首先创建一个从cache获取数据的observable
- 再创建一个从网络获取数据的Observable(可以通过map等方法转换数据类型)
- 通过concat方法将多个observable结合起来
- 通过subscribe订阅每一个observable
Observable<List<String>> cache = Observable.create(new ObservableOnSubscribe<List<String>>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
CacheManager manager = CacheManager.getInstance();
List<String> data = manager.query();
e.onNext(data);
//一定要有onComplete,不然不会执行第二个Observale
e.onComplete();
}
});
Observable<List<String>> network = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, List<String>>() {
@Override
public List<String> apply(@NonNull Response response) throws Exception {
//解析数据
}
});
//两个observable的泛型应该保持一致
Observable.concat(cache, network)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> strings) throws Exception {
//refresh ui
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//get error
}
});
获取网络数据前先读取缓存
其实和上面的那种类似,只需要稍微修改一下逻辑即可:
当缓存的Observable
获取到数据时,只执行onNext
,获取不到则只执行onComplete
Observable<String> cache = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
CacheManager manager = CacheManager.getInstance();
String data = manager.queryForPosition(0);
if (data != null) {
e.onNext(data);
} else {
//调用onComplete之后会执行下一个Observable
//如果缓存为空,那么直接结束,进行网络请求
e.onComplete();
}
}
});
Observable<String> network = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, String>() {
@Override
public String apply(@NonNull Response response) throws Exception {
//解析数据
}
});
//两个observable的泛型应该保持一致
Observable.concat(cache, network)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String strings) throws Exception {
//refresh ui
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//get error
}
});
当然,有的时候我们的缓存可能还会分为memory
和disk
,无差,只需要多写一个Observable
然后一样通过concat
合并即可。
结合多个接口的数据再更新UI
这个时候就需要靠zip
方法啦,zip
方法可以将多个Observable
的数据结合为一个数据源再发射出去。
Observable<FirstBean> firstRequest = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("firstUrl")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, FirstBean>() {
@Override
public FirstBean apply(@NonNull Response response) throws Exception {
//解析数据
}
});
Observable<SecondBean> secondRequest = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("secondUrl")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, SecondBean>() {
@Override
public SecondBean apply(@NonNull Response response) throws Exception {
//解析数据
}
});
Observable.zip(firstRequest, secondRequest, new BiFunction<FirstBean, SecondBean, WholeBean>() {
@Override
public WholeBean apply(@NonNull FirstBean firstBean, @NonNull SecondBean secondBean) throws Exception {
//结合数据为一体
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<WholeBean>() {
@Override
public void accept(@NonNull WholeBean strings) throws Exception {
//refresh ui
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//get error
}
});
当然,如果你的两个api返回的是相同类型的数据,那么可以直接使用merge
将数据合并,而不需要实现回调。
减少频繁的网络请求
设想一种场景:点击一次button就进行一次网络请求,或者当输入框数据变化时进行网络请求,那么这样就会在一下子产生大量的网络请求,但实际上又没有必要,这个时候就可以通过debounce
方法来处理,debounce
操作符会过滤掉发射速率过快的数据项:
为了方便处理点击事件
和Observable
的关系,我们引入RxBinding处理:
RxView.clicks(mButton)
.debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
// refresh ui
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// get error
}
});