RxJava简介
RxJava的github地址
https://github.com/ReactiveX/RxJava
RxJava的英文wiki
https://github.com/ReactiveX/RxJava/wiki
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
简单来说,RxJava是利用可观察序列和操作符来编写异步和基于事件的程序。RxJava源于ReactiveX库,简称Rx库,是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。目前为止,Rx已经覆盖了几乎全部流行的编程语言。ReactiveX文档中文翻译
Android对于异步线程有着严格的管理机制,UI的更新只能在主线程中进行,而耗时操作及网络请求则是在主线程中严禁执行。谷歌官方给出了Handler,AsyncTask等解决方法,但实际运用下来,效果并不是非常好,尤其当异步操作比较复杂时,开发者会十分头痛于繁多的逻辑代码,以及时不时出现的内存泄露。而RxJava提供了一种十分优雅的处理异步线程的方式,以数据流的方式将复杂的runnable转化为清晰简便的数据流。
- 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态。数据流的形式十分利于功能扩展。
- 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码。配合java的lambda可以极大的减少代码量以及实现代码的简洁性。
- 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制。
- 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题。
RxJava的入门
RxJava的兴起也有一年多的时间了,RxJava已经成为了众多Android项目必备的第三方依赖库,配合Retrofit的使用已经替代了绝大多数包括volley在内的网络请求库。但RxJava的学习曲线比较陡峭,这里推荐一篇非常的RxJava入门级文章给 Android 开发者的 RxJava 详解。
RxJava的使用场景
RxJava本质是一个方便实现异步操作的库,所以说RxJava最适用的场景就是需要频繁切换线程的操作,所以说它是十分适用于Android开发的。
1.与Retrofit的开发结合
Retrofit的github地址
https://github.com/square/retrofit
Retrofit 是 Square 的一个非常著名的网络请求库。他即提供传统的Callback接口回调模式的API,又提供了基于RxJava库Observable回调的�API。前者是volley及自定义网络框架中经常使用的回调方式,而后者则是Retrofit为配合RxJava的特殊定义的回调方式,也是Retrofit库现在被广泛采用的原因之一(还包括十分方便的接口定义,网络配置和数据转换)。
Retrofit本身就是一个非常牛逼的第三方开源库,可以极大�简化网络请求。现在已经发布了Retrofit2.0版本,这是Retrofit两种不同的接口定义方式:
interface GitHubService {
@GET("/repos/{owner}/{repo}/contributors")
Call<List<Contributor>> repoContributors(
@Path("owner") String owner,
@Path("repo") String repo);
);
@GET("/repos/{owner}/{repo}/contributors")
Observable<List<Contributor>> repoContributors2(
@Path("owner") String owner,
@Path("repo") String repo);
);
}
第一种方法想要异步实现网络请求
Call<List<Contributor>> call =
gitHubService.repoContributors("square","retrofit");
call.enqueue(new Callback<List<Contributor>>() {
@Override void onResponse(/* ... */) {
// ...
}
@Override void onFailure(Throwable t) {
// ...
}
});
需要在异步请求时实现一个回调接口Callback,在对Response做复杂逻辑处理时往往会产生许多杂乱无章的代码,对于团队协作及后期的维护改进是十分致命的。
而二种方法返回的是RxJava的Observable对象,我们不用在考虑是否 new Thread()去处理一些耗时操作,比如说数据库的添加和比对。因为RxJava有完善的线程切换机制,可以通过线程调度器Scheduler
实现线程的切换。
-
Schedulers.immediate()
直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 -
Schedulers.newThread()
总是启用新线程,并在新线程执行操作. -
Schedulers.io()
I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和newThread()差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 -
Schedulers.computation()
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 - 还有RxAndroid里面专门提供了
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。
Observable<List<Contributor>> observable =
gitHubService.repoContributors2("square","retrofit");
observable.subscribeOn(Schedulers.io()) //io线程网络请求
.doOnNext(list -> {/.../}) //耗时操作,数据库读写
.observeOn(AndroidSchedulers.mainThread()) //主线程更新UI
.subscribe(list -> {/.../});
代码简洁明了,逻辑一目了然。
当然你可以轻松的做业务逻辑的扩展,相对list的非空校验,只需要在subscribe
订阅前添加filter(�list -> list != null)
。
如果更深入,你也可以加上统一的错误处理,实现全局的Token过期自动刷新。RxJava基于其各种各样的操作符可以帮助你实现绝大多数逻辑,而且更加简洁,易于理解。
2.与RxBinding的开发结合
RxBinding的github地址
https://github.com/JakeWharton/RxBinding
RxBinding的作者就是大名鼎鼎的JakeWharton,这位大神是RxAndroid、Retorfit及okhttp的主要贡献者,自己则开发了 RxBinding 及Butterknife。就是把发布-订阅模式用在了Android控件的点击,文本变化上。通过RxBinding把点击监听转换成Observable之后,就有了对它进行扩展的可能
这里提供两个使用案例:
- 按钮需要防抖功能:
RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst(1, TimeUnit.SECONDS)//一秒钟之内只取一个点击事件,防抖操作
.subscribe(aVoid -> {
Toast.makeText(this,"�click",Toast.LENGTH_SHORT).show();
});
- 搜索栏监听输入变化自动请求搜索结果:
RxTextView.textChanges(findViewById(R.id.edit_search))
.debounce(300, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())//每300毫秒采样1次
.distinctUntilChanged()//关键词去重
.doOnNext(s -> showProgress())//展示加载动画
.observeOn(Schedulers.io())//切换线程,网络请求
.switchMap(Observable::just)//当Observable发射一个新的数据项时,如果�上一次订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项
.flatMap(s -> {//通过Retrofit请求网络接口})
.retry((integer, throwable) -> throwable instanceof InterruptedIOException)//中断自动重试
.subscribeOn(AndroidSchedulers.mainThread())//切换到主线程准备更新UI
.filter(response ->{//必要的数据校验})
.subscribe(response ->{//搜索结果的展示});
另外,在这里推荐一个使Subscription与组件的生命周期同步的框架RxLifecycle,当然你也可以在Activity或者Fragment的onDestroy()中取消订阅防止内存溢出,但这种方法明显没有RxLifecycle灵活。
3.RxBus的使用原理
RxBus是使用RxJava实现类似于otto、eventbus等开源库提供的功能,它的代码量极少却能完成绝大多数后者的使用场景。
RxBus的实现主要依赖于RxJava的Subject类,对于Subject的定义:
Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象。
Subject根据用途的不同设计了四种子类,分别为AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。
- AsyncSubject
AsyncSubject仅释放Observable释放的最后一个数据,并且仅在Observable完成之后。然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。 - BehaviorSubject
当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。 - ReplaySubject
不管Observer何时订阅ReplaySubject,ReplaySubject会向所有Observer释放Observable释放过的数据。 - PublishSubject
PublishSubject仅会向Observer释放在订阅之后Observable释放的数据。
通常而言,我们的RxBus将会使用的是PublishSubject,即我们只需要接受我们订阅了之后的数据,但可以根据不同的业务场景采用不同的Subject来实现我们所需要的订阅逻辑。下面提供一种基于PublishSubject的RxBus实现。
public class RxBus {
private static volatile RxBus defaultInstance;
private final Subject bus; // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
/**
* 获取RxBus单例
*/
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}
// 提供了一个新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObserverable (Class<T> eventType) {
return bus.ofType(eventType);
}}
我们只需要在Activity或Fragment创建时订阅,就可以收到事件了。同时,对订阅的取消是十分重要的
4.Android耗时操作的RxJava运用
Android中又许多耗时操作场景,比如数据库的读写,大容量的图片处理保存以及文件的压缩/解压工作。但由于Android的机制,主线程无法处理长耗时的操作,所以这类操作往往需要在异步线程中进行,完毕后在主线程更新。这些都可以使用RxJava处理。
我们可以在Github上面轻易地搜出许许多多的RxJava处理Android耗时操作的库,包括用RxJava处理和操作高斯模糊效果的RxBlur,用于下载操作的rxloader,用于数据库操作的RxCupboard等等。而这些通常是对现有的AndroidAPI进行针对RxJava的封装,使得这些操作具有更好的拓展性,而不是在回调接口中书写大段冗余复杂的代码,你也完全书写定义自己的RxJava逻辑。