前言
使用RxJava也有一段时间了,通过这种订阅数据的思想编写代码,避免了大量的接口回调,使得数据处理更加方便,对外提供数据的方式更加统一,回避了同步接口和异步接口的不同。
本文是阅读抛物线的《给 Android 开发者的 RxJava 详解》一文后,结合阅读源码理解观察、订阅实现原理的笔记。
最简单的观察、订阅
下面是一个Observable的创建和完成订阅的示例代码
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
需要关注的就三个:
- Observable
- OnSubscriber
- Subscriber
Observable
首先看create()如何创建了一个了一个Observable。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
过程很简单,就是将传递给 create() 的 OnSubscribe 保存了起来就结束了。RxJavaHooks主要是用于性能优化,在RxJava的源代码中很常见。
subscribe()
// 核心代码
public final Subscription subscribe(Subscriber<? super T> subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
通过核心代码,可见流程十分简单,首先是调用传入的 subscriber#onStart 方法,该方法默认不做任何操作。之后就是将Subscriber当作参数调用Observable中的OnSubscriber#call,而在 call() 中调用了subscriber的 onNext() 和 onCompelte() 。数据就完成了从了Observable.OnSubscribe到Subscriber的数据的传递。最后返回的Subscriber是为了方便取消订阅等操作。
给subscriber添加Subscription
在实际应用中,会有这样一个需求:在subscriber退订时需要清理Observable被订阅时一起创建的资源,例如关闭socket等。示例代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// 创建资源
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// 在退订时被调用,清理资源
}
}));
// do something
}
});
代码中给subscriber添加了一个Subscription,Subscription接口有两个方法:
- void unsubscribe();
- boolean isUnsubscribed();
void unsubscribe();
在退订时被调用。通过 Subscriptions#create 创建的Subscription会在退订时调用 Action0#call 。实现代码如下:
// 构造方法
private BooleanSubscription(Action0 action) {
actionRef = new AtomicReference<Action0>(action);
}
@Override
public boolean isUnsubscribed() {
return actionRef.get() == EMPTY_ACTION;
}
@Override
public void unsubscribe() {
Action0 action = actionRef.get();
if (action != EMPTY_ACTION) {
action = actionRef.getAndSet(EMPTY_ACTION);
if (action != null && action != EMPTY_ACTION) {
action.call();
}
}
}
可见在退订时会清除对action的引用,并且是通过判断action是否为空引用来判断是否已经被退订的,并且使用了AtomicReference类来保存引用,保证了该类线程安全。
退订时,Subscription#unsubscribe
被调用的原理可以查看 SubscriptionList 的源码知晓:
// subscriber#unsubscribe
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
// SubscriptionList
@Override
public void unsubscribe() {
if (!unsubscribed) {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
}
// we will only get here once
unsubscribeFromAll(list);
}
}
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
Exceptions.throwIfAny(es);
}
核心思想就是退订时遍历subscriptions中的Subcription并调用 unsubscribe() 。
总结
可见完成一个基本的观察、订阅原理并不复杂,而在源码中会有很多性能优化,错误处理相关的代码,在阅读源码时需要学会挑重点、优先关注核心的逻辑代码。