前言
开始接触并且尝试的去应用Rxjava是从抛物线的这篇文章给 Android 开发者的 RxJava 详解开始的,陆陆续续的我也看了两三遍,也越来越发的发现Rxjava真的非常非常的强大。
到目前为止,越来越多的的开发者开发者开始使用Rxjava,github上的Rxjava项目也不知道有多少,而且MVP+Rxjava+Retrofit好像是很多开源项目的标准配置(😄😄),所有,我也不能落后,这不,我也在公司的项目中慢慢的去使用Rxjava,而且现在应用的场景也越来越多。
多余的话也不说了,就说说我从环境配置到使用的一步一过程,一边是我自己的一个笔记,一边是给那些初学者一点思路。
Rxjava介绍
开始Rxjava之前,首先我们要弄清楚Rxjava到底是一个什么东东。
Rxjava中github主页上的介绍是:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
翻译过来就是:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
结合我使用的过程,使用这两个词可以概括“异步”,“逻辑简洁”
引入
compile 'io.reactivex:rxjava:1.1.6'
compile 'io.reactivex:rxandroid:1.2.1'
查看引入最新版本可以去github主页
如果中项目中使用Rxjava+Retrofit结合,同时要引入
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
使用场景
由于Rxjava使用基础中很多文章中都有写,再写也都是一样的,我这里就用一些具体的场景来介绍我使用的过程
开始之前推荐两片文章,也是我看了多次觉得比较好的
给 Android 开发者的 RxJava 详解
Rxjava从入门到出轨
废话讲的比较多,开始具体场景
场景一:延迟处理
几乎每一个项目都会有引导也,之前我们通常使用的方式是Handler延迟处理,那切换到Rxjava呢?如何实现?
这里会用到Rxjava的timer操作符,timer的具体介绍再这里也不过的的说了,可以到ReactiveX/RxJava文档中文版或者github查看
回归到延迟操作,具体实现如下
Observable
.timer(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO
}
});
上看我们用到的是Rxjava不完整回调,而Rxjava定义来三中不完整回到
- observable.subscribe(onNextAction);
- observable.subscribe(onNextAction, onErrorAction);
- observable.subscribe(onNextAction, onErrorAction, onCompleteAction);
特别要注意的是,如果你无法把握Rxjava事件的订阅消费流程,最好是使用完整的回调
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
之所以在这里强调这个,是因为我在使用的过程,有事场景需要处理结果,而我直接使用的observable.subscribe(onNextAction)的不完整调用,而忽略了可能出现的异常情况,或者其他不确定的情况而导致报错闪退,具体错误
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)
场景二:短信定时器
现在的应用基本上都是手机注册+第三方登录,所有手机短信验证是必不可少的,而发送短信的过程我们要使用定时器来处理短信接收过程以及发送失败的情况,Rxjava之前我们使用的是Handler+Timer+TimerTask,这情方式的麻烦程度就不用我多说了,下面我们来看看Rxjava的是怎么实现的。。。
直接撸代码:
subscription = Observable.interval(0, 1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long increaseTime) {
return countTime - increaseTime.intValue();
}
})
.take(30)
.doOnSubscribe(new Action0() {
@Override
public void call() {
LogUtils.e("================doOnSubscribe");
timeText.setVisibility(View.VISIBLE);
failLayout.setVisibility(View.GONE);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
LogUtils.e("================doOnUnsubscribe");
stopSubScribe();
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
timeText.setVisibility(View.GONE);
failLayout.setVisibility(View.VISIBLE);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
LogUtils.e("================"+integer);
timeText.setText(Html.fromHtml(String.format(getResources().getString(R.string.verify_code_time), "<font color=\"#3d9bf5\">" + integer + "秒</font>")));
}
});
private void stopSubScribe() {
if (subscription != null) {
subscription.unsubscribe();
}
}
下面我们来分析一下上面的代码,首先是是interval(间隔一定时间发送一个数字,从0开始.本身运行在Schedulers.computation() 线程内)操作符,它是定时器使用的主要操作符,接下来observeOn说明了定时操作发生在主线程,然后就是map操作符执行了时间减1的操作,接下来是take(只保留前面的若干项数据)操作符,我们设计定时器不可能让它一直走下去,肯定是到位置执行我们制定的操作的,所以这里的take就非常重要了,从它给的意思我们能理解在这里设置take为30表示定时器执行到30秒回自动结束,所以,interval和take结合真的很完美,接下来我们看另外两个操作符doOnSubscribe,doOnUnsubscribe,它们会在Subscriber进行订阅和反订阅的时候触发回调,既是开始时我们调用doOnSubscribe,结束时调用doOnUnsubscribe。最后我们在完成的时候调用了自定义的stopSubScribe方法,用在这里的目的是为了让subscription 使用完成了就解绑订阅,不占用系统资源,同时也避免内存溢出,所有,使用完subscription 后要尽可能的把他解绑释放。
场景三:多接口调用
有些功能无法一个接口能实现了,比如我们上传一张图片到七牛,首先我们获取七牛token,上后根据token上传图片到七牛云服务器得到图片url,最后把图片url提交到我们自己的数据库中,这个场景中我们相当于执行了三次操作,那我们怎样使用rxjava去实现呢?
直接上代码
Subscription subscription = qiniuRep.getQiniuToken()
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(final String token) {
return qiniuRep.uploadImageToQiniu(token, path);
}
})
.flatMap(new Func1<String, Observable<UserEntity>>() {
@Override
public Observable<UserEntity> call(String avater) {
return userRep.updateUserInfo(UserManager.getToken(), UserManager.getUid(), nick_name, gender, birth, area, describe, avater);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
现在是不是看起来非常清晰,从上到下,一步步执行。代码也非常清晰,就不做过多的描述了。
未完待续。。。