RxJava
RxJava
Github的wiki的介绍:
RxJava
is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
大概意思是RxJava
是Java VM
上一个使用可观测序列来组成的一个异步的、基于事件的库。
从这么一句话,看来有两个关键点: ① 异步 ② 事件 ③可观测 。 概括性的话总是那么抽象,看了这句话,还是不清楚,RxJava
到底是干什么的。一个亘古不变的道理, 实践出真理。只有你用了,你才知道它是什么。
在我看来,RxJava
就是一个异步的扩展的观察者模式。区别于AsynTask
以及Handler的优点在于代码简洁,链式调用。但是注意啦,我这里说的代码简洁,可不是说它的代码量少,而是因为它逻辑清晰简洁,当你回顾代码的时候,可以方便的看出当初写的逻辑。而不像Handler,要找到发送message的地方,以及接受message的地方, 更有各种缩进让你一番好找,不方便代码的回溯。而RxJava
的链式调用,使整体代码结构看起来很清晰。
RxJava
的神奇之处不仅在于异步的观察者模式,更强大的在于它的操作符,它提供了大量的操作符来帮助你更好的解决复杂的逻辑。结合Retrofit
可以很方便的嵌套请求网络,解决类似当你要获取服务器上面的数据的时候,首先要登录,而登录之前要先获取服务器端的一个认证挑战字。代码如下:
InnerServerApi.requestLoginCode(mLoginCodeUrl, mAccount).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
.flatMap(new Func1<LoginCode, Observable<AAAData>>() {
@Override
public Observable<AAAData> call(final LoginCode loginCode) {
Log.d(TAG, loginCode.getLoginCode() + "HAHAHA");
return InnerServerApi.requestLoginData(mAAALoginUrl, loginCode.getLoginCode(), mAccount, mPassword);
}
}).subscribeOn(Schedulers.io()).subscribe(new Subscriber<AAAData>() {
@Override
public void onCompleted() {
Logger.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "login:onFailure");
}
@Override
public void onNext(AAAData loginData) {
Log.d(TAG, "login:onSuccess:");
}
});
我们不说代码量是否多了,反正逻辑是清晰了,这才是我们追求的。 如果采用AsyncTask
避免不了的CallBack嵌套,嵌套到回溯起来,自己一时半会都反应不过来呢。 这就是RxJava迷人地方之一,也是最迷人的操作符。下面,我会讲一些比较通用的操作符。更多操作符也可以查看API,里面有图片为你展示事件序列经过操作符之后的流向。
下面就开始学习基本概念啦。。。
RxJava的四个关键
-
Observable
被观察者,事件的发出者。 -
Observer
观察者,被动观察事件的发出并做相应的处理。 通常我们都是用Subscriber
,Subscriber
是继承自Observer
的一个类,其实就算你实例化的是一个Observer对象,底层也是用Subscriber包裹了这个Observer对象来实现相应的逻辑的,所以可以直接使用Subscriber
。 - event 事件 被观察者发出的事件
- 订阅 (
Subscribe
),观察者订阅被观察者,即观察者监听被观察者的动态,一旦被观察者发出了什么动作就传送到了观察者那里,举个例子,就像我们关注某个微信公众号,关注这个动作就相当于 订阅, 然后微信公众号有新的消息推送,我们就可以看到。我们就是观察者,公众号就是被我们观察的对象。
通过这四个关键,更加说明了它是一个异步的扩展的观察者模式。
RxJava 的家长里短
RxJava的功能强大在于通过操作符变换,线程调度灵活的实现各种事件序列的处理,我觉得Observable的每一个方法都可以描述成一个操作符,下面就用操作符来说。
RxJava的观察者模式如图:
简单使用模板
使用RxJava
基本的步骤是下面三步:
创建被观察者
Observable
创建一个Observable的方法有很多个。其中一个就是create,有关于更多的创建操作符,创建型操作符 点这个可以了解到。
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
})
call
方法里面是被观察者订阅之后要向观察者发出的事件。
创建观察者
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.i(TAG, e.toString());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, integer + "");
}
}
onCompleted
是整个事件序列结束之后调用的方法。在一个正确运行的事件序列中,onCompleted()
和 onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和 onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个,在自己创建的Observable里面应该注意控制。
onError
是当事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。
onNext
,相当于监听按钮点击事件里面的onClick
,即接收到被观察者发出的事件时,观察者做处理的函数。
其实还有一个onStart()
方法,不过这个是Subscriber中新添加的,Observer里面是没有这个方法的。在 subscribe
刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart()
就不适用了,因为它总是在 subscribe
所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
方法
订阅
//被观察者,主动去订阅观察者,为了实现链式调用就设计成观察者主动订阅观察者。
observeable.subscribe(subscriber);
这样就完成了一个事件的订阅的流程。
线程控制Scheduler
在RxJava中我们可以通过Scheduler
来实现多线程,指定事件发送的线程,以及事件处理的线程。事件发送就是Observable
的call方法执行的代码,通过subscribeOn
我们可以指定call
方法执行的线程。事件处理即Subscriber
事件处理方法执行的线程,通过observerOn可以指定事件处理的线程。不过调用subscribeOn
或 observerOn
切换线程,没有再做切换,则此次事件一直处于最后一次切换的线程。
需要注意的是,①observerOn
可以多次调用并且有效,但SchedulerOn 虽然也可以多次调用,但是只有第一次是有效的。②默认的Observable
的产生事件,以及通知事件给Observer
是在同一个线程里面的,这个线程就是subscribe
所在的线程。③有些操作符是有默认的调度器的,可以查看官方文档。比如timer
默认就是computation Scheduler
.
RxJava内置Scheduler
-
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 -
Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。 -
Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 -
Schedulers.computation()
: 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 -
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行,如果希望Observer
的事件处理发生在主线程的话,就要调用observerOn(AndroidSchedulers.mainThread()).
常用的几个操作符
-
创建型操作符
from
github介绍:
from() — convert an Iterable, a Future, or an Array into an Observable
from(java.lang.Iterable<? extends T> iterable) /from(T[] array),Observable
可用于将传入的数组或者集合中拆分成具体的对象,分发下去。
ArrayList<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(0);
list.add(4);
Observable.from(list); //观察者依次接收到 1,2,0,4.
just
just() — convert an object or several objects into an Observable that emits that object or those objects
just(T t1, T t2··· T tn),其实just与from 差不多,也是将t1, t2··· tn依次分发下去。
Observable.just("Tom" , "John", "Mary");
timer
timer() — create an Observable that emits a single item after a given delay
常用的timer(long delay, java.util.concurrent.TimeUnit unit)
, delay
延迟的时间,unit
延迟的时间的单位。用于定时在一定时间之后,再分发事件,观察者接收到信号,做相应处理。相当于Handler里面的延迟发送。
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
mThread = Thread.currentThread();
Log.i(TAG, "name2" +mThread.getName() + "pid" + mThread.getId());
}
});
interval
interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
interval(long interval, java.util.concurrent.TimeUnit unit)
,相当于Timer定时器,定时分发事件。与timer
的区别在于可以定时的重复分发事件。而timer只操作一次。repeatWhen
跟interval的作用是一样的。
- 过滤型操作符
filter
filter() — filter items emitted by an Observable
可以过滤事件,只有符合条件的事件才能被继续分发下去。
Observable.just(1,2,0,3).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//过滤掉等于0的事件
if(integer != 0) {
return integer;
}
return false;
}
})
last
last() — emit only the last item emitted by an Observable
只能发整个事件序列的最后一个事件。同理,first(),只分发整个事件序列的第一个事件。
-
强大的转换操作符
RxJava
中最强大的就是转换型操作符了,可以对Observable进行一些转换,做更多的操作,实现嵌套之类的逻辑。在我看来转换操作符的作用就是将一个Observable
转换成另外一个Observable
,我们这里假设只有两个Observable
,当订阅之后,第一个Observable
的call方法首先被调用,即第一个Observable
发送事件序列,第二个Observable
对第一个Observable发送出来的事件做处理(比如filiter对事件进行过滤)或者对第一个Observable
返回的数据类型进行处理转换成另外一个对象或者Observable
(比如Map以及flatMap)然后第二个Observable
开始发送事件序列,最后在Subscribe
里面进行处理。在官方的API里面有形象的"弹珠图"来演示事件发送的发展。可以点这里去看 ===> RxJava API
map
map
将一个对象转换成另外一个对象分发下去,返回的是一个Observable对象。
Observable.just(1,2,3,4).map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {
Log.i(TAG, "integer" +i);
return s;
}
});
上述代码就是将Integer转换成String对象分发下去。
flatMap
flatMap
可以网络请求的嵌套,比如请求服务器要先申请到挑战字 再登录服务器,代码就可以如下使用:
InnerServerApi.requestLoginCode(mLoginCodeUrl, getSN()).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
.flatMap(new Func1<LoginCode, Observable<AAAData>>() {
@Override
public Observable<AAAData> call(LoginCode loginCode) {
Log.d(TAG, loginCode.getLoginCode());
String displayId = android.os.Build.DISPLAY;
return InnerServerApi.requestLoginData(mAAALoginUrl,loginCode.getLoginCode(), "doris", mPassword);
}
// if error, retry ,3 times.
}).subscribeOn(Schedulers.io()).retry(3).subscribe(new Subscriber<AAAData>() {
@Override
public void onCompleted() {
Logger.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
Log.i(TAG, "login error");
}
@Override
public void onNext(AAAData loginData) {
Log.i(TAG, "login Success");
}
});
map
与 flatMap
的区别在于map
是一对一,而且map是直接返回一个对象,而flatMap
则是返回Observable
,用于分发事件。flatMap
的一对多的体现看下面这段代码:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
这段代码,先通过第一个Observable
把一个个目录发送出去,然后通过flatMap
再把目录中的文件一个个分发下去。这是map做不到的。
compose
对Observable
进行一个整体的变化,flatMap
只是对接收到的事件一个一个的转换,而compose
是对整个Observable
做一些处理。
//自定义一个转换器
final Observable.Transformer schedulersTransformer = new Observable.Transformer() {
@Override
public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
//这样就对observable进行了上面call方法里面的操作。
observable.compose(schedulersTransformer);
- 还有很多
zipWith
,merge
等这些合并Observable
,可以看这里Combing操作符
-
错误处理操作符
retry
retry(long count)
常用的retry方法,当发生错误时,retry count次,比如我们网络请求的时候失败了,我们可以重新请求三次,retry(3).
onErrorResumeNext
当发生错误的时候,调用这个操作符里面的方法,将当前的Observable
转换成另一个Observable
继续发送事件。应用场景,当我们请求当前服务器失败的时候,可以选择一个备用的服务器地址重新请求数据。
RxJava 与 Retrofit 结合实例
Retrofit
与RxJava
结合可以更好的完成网络请求。不了解Retrofit
的,赶紧去学习吧,Retrofit
的底层使用OkHttp
完成网络请求。下面给大家讲讲,我用到的网络请求的例子以及踩过的坑。
-
从服务器获取文件,失败的话重试
/** downloadFile是返回的是一个Observable对象,picUrl是要下载的图片url,我将这个事件发送在IO线程中执行。
retry(3),如果失败的话,重试三次,下载文件
map 将得到的数据写入到文件中。
**/
downloadFile(picurl).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).retry(3).filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String picUrl) {
//调用filter,过滤掉null的Url,如果picUrl是空的话,那么就不下载
return !TextUtils.isEmpty(picUrl) && !picUrl.equals("null");
}
}).map(new Func1<ResponseBody, String>() {
@Override
public String call(ResponseBody responseBody) {
return writeToFile(responseBody, mFileName);
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Logger.i(TAG, "testAAA downloadCompleted");
}
@Override
public void onError(Throwable e) {
Logger.i(TAG, "testAAA downloadFail" + e.toString());
}
@Override
public void onNext(String s) {
//返回 下载的文件存放路径
Logger.i(TAG, "onNext" + s);
}
});
在上述代码中,通过filter
过滤掉了不合理的url
,其实也可以添加一个判断是否是一个Http网址的判断,你可以自己试试。并且通过map
直接将下载到的数据,存储到了文件当中,而且如果下载过程中出现类似的 socket timeout
错误,可以通过,retry
重新请求,一个简单的链式调用就下载文件并保存的逻辑写好了。 So easy! 必须给RxJava
怒赞!!!
-
嵌套网络请求
InnerServerApi.requestLoginCode(mLoginCodeUrl, mAccount).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
.flatMap(new Func1<LoginCode, Observable<AAAData>>() {
@Override
public Observable<AAAData> call(final LoginCode loginCode) {
Log.d(TAG, loginCode.getLoginCode() + "HAHAHA");
return InnerServerApi.requestLoginData(mAAALoginUrl, loginCode.getLoginCode(), mAccount, mPassword);
}
}).subscribeOn(Schedulers.io()).subscribe(new Subscriber<AAAData>() {
@Override
public void onCompleted() {
Logger.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "login:onFailure");
}
@Override
public void onNext(AAAData loginData) {
Log.d(TAG, "login:onSuccess:");
}
});
解释一下,requestLoginCode
是通过Retrofit
定义的一个返回Observable
对象的一个方法,获取我要登录服务器需要用到的 验证字段, 得到验证字段之后,通过flatMap
直接通过得到的LoginCode
调用登录方法requestLoginData
,requestLoginData
返回的也是一个Observable对象,然后订阅,最后获取到登录返回的数据。这样说可能大家不明白,我说说我这个代码的应用场景,你要登录一个服务器,首先你要向他申请一个跟你的账号相关的 验证码,然后再通过验证码,以及账号密码登录。 就想你要吃苹果,那么你要先买个水果刀,然后才能切水果最后吃到水果。 大家可以回想一下,没有RxJava
之前,利用AsyncTask
我们都是如何完成这个逻辑的,首先获取到LoginCode,然后再Callback的onSuccess
方法中再调用一个AsyncTask
请求,然后就出现了很多迷之缩进,相比之下,RxJava
简直是飞流直下,逻辑明了清晰。不相信的机智的你,可以写一段对比一下。 我这里就不贴出来的。
-
错误处理 onErrorResumeNext
在应用开发中,会遇到这样的需求,有几个备份的网络地址,当你请求第一个网络地址不成功的时候,你想要用备份的网络地址。RxJava
可以很方便的,帮你捕捉到错误,并且用备份的地址,重新请求。 就获取网络时间的代码,来举个例子吧。
Observable.create(new Observable.OnSubscribe<Date>() {
@Override
public void call(Subscriber<? super Date> subscriber) {
try {
//第一次从百度上面获取网络时间
mTimeUrl = "http://www.baidu.com/";
Logger.d(TAG, "mTimeUrl" + mTimeUrl);
Date date = GetNetworkTime.getWebsiteDate();
Logger.d(TAG, "call date" + date);
subscriber.onNext(date);
subscriber.onCompleted();
} catch (IOException e) {
//如果网络超时则调用onError,触发onErrorResumeNext通过备份的地址 http://www.beijing-time.org 获取网络时间。
subscriber.onError(e);
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).onErrorResumeNext(new Func1<Throwable, Observable<? extends Date>>() {
@Override
public Observable<? extends Date> call(Throwable throwable) {
return Observable.create(new Observable.OnSubscribe<Date>() {
@Override
public void call(Subscriber<? super Date> subscriber) {
//通过http://www.beijing-time.org 请求时间,如果这次访问失败,则获取网络时间失败
mTimeUrl = "http://www.beijing-time.org";
try {
Date date = GetNetworkTime.getWebsiteDate();
Logger.d(TAG, "call1" + date);
subscriber.onNext(date);
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
});
}
}).subscribe(new Observer<Date>() {
@Override
public void onCompleted() {
Logger.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Date date) {
Logger.d(TAG, "call3" + date);
netWorkCallback.doSometing(date);
}
});
在上述代码中,当第一个Observable
发送获取网络时间的时候超时的时候,我们自己调用OnError
方法,从而onErrorResumeNext
拦截到错误并且将当前的Observable
转换成另外一个Observable
,通过这个新的Observable
继续发送新的事件,这里的新的事件就是通过备份的url,再次获取网络是事件。这个就可以完成多个地址切换请求网络数据,当第一个地址不成功,换第二个地址。这在做应用开发时也经常用到。我觉得挺实用的,当然onErrorResumeNext
的应用场景还有很多,就等你慢慢发掘吧。
总结
在用RxJava的时候,要注意当前的操作是哪个线程。注意不要把应该在UI线程操作的放在了子线程,也不要把大量的操作放在主线程。
在
Observable
发送一串事件序列的时候,如果其中有一个出错了,那么接下来的事件,观察者都不会接收到。即OnError
是整个事件结束的标志,如果出错了,并且没有做什么出错处理,那么就直接调用了OnError
,结束整个事件。 所以这意味着,你利用RxJava
发送了一个下载十个文件的事件序列时,如果其中有一个文件下载失败,其他的文件就停止下载了。这个时候你只能用,for循环,然后一个一个下载,方便控制。
参考链接
最后的最后,我说的哪里不对的或者有什么问题,欢迎留言,大家共进步。O