写在前面
RxJava我一直是很想用的,扔物线老师的文章我也看了一点,但是说实话,其中很多东西交错在一起,对于我来说有点难以理解。而且看很多文章总是看了前面忘后面,还有一些结合lambda讲的,说实话,我是懵逼的。在这里把我自己对于RxJava的一些理解,看到的一些好文记录下来。
RxJava是啥
Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。
好,对于C#不怎么了解的人一般不会知道LINQ是啥东东吧……这个介绍我们先选择略过。看看github上RxJava是怎样描述自己的。
** a library for composing asynchronous and event-based programs by using observable sequences. ** 一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。
这句话对于还未用过、看过RxJava的人来说是比较难理解的,好在关于RxJava的资料非常多,我们可以站在巨人的肩膀上来总结。首先扔物线对于RxJava给出的关键词就是** 异步 **,归根到底它就是一个实现异步操作的库。而回过头来,再看一遍这个定义,我们可以看出另外两个关键词:可观测的序列、基于事件,你可能会说这不废话吗,这句话一共才几个词,都快给我说完了。没错,因为这句话概括的非常精准,让人难以再精简了。
为啥要用RxJava
Android中实现异步的工具还是有的,那么问题来了,对于我们Android开发者来说,为什么要用RxJava而不是本来的工具?
Talk is cheap,下面选取部分我以前写的代码,用AsyncTask实现的加载数据的类:
class DownloadTask extends AsyncTask<String, Integer, ArrayList<ImageBean>> {
private ObjectOutputStream oos;
@Override
protected ArrayList<ImageBean> doInBackground(final String... params) {
try {
String imageUrl = params[0];
HttpUtils.getJsonString(imageUrl, new HttpUtils.HttpCallbackListener() {
@Override
public void onFinish(String response) {
if (JsonUtils.readJsonImageBean(response) != null) {
imageList = JsonUtils.readJsonImageBean(response);
// memoryCache.addArrayListToMemory("imageList", imageList);
if (count == 0) {
//序列化imageList
if (getActivity() != null) {
File imageCache = FileUtils.getDisCacheDir(getActivity(), "ImageBean");
try {
oos = new ObjectOutputStream(new FileOutputStream(imageCache));
oos.writeObject(imageList);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (oos != null) {
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
count++;
}
}
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
});
return imageList;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
上面那段代码表示先从文件中读取list,然后再从网络获取数据(初学时的代码,没有考虑好一些逻辑关系)。当我打算重构代码,看到这一段的时候,我的内心是崩溃的。虽然逻辑并不复杂,但是这些迷之缩进实在是看的蛋疼。那么如果我用RxJava重写一下上面的逻辑,会是怎样的呢?
Observable.just(0, 1)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
if (integer == TYPE_NETWORK) {
return getUrl(pageIndex, type);
}
return "cache";
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
if (s.equals("cache")) {
//加载缓存
} else {
//从网络获取数据
onCompleted();
};
}
}
});
以上代码实现的非常不科学,非常的不RxJava,但是在这里仅仅是作为一个示例,让你感受一下RxJava的特性:简洁。你可能会说这哪里简洁了啊?代码量不跟以前差不多吗,是的,甚至有的时候代码量还会增加一点,但是这样的代码能让你感觉到清晰的逻辑。一切逻辑都在链子里了,而且如果你使用lambda会得到更加简洁的代码。。
这就是我们要用RxJava的原因之一了:** 简洁 **
这里的简洁不是指代码量的少,而是指代码逻辑的简洁。而且这种优势随着逻辑的复杂而更加明显。在这里我并不会将lambda和RxJava结合在一起,一是因为自己的确不熟,二也是因为自己初接触RxJava,对于我这种入门级选手还是要先排除一些干扰项
RxJava核心&基础
在开始撸RxJava的代码之前,我们首先要弄清楚RxJava中的三个基本也是核心的概念:观察者(Observer)、订阅(Subscribe())和被观察者(Observable)。熟悉设计模式的你可能会立刻想到,这不就是观察者模式吗。是的,就是观察者模式。
观察者模式定义了对象间一种一对多的依赖关系,每当一个对象状态发生改变,所有依赖于它的对象都会得到通知并被自动更新。在Android中比较经典的例子有Button的点击,只有当Button被点击的时候,观察者OnClickListener在Button的点击状态发生改变时将点击事件传送给注册的OnClickListener。而对于RxJava来说也是如此,接下来我将换一种我喜欢的描述来讲解我所理解的RxJava。RxJava中Observable是发射数据的源,无论他是“热”启动还是“冷”启动,总之,他最终都是用来发射数据的。Observer则是数据接收者,而Observer和Observable则通过subscribe()(订阅)结合在一起,从而达到Observer接收Observable发射的数据的目的。
在讲解完了RxJava的核心之后,还需要注意一些细节:
在RxJava的文档中指出,无论哪种语言,你的观察者(Observer)需要实现以下方法的子集:
onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法调用次数取决于实现。onError(Exception e)
Observable遇到错误时会调用这个方法,这个调用会终止Observable,onError和以下将要介绍的onComplete是互斥的,即同一个事件序列中二者只能有一个被调用。onComplete()
正常终止
好了,烦人的概念时间终于过去了,让我们开始愉悦的Hello World时间!
Hello World!
打码之前记得加上依赖:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
我这的依赖好像还是看扔物线文的时候添加的……应该比较老了……
话不多说直接上码:
//创建被观察者
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//回调
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onNext("!");
}
}).subscribe(new Subscriber<String>() {//被订阅
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
});
刚接触到这一坨代码你可能会说卧槽这什么东西,大兄弟先别忙着走,我那么写只是为了把RxJava链式调用的特点展现在你面前,接下来让我们从Observeable和Observer的创建开始。
- 创建Observable
创建Observable的方式非常的多,先介绍一下非常基本的** Create **
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onNext("!");
}
})
通过create()方法可以创建一个Observable对象我们是知道了,那么create()方法中的参数是什么呢?
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
这个参数是个接口,那么很明显了——这个参数是用来干回调这事的,发射数据的时候将会用这个接口的实现类通过这个参数发射数据。而call方法则来源于其父接口Action1。call这个方法给出了一个subscriber参数,让我们看一下这个subscriber究竟是谁。
可能你会说这不废话吗……闭着眼我都能知道这是炮姐……继续我们的话题,在这个类实现的接口里我们发现了一个看起来非常熟悉的东西** Observer **是了,这个subscriber就是一个订阅者,一个订阅者加强版。他相对于Observer主要有两点区别,一个onStart()会在订阅开始,事件发送前执行,所以你可以在这个方法里做一些准备工作。另一点是实现的另外一个接口提供的unsubscribe(),取消订阅。据扔物线的文章说,不取消订阅可能会有内存泄露的风险,关于这一点很容易理解,异步可能会由于生命周期长短问题引发内存泄漏,在这里就不多加赘述了。
- 创建Observer
看完了Observable的创建,我们再来看一下Observer的创建
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG,s);
}
};
按照老样子,点进Observer发现这个也是个接口,那么我们在使用多态创建这个方法的时候必须要实现他的三个方法。
- 订阅
observable.subscribe(observer);
上面的代码看起来像是observable订阅了observer,但事实上这种设计是出于流式api的考虑,什么是流式api?看看我的hello world实例代码是怎么写的,那就是流式api设计的好处。整个代码看起来像是一个链子,优雅而简洁。
好了,最基本的介绍完了,你现在可以去尝试一下你的Hello World了。不过在尝试之前,我需要纠正我上述Hello World示例代码的一个错误:RxJava文档中对于Observable的描述有这么一段话,一个形式正确的Observable必须尝试调用一次onCompleted或者调用一次onError方法。很明显,我的demo是一个使用方法错误的例子。此处对Observable和Observer的api介绍非常的少,因为我觉得一次性把文档上的方法全给你搬上来并不明智,一是用不上那么多,二是容易混淆。
线程控制——Scheduler###
终于要到重点了,线程控制绝对是RxJava的重点之一。在不指定线程的情况下,RxJava遵循的是线程不变的原则,在哪个线程调用subscribe(),就在哪个线程生产、消费事件。这对于大部分开发人员来说都是难以接受的事,因为如果是耗时操作可能会阻塞当前线程,这是开发者不想看到的,好在我们是可以切换线程的。下面同样是摘自的描述:
Schedulers.computation( ):用于计算任务,如事件循环或回调处理,不要用于IO操作,默认线程数等于处理器的数量。
Schedulers.from(executor):使用指定的Executor作为调度器
Schedulers.immediate( ):在当前线程立即开始执行任务
Schedulers.io( ):用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
Schedulers.newThread( ):为每个任务创建一个新线程
Schedulers.trampoline( ):当其它排队的任务完成后,在当前线程排队开始执行
这里只是初步的了解一下,毕竟本文定位是一篇基础级的文,以下给出一个简单的加载图片的例子。
简单的例子
首先明确我们要干的事:通过一个url加载一张图,恩为了演示RxJava和线程控制,我用HttpUrlConnection来做一个实例。
Observable.create(new Observable.OnSubscribe<Bitmap>() {
@Override
public void call(Subscriber<? super Bitmap> subscriber) {
try {
URL url = new URL("http://img4.imgtn.bdimg.com/it/u=815679381,647288773&fm=21&gp=0.jpg");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(8000);
connection.setReadTimeout(8000);
InputStream in = connection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(in);
subscriber.onNext(bitmap);
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
})
.observeOn(AndroidSchedulers.mainThread()) //指定subscriber的回调发生在UI线程
.subscribeOn(Schedulers.newThread()) //指定subscribe()发生在新线程
.subscribe(new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
plan.setVisibility(View.GONE);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Bitmap bitmap) {
if (bitmap != null) {
image.setImageBitmap(bitmap);
onCompleted();
}
}
});
}
上面的代码写的很清楚了,我是通过observeOn(AndroidSchedulers.mainThread())指定订阅者的回调发生在主线程,因为这里给ImageView设置图片需要在主线程进行,通过subscribeOn(Schedulers.newThread())指定subscribe()发生在新线程。
最后请忽略最后几秒的蜜汁小圆点,因为我不摸屏幕AndroidStudio的录制就会停留在加载出图片后的那一段时间,录制出来的效果非常差。我加载的这张图是非常小的,我通过限制wifi网速为5k/s来达到“加载”这个目的。
一些反思
本文说的并不深入,只是一篇基础,看完了这篇可能你只能写两个小demo。但是就如我上文所说的,我认为学一个东西,基础是十分重要的,只要你梳理清楚基础和关键,学习起来无疑是事半功倍的。
我在文章最开头写的demo我为什么要说这很不“RxJava”?因为我只是传递了两个Integer类型的数,之后通过map操作符将这两个转换为String,在订阅的回调里处理这两个字符,并执行相应的逻辑。这给我的感觉就和以前写代码的感觉差不多,没有一种链式调用的爽快感,反而有一种强行用RxJava的感觉。那么RxJava的应用场景和操作符究竟有什么玄机?我会继续探索,继续分享。请期待~
推荐资料
给Android开发者的RxJava详解:http://gank.io/post/560e15be2dca930e00da1083#toc_14
ReactiveX/RxJava文档中文版:https://mcxiaoke.gitbooks.io/rxdocs/content/
大头鬼深入浅出RxJava系列:http://blog.csdn.net/lzyzsd/article/details/44094895
iamxiarui探索RxJava系列,也是我比较推荐的入门文,我这同学总结和配图都是一流的:http://www.jianshu.com/p/856297523728