写在前面的话
第一次听说Rxjava的时候,我顿时就懵b了,什么事RxJava?新的一门强大的语言么!顿时怀着对RxJava的期待,慢慢的去了解了。后来才发现,RxJava其实用到了响应式开发的一种新的开发模式。响应式开发,哇,感觉不得鸟啊!所以在不断的挖掘下,对RxJava做了一个稍微的整理。
响应式编程
在开始了解RxJava之前,得先了解一下响应式编程,这是因为Rx响应式编程也是响应式编程的一种。响应式编程:一种面向数据流和变化传播的编程范式。不懂啊,没关系,我们来举个栗子:
例如,在我们现在的命令式编程中,a = b + c 表示将表达式的结果赋值给a,那么之后,我改变b或者c之后,a会改变么?有人可能会说肯定会啊。其实是不会变的:
a = b + c;
a = ?
b = d ;
a = ? // 两次的a不会改变
所以如上,它是不会变得。那么响应式编程与命令式的区别在哪里咯?
还是如上所说,在响应式编程中 ,a的值会随着b或c的改变而改变。
当然,上面所说的,没有解决实际的问题,上面只是一个表面现象,那么最根本的区别到底是什么?
可以这样说,当b改变时候,它向a发了一个通知:hi,我改变了,你变一下。但是,命令式编程中b改变,不会去通知a改变。所以,最大的区别就是:某个数据变更,那么就会发个通知。
那么,我们还可以,再进行联想一下:既然响应式能够通知,那是不是,在它通知之前,我可以做任何事情呢?
设想一下这个情景:小明,放学回家之后,他跟他妈妈说,饭做好了叫我。“叫他”这个就是个通知,那么在这个通知之前,小明是不是可以看电视,做作业...。哟嚯,这个就有意思了,这个像不像我们所说的异步呢?
RxJava之Observables
用过或者了解过Rxjava的人都知道,在Rxjava中,使用的是Observables。
在面向对象的架构中,开发者致力于创建一组解耦的实体。这样的话,实体就可以在不用妨碍整个系统的情况下可以被测试、复用和维护。设计这种系统就带来一个棘手的负面影响:维护相关对象之间的统一。
在众多的设计模式中,观察者模式,是最符合响应式编程的一种设计模式,为什么这么说呢?
观察者模式是一种行为模式并提供一种以一对多的依赖来绑定对象的方法:即当一个对象发生变化时,依赖它的所有对象都会被通知并且会自动更新。这也就是为什么RxJava中使用到Observables的原因。
观察者模式
观察者模式的基本需求:观察者和被观察者之间是完全分离的,当被观察者的状态发生变化之后,
通过Register(注册) 或者 Subscribe(订阅)的方式,通知观察者。如上小明让他妈妈叫他吃饭的例子:观察者就是小明,被观察者是饭,那个会说了,被观察者不是他妈妈么!他妈妈其实就是一个订阅或者注册,当饭这一状态变成好了的时候,那么就会通知小明。
Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 **setOnClickListener() **方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
Rxjava的观察者模式
在Rxjava中有四种角色:
- Observable(被观察者)
- Observer(观察者)
- Subcriber
- Subjects
Observable 和 Observer通过 subcribe(订阅)实现订阅关系,所以Observable在状态发生变化的时候通知Observer。
Observer(观察者)
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
从中我们可以看出,Observer(观察者)除了传统的onNext(onEvent)还实现了onCompleted()和onError()。
onCompleted():告知Observable没有更多的数据了,即是没有新的onNext()发出时,就执行onCompleted()。
onError():在事件处理过程中,出现了异常或者错误,就会被触发,同时整个队列将被终止,不再有事件发出。
在一个队列中,onCompleted()和onError() 都是最后触发的,而且两者中只有一个会被触发。
Subcriber
接下来我们再看看Subcriber:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
欸嘿嘿,这个怎么跟上面的Observer一毛一样啊!然后我们查看Subcriber的源码就会发现,其实Subscriber其实就是实现了Observer接口的一个抽象类。
那么问题来了:Observer和Subcriber有什么区别呢?我调用时候使用Observer还是Subcriber呢?
subcriber其实是对Observer进行了一些扩展,在Rxjava的Subcribe(订阅)中,Observer总是先辈转换成一个Subcriber再使用的,所以,如果用的话,就用Subcriber就行。
区别:
//这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
//这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于判断当前是否订阅。
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
// 这是 Subscriber 增加的方法。它会在 Subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
public void onStart() {
// do nothing by default
}
创建Observable
在上述中,我们已经了解了观察者的一些基本内容,那么被观察者怎么创建呢?
Observable.create()
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
}
});
通过 Observable.create()方法创建,那么就需要一个OnSubcribe的对象,卧槽,这个OnSubcribe是个什么东东?打开源就知道,这个OnSubcriber是实现了一个Acton1接口的接口。
每个Observable有一个final OnSubscribe<T> onSubscribe 成员,在唉成员方法中调用call()方法,这个call方法的参数就是 Observable.subscribe() 方法传入的 Subsriber实例。
注意:在Rxjava中ActionX系列,其实就是无返回值的的接口
订阅
上面了解了观察者和被观察者,那么就可以进行订阅了,先来看一个简单的栗子:
//被观察者
Observable<Integer> observableInteger = Observable.create(new Observable
.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//观察者
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
//进行订阅
Subscription subscription = observableInteger.subscribe(subscriber);
Log.d(TAG, "subscription: " + subscription.isUnsubscribed() + ",Observable:" + subscriber
.isUnsubscribed());
这里创建了一个观察者,一个被观察者,然后通过
Subscription subscription = observableInteger.subscribe(subscriber);
生成一个Subscription订阅关系。
为了确定这个订阅是否生效,就打印了里面的循环数据:
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onCompleted: Observable completed
subscription: true,Observable:true
可以看到,Subscriber和Observanle之间已经进行了订阅,如果没有Subscribe就不会发射事件。
在Subsriber(Observer)中不关心你给我的到底是个什么,你给我什么,我就操作什么!
有人会说了:如果直接用一个循环打印,岂不是更简单,你这样做复杂化了,太麻烦。
确实,在这个例子中复杂化了,上面讲到Rxjava的最大的优势就是:异步,并不是进行这么简单的操作,所以,稍安勿躁,请听我慢慢道来。
Observable.from()
在上面的例子中,我们创建一个序列,然后一个一个的发射他们,如果现在我们已经有一个序列呢?有没有方法去直接实现它。
List<Integer> integers = new ArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
integers.add(4);
integers.add(5);
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.from(integers).subscribe(subscriber);
这个跟上面的结果一样一样的。
Observable.just()
将传入的参数依次发射出去。
Subscriber subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: Observable completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: Observable error");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable.just(1,2,3,4,5).subscribe(subscriber);
just()中可以传入1-10个参数,并且将传入参数的顺序来发射出去。
Subjects
上面我们已经了解了Observer,Subscribe和Observable,还有一个Subject,那么Subject是什么呢?
让我们来看看源码:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
...
}
从这个当中,我们可以看出,这个Subject即实现了Observer,又继承了Observable,也就是说,Subject当中有观察者又有被观察者,这...你tm在逗我么?其实这并没有问题,就像你自己,既继承了你爸爸的一部分,又继承了你妈妈的一部分,然后形成了你,所以:存在即合理。
那有人会问了:这个既有Observer又有Observable,那该怎么用呢?
很简单,它既可以当Observer,又可以当作Observable
可以用一个表达式来表示:
Subject = Observable + Observer
当Subject作为Observer的时候,可以订阅Observable;当Subject作为Observable的时候,可以向Observer发射数据,很显然,作为一个Observable,观察者们或者其它Subject都可以订阅它。
RxJava针对不同的场景提供四种不同的Subject:
- PublishSubject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
也就是说,他们并不是在所有的实现中都全部存在
AsyncSubject
当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。
AsyncSubject<String> asyncSubject = AsyncSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: AsyncSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: AsyncSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
};
Subscription subscription = asyncSubject.subscribe(subscriber);
asyncSubject.onNext("one!");
asyncSubject.onNext("two!");
asyncSubject.onNext("three!");
asyncSubject.onCompleted();
看一下结果如何:
onNext: three!
onCompleted: AsyncSubject Completed!
当然如果原始Observable没有发射任何值,AsyncObject也不发射任何值
AsyncSubject会把最后一个值发射给后续的观察者。如下图
那么,在我们这个AsyncSubject异常的情况下会发生什么呢?
请注意:如果在AsyncSubject异常时,那么不会向观察者发射任何值,只会传递一个错误的通知。
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
先上代码,试试水:
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: BehaviorSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: BehaviorSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.onNext("three");
我们来看看结果:
onNext: default
onNext: one
onNext: two
onNext: three
不是说好的,是最近发射的嘛!怎么全部都打印出来了?还有default哪儿冒出来的?
在我们创建BehaviorSubject的时候可以携带一个默认值:
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
所以如果去掉上面最后三行代码,还是会打印一个default。
那为什么全部都打印出来了呢?
我们把上述代码做个改变:
behaviorSubject.subscribe(subscriber);
放到倒数第2行,也就是:
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("three");
欸嘿嘿,我们在来看看结果,变成什么了:
onNext: two
onNext: three
看到结果,我们不难看出,其实上面所说的发射最近所发射的数据,其实就是以
behaviorSubject.subscribe(subscriber);
为界,这句代码之前的一个和之后的所以发射。
不信你可以试试,将这句代码放到最后一行。看看是不是只有:
onNext: three
当然,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
ReplaySubject
那 ReplaySubject 又是什么呢?
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
ReplaySubject<String> replaySubject = ReplaySubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ReplaySubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ReplaySubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
replaySubject.subscribe(subscriber);
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
结果如下:
onNext: one
onNext: two
onNext: three
这个跟BehaviorSubject的不同之处就是:不论在什么地方进行订阅,都不影响我发射全部数据。
PublishSubject
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
PublishSubject<String> publishSubject = PublishSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: PublishSubject Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: PublishSubject Error!");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
publishSubject.onNext("one");
publishSubject.onNext("two");
publishSubject.subscribe(subscriber);
publishSubject.onNext("three");
结果如下:
onNext: three
其实这个已经很形象的说明了,从订阅开始之前的数据都将不会被发射,而只有订阅之后的数据才会被发射出去。
最后在总结一下Subject几种类型的区别:
ReplaySubject:不论订阅所处任何位置,都将发射全部数据
AsyncSubject:不论订阅所处任何位置,只会发射最后一个数据
BehaviorSubject:订阅之前的一个和订阅之后的全部数据被发射
PublishSubject:订阅之后的数据全部被发射。
后续文章:
第二篇《Rx系列之RxJava操作符》
第三篇《Rx系列之Rxjava操作符进阶-使用场景》