其它文章
1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用它的好处
2、RxJava之二——Single和Subject 与Observable举足轻重的类,虽然用的少,但应该知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4、RxJava之四—— Lift()详解 想要了解Operators,Lift()一定要学习
5、RxJava之五—— observeOn()与subscribeOn()的详解Scheduler线程切换的原理
6、RxJava之六——RxBus 通过RxJava来替换EventBus
RxJava介绍文档
1、介绍
作为初学者我们不去看定义,太苦涩难懂了。我们需要记住的是RxJava就是一个实现异步操作的库。学习RxJava,我们需要把握两点:观察者模式和异步。
异步就是多线程、线程切换这些东西。我们先来了解一下观察者模式。
1.1 观察者模式
观察者模式,最重要的就是Observable(被观察者,事件源)和Observer(观察者),这也是RxJava中最核心的。
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,让它们能够自动更新自己。
我们就不看这些冷冰冰的文字定义了,举个例子来理解下:
我们按下开关,灯亮了。
在这个事件中:灯作为Observer观察者,开关作为Observable被观察者,灯透过电线来观察开关的状态来并做出相应的处理。
一个开关可以控制多个灯,这就是一对多的关系。换句话说:就是一个开关被多个灯观察,当开关的状态发生改变的时候,每个灯都可以做出自己的处理。
这就是观察者模式。
1.2 帮助理解RxJava
- 开关(被观察者)
作为事件的产生方(生产“开”和“关”这两个事件),是主动的,是整个开灯事理流程的起点。
- 台灯(观察者)
作为事件的处理方(处理“灯亮”和“灯灭”这两个事件),是被动的,是整个开灯事件流程的终点。
- 事件传递
在起点和终点之间,即事件传递的过程中事件是可以被加工,过滤,转换,合并等等方式处理的。
2、RxJava事件处理流程
2.1 说明
RxJava也是基于观察者模式来组建自己的程序逻辑的,就是构建被观察者(Observable),观察者(Observer),然后建立二者的订阅(subscribe)关系(就像那根电线,连接起台灯和开关)实现观察,在事件传递过程中还可以对事件做各种处理。
2.2 创建被观察者
2.2.1创建开关类的正规写法
产生了五个事件,分别是:开,关,开,开,结束。
Observable switcher= Observable.create(newObservable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("on");
subscriber.onNext("off");
subscriber.onNext("on");
subscriber.onNext("on");
subscriber.onCompleted();
}
});
2.2.2 创建开关类的简便写法
Observable switcher=Observable.just("On","Off","On","On");
或:
String [] kk={"On","Off","On","On"};
Observable switcher=Observable.from(kk);
2.3 创建观察者
2.3.1 Observer接口实现
Observer<String> light= new Observer<String>() {
@Override
public void onNext(String s) {
//处理传过来的onNext事件
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
//被观察者的onCompleted()方法会走这里,结束观察
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
//出现错误会调用这个方法
Log.d(tag, "Error!");
}
};
2.3.2 Subscriber实现
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类-Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:
Subscriber<String> light= new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
2.3.3 非正式写法
使用Action1。Action1是一个单纯的人畜无害的接口,和Observer没有啥关系,只不过它可以当做观察者来使,专门处理onNext 事件,这是一种为了简便偷懒的写法。当然还有Action0,Action2,Action3...,0,1,2,3分别表示call()这个方法能接受几个参数。初次接触为了方便理解,不便使用这种写法。
Action1 light = new Action1<String>(){
@Override
public void call(String s) {
Log.d(tag, ""+s);
}
}
2.3.4 Observer和Subscriber 实现的区别
在 RxJava 的subscribe订阅过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
1. onStart()
: 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),onStart()
就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
方法。
2. unsubscribe()
: 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用isUnsubscribed()
先判断一下状态。unsubscribe()
这个方法很重要,因为在 subscribe()
之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop()
等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
2.4 订阅
现在已经创建了观察者和被观察者,但是两者还没有联系起来。使用下面的代码让两者关联起来:
switcher.subscribe(light);
这里需要解释一下,开关是被观察者,灯是观察者,那怎么会是被观察者订阅观察者呢?不应该是观察者去订阅被观察者吗?
灯订阅开关,观察者去订阅被观察者,在逻辑上是这样的。但是,为了保证流式API调用风格(同一个调用主体一路调用下来,一气呵成),就变成了开关订阅灯,因为被观察者(开关)产生事件,是事件的起点,那么开头就是用被观察者(Observable)作为唯一的调用主体,一路调用下去。
也可以这么理解,就是背后的真实的逻辑依然是灯订阅了开关,但是在表面上,我们让开关“假装”订阅了灯,以便于保持流式API调用风格不变。
2.5 流程
流程:由被观察者产生事件,子传递过程中进行变化加工,最后到达观察者,被观察者处理,观察者和被观察者之间用订阅关联。结合流程图的相关代码:
当调用订阅操作(即调用Observable.subscribe()方法)的时候,被观察者才真正开始发出事件。
3、RxJava操作符
这才是RxJava最吸引人的地方
3.1 变换
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说RxJava 真是太好用了的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
3.1.1 Map操作
需求:被观察者产生的事件中只有图片文件路径;,但是在观察者这里只想要bitmap,那么就需要类型变换,在事件的传递过程中将图片地址转换成bitmap对象。
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
这是操作符的力量,但是我们只使用了RxJava 的一个要点,这显然是不够的,上面的代码存在缺陷,就是读取文件,创建bitmap可能是一个耗时操作,那么就应该在子线程中执行,主线程应该仅仅做展示。线程的切换一般是比较麻烦的,但是在Rxjava中,是非常方便的,这里就要使用到第四节要讲到的RxJava 线程控制了。
改进后的代码:
Observable.just("images/logo.png") // 输入类型 String
.subscribeOn(Schedulers.newThread()) // 指定被观察者执行的线程环境
.observeOn(Schedulers.io()) // 指定接下来的操作发生在IO线程
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
由上面的代码可以看到,使用操作符将事件处理逐步分解,通过线程调度为每一步设置不同的线程环境,完全解决了线程切换的烦恼。可以说线程调度+操作符,才真正展现了RxJava无与伦比的魅力。
重要:map()中事件对象是直接变换的,具体功能上面已经介绍过,它是 RxJava 最常用的变换,示意图如下:
(被传递的就是事件对象,由圆变方,即是传递过程中的变换。)
3.1.2 FlatMap操作
1 需求:打印出一组学生中每个学生选修的课程名。
你是不是想到了,先循环读出所有学生的数据,再循环查出每个学生的课程并打印出来。在没有接触RxJava之前这种想法是没有错的,但是太繁琐了,这里面涉及到两层循环。
现在使用RxJava的flatMap()将轻松解决这一需求。
Student[] students = ...;
//创建观察者,传入课程对象
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students) //创建被观察者,获取所有学生
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber); //被观察者订阅观察者
2 原理:
flatMap() 的原理是这样的:
①使用传入的事件对象创建一个 Observable 对象;
②并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
③每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
3.示意图:
即将多维数组转换成一位数组,如上面的两层列表转换成一层列表。
4.补充
由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求(Retrofit + RxJava)。
3.2 操作符的大致分类
Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;
3.3 操作符的使用
具体使用请参考官方文档,这里也有一个Demo可以参考:
https://github.com/ladingwu/ApplicationDemo,此demo我已经调试运行后放在该文档同目录下了。
4、RxJava线程控制
4.1 说明
第二节主要讲了RxJava的操作流程,其实主要就是观察者模式,现在就来看RxJava另一个要点:异步。
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
RxJava的线程环境:
4.2 代码中使用
在3.1.1Map操作符时,已经提到了线程调度,其在代码中的使用如下:
//new Observable.just()执行在新线程
Observable.create(new Observable.just(getFilePath())
//指定在新线程中创建被观察者
.subscribeOn(Schedulers.newThread())
//将接下来执行的线程环境指定为io线程
.observeOn(Schedulers.io())
//map就处在io线程
.map(mMapOperater)
//将后面执行的线程环境切换为主线程,
//但是这一句依然执行在io线程
.observeOn(AndroidSchedulers.mainThread())
//指定线程无效,但这句代码本身执行在主线程
.subscribeOn(Schedulers.io())
//执行在主线程
.subscribe(mSubscriber);
4.3 总结
实际上线程调度只有subscribeOn()和observeOn()两个方法。对于初学者,只需要掌握两点:
subscribeOn()
它指示Observable在一个指定的调度器上创建(只作用于被观察者创建阶段)。只能指定一次,如果指定多次则以第一次为准
observeOn()
指定在事件传递(加工变换)和最终被处理(观察者)的发生在哪一个调度器。可指定多次,每次指定完都在下一步生效。
5、相关链接
5.1 github地址
RxJava:https://github.com/ReactiveX/RxJava>
RxAndroid:https://github.com/ReactiveX/RxAndroid
5.2 参考技术blog地址
给 Android 开发者的 RxJava 详解:
http://gank.io/post/560e15be2dca930e00da1083
关于RxJava最友好的文章:
http://www.jianshu.com/p/6fd8640046f1
关于RxJava最友好的文章(进阶):