前言
相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行。同样有很多同学已经开始在自己的项目中使用RxJava。它能够帮助我们在处理异步事件时能够省去那些复杂而繁琐的代码,尤其是当某些场景逻辑中回调中嵌入回调时,使用RxJava依旧能够让我们的代码保持极高的可读性与简洁性。不仅如此,这种基于异步数据流概念的编程模式事实上同样也能广泛运用在移动端这种包括网络调用、用户触摸输入和系统弹框等在内的多种响应驱动的场景。那么现在,就让我们一起分析一下RxJava的响应流程吧。
(本文基于RxJava-1.1.3)
用法
首先来看一个简单的例子:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("onNext");
subscriber.onCompleted();
}
}).map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " -> Xepher";
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("--- onCompleted ---");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("subscriber -> " + s);
}
});
运行结果为:
subscriber -> onNext -> Xepher
--- onCompleted ---
从结果中我们不难看出整体的调用流程:
首先通过调用Observable.create()
方法生成一个被观察者,紧接着在这里我们又调用了map()
方法对原被观察者进行数据流的变换操作,生成一个新的被观察者(为何是新的被观察者后文会讲),最后调用subscribe()
方法,传入我们的观察者,这里观察者订阅的则是调用map()
之后生成的新被观察者。
在整个过程中我们会注意到三个主角:Observable、OnSubscribe、Subscriber,所有的操作都是围绕它们进行的。不难看出这里三个角色的分工:
- Observable:被观察者的来源,亦或说是被观察者本身
- OnSubscribe:用来通知观察者的不同行为
- Subscriber:观察者,通过实现对应方法来产生具体的处理。
所以接下来我们以这三个角色为中心来分析具体的流程。
分析
一、订阅过程
首先我们进入Observable.create()
看看:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
这里调用构造函数生成了一个Observable对象并将传入的OnSubscribe赋给自己的成员变量onsubscribe
,等等,这个hook是从哪里冒出来的?我们向上找:
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaObservableExecutionHook这个抽象Proxy类默认对OnSubscribe对象不做任何处理,不过通过继承该类并重写onCreate()
等方法我们可以对这些方法对应的时机做一些额外处理比如打Log或者一些数据收集方面的工作。
到目前最初始的被观察者已经生成了,我们再来看看观察者这边。我们知道通过调用observable.subscribe()
方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
... ...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
... ...
return Subscriptions.unsubscribed();
}
}
这里我们略去部分无关代码看主要部分,subscribe.onStart()
默认空实现我们暂且不用管它,对于传进来的subscriber
要包装成SafeSubscriber,这个SafeSubscriber对原来的subscriber的一系列方法做了更完善的处理,包括:onError()
与onCompleted()
只会有一个被执行;保证一旦onError()
或者onCompleted()
被执行,将不再能再执onNext()
等情况。这里封装为SafeSubscriber之后,调用onSubscribe.call()
,并将subscriber
传入,这样就完成了一次订阅。
显而易见,Subscriber作为观察者,在订阅行为完成后,其具体行为在整个链式调用中起着至关重要的作用,我们来看看它内部的构成的主要部分:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions; //当前Subscriber所持有的“订阅”事件集
private final Subscriber<?> subscriber;
private Producer producer;//用于规定从Observable传来的数据流的总量
private long requested = NOT_SET;
... ...
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
... ...
protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
Producer producerToRequestFrom = null;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);
return;
}
}
producerToRequestFrom.request(n);
}
... ...
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
if (toRequest == NOT_SET) {
passToSubscriber = true;
}
}
}
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
}
每个Subscriber都持有一个SubscriptionList
,这个list保存的是所有该观察者的订阅事件,同时Subscriber也对应实现了Subscription接口,当这个Subscriber取消订阅的时候会将持有事件列表中的所有Subscription取消订阅,并且从此不再接受任何订阅事件。
同时,通过Producer可以去限定该Subscriber所接收的数据流的总量,这个限制量其实是加在Subscriber.onNext()
方法上的,onComplete()
、onError()
则不会受到其影响。
因为是底层抽象类,onNext()
、onComplete()
、onError()
统一不在这里处理。
二、变换过程
在收到Observable的消息之前我们有可能会对数据流进行处理,例如map()
、flatMap()
、deBounce()
、buffer()
等方法,本例中我们用了map()
方法,它接收了原被观察者发射的数据并将通过该方法返回的结果作为新的数据发射出去,相当于做了一层中间转化:
我们接着看这个转化过程:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
这里是通过一个lift()
方法实现的,再查看其他的转化方法发现内部也都使用lift()
实现的,看来这个lift()
就是关键所在了,不过不急,我们先来看看这个OperationMap
是什么:
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
OperationMap实现了Operator接口的call()
方法,该方法接受外部传入的观察者,并将其作为参数构造出了一个新的观察者,我们不难发现o.onNext(transformer.call(t));
这一句起了至关重要的作用,这里的接口transformer
将泛型T转化为泛型R:
public interface Func1<T, R> extends Function {
R call(T t);
}
这样之后,再将转换后的数据传回至原观察者的onNext()
方法,就完成了观察数据流的转化,但是你应该也注意到了,我们用来做转换的这个新的观察者并没有实现订阅被观察者的操作,这个订阅操作又是在哪里实现的呢?答案就是接下来的lift()
:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}
在这里我们新生成了一个Observable对象,在这个新对象的onSubscribe
成员的call()
方法中我们通过operator.call()
拿到之前生成的未产生订阅的观察者st
,之后将它作为参数传入一开始的onSubscribe.call()
中,即完成了这个中间订阅的过程。
现在我们将整个流程梳理一下:
1、一次map()
变换
2、根据Operator实例生成新的Subscriber
3、通过lift()
生成新的Observable
4、原Subscriber订阅新的Observavble
5、新的Observable中onSubscribe
通知新Subscriber订阅原Observable
6、新Subscriber将消息传给原Subscriber。
为了便于理解,这里借用一下扔物线的图:
以上就是一次
map()
变换的流程,事实上多次map()也是同样道理:最外层的目标Subscriber发生订阅行为后,onSubscribe.onNext()
会逐层嵌套调用,直至初始Observable被最底层的Subscriber订阅,通过Operator的一层层变化将消息传到目标Subscriber。再次祭出扔物线的图:至于其他的多种变化的实现流程也都很类似,借助于Operator的不同实现来达到变换数据流的目的。例如其中的
flatMap()
,它需要进行两次lift()
,其中第二次是OperationMerge,将转换成的每一个Observable数据流通过InnerSubscriber这个纽带订阅后,在InnerSubscriber的onNext()
中拿到R,再通过传入的parent
(也就是原MergeSubscriber)将它们全部发射(emit)出去,由最外层我们传入的Subscriber统一接收,这样就完成了 T
=> Observable<R>
=> R
的转化:// MergeSubscriber
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
// InnerSubscriber
static final class InnerSubscriber<T> extends Subscriber<T> {
final MergeSubscriber<T> parent;
final long id;
... ...
public InnerSubscriber(MergeSubscriber<T> parent, long id) {
this.parent = parent;
this.id = id;
}
... ...
@Override
public void onNext(T t) {
parent.tryEmit(this, t);
}
}
除此之外,还有许多各式各样的操作符,如果它们还不能满足你的需要,你也可以通过实现Operator接口定制新的操作符。灵活运用它们往往能达到事半功倍的效果,比如通过使用sample()
、debounce()
等操作符有效避免backpressure的需要等等,这里就不一一介绍了。
三、线程切换过程
从上文中我们知道了RxJava能够帮助我们对数据流进行灵活的变换,以达到链式结构操作的目的,然而它的强大不止于此。下面我们就来看看它的又一利器,调度器Scheduler:
就像我们所知道的,Scheduler是给Observable数据流添加多线程功能所准备的,一般我们会通过使用subscribeOn()
、observeOn()
方法传入对应的Scheduler去指定数据流的每部分操作应该以何种方式运行在何种线程。对于我们而言,最常见的莫过于在非主线程获取并处理数据之后在主线程更新UI这样的场景了:
Observable.create(... ...)
... ...
.subscribeOn(Schedulers.io())
... ...
.observeOn(AndroidSchedulers.mainThread())
... ...
.subscribe();
这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰,我们现在来看看这其中的线程切换流程。
1、subscribeOn()
当我们调用
subscribeOn()
的时候:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
可以看到这里也是调用了create()
去生成一个Observable,而OperatorSubscribeOn
则是实现了OnSubscribe接口,同时将原始的Observable和我们需要的scheduler
传入:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source; //原Observable
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
可以看出来,这里对subscriber
的处理与前文中OperatorMap
中call()
对subscriber
的处理很相似。在这里我们同样会根据传入的subscriber
构造出新的Subscriber s
,不过这一系列的过程大部分都是由worker
通过schedule()
去执行的,从后面setProducer()
中对于线程的判断,再结合subscribeOn()
方法的目的我们能大概推测出,这个worker
在一定程度上就相当于一个新线程的代理执行者,schedule()
所实现的与Thread类中run()
应该十分类似。我们现在来看看这个worker
的执行过程。
首先从Schedulers.io()
进入:
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
这个通过hook
拿到scheduler
的过程我们先不管,直接进CachedThreadScheduler
,看它的createWorker()
方法:
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
这里的pool
是一个原子变量引用AtomicReference
,所持有的则是CachedWorkerPool
,因而这个pool
顾名思义就是用来保存worker
的缓存池啦,我们从缓存池里拿到需要的worker
并作了一层封装成为EventLoopWorker
:
// CachedWorkerPool
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
allWorkers.add(w);
return w;
}
在这里我们终于发现目标ThreadWorker
,它继承自NewThreadWorker
,之前的schedule()
方法最终都会到这个scheduleActual()
方法里:
// NewThreadWorker
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
... ...
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
这里我们看到了executor
线程池,我们用Schedulers.io()
最终实现的线程切换的本质就在这里了。现在再结合之前的过程我们从头梳理一下:
在subscribeOn()时,我们会新生成一个Observable,它的成员
onSubscribe
会在目标Subscriber订阅时使用传入的Scheduler的worker
作为线程调度执行者,在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber,这个Subscriber又会通知到目标Subscriber,这样就完成了subscribeOn()
的过程。
2、observeOn()
下面我们接着来看看observeOn():
// Observable
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
我们直接看最终调用的部分,可以看到这里又是一个lift()
,在这里传入了OperatorObserveOn,它与OperatorSubscribeOn不同,是一个Operator(Operator的功能我们上文中已经讲过就不赘述了),它构造出了新的观察者ObserveOnSubscriber
并实现了Action0
接口:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
... ...
}
这个ObserveOnSubscriber
拿到了目标Scheduler recursiveScheduler
和目标Subscriber child
,并在它自己的onNext()
、onError()
、onComplete()
方法中作了schedule()
的线程变化处理:
// ObserveOnSubscriber
@Override
public void onNext(final T t) {
... ...
schedule();
}
@Override
public void onCompleted() {
... ...
schedule();
}
@Override
public void onError(final Throwable e) {
... ...
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);//实现了Action0接口,在recursiveScheduler对应线程中作精细处理
}
}
@Override
public void call() {
... ...
//检查数据流状态,会在这里进行onComplete()或者onError()的流程
if (checkTerminated(done, empty, localChild, q)) {
return;
}
localChild.onNext(localOn.getValue(v));
... ...
}
可以看出来,这里ObserveOnSubscriber
所有的发送给目标Subscriber child
的消息都被切换到了recursiveScheduler
的线程作处理,也就达到了将线程切回的目的。
总结observeOn()
整体流程如下:
对比
subscribeOn()
和observeOn()
这两个过程,我们不难发现两者的区别:subscribeOn()
将初始Observable的订阅事件整体都切换到了另一个线程;而observeOn()
则是将初始Observable发送的消息切换到另一个线程通知到目标Subscriber。前者把** “订阅 + 发送” 的切换了一个线程,后者把 “发送” ** 切换了一个线程。所以,我们的代码中所实现的功能其实是:
... ...
.subscribeOn(Schedulers.io())//将“订阅”、“发送”都切换到Schedulers.io()对应的线程
... ...
.observeOn(AndroidSchedulers.mainThread())////将“发送”再切换回到AndroidSchedulers.mainThread()对应的线程
这样就能很容易实现耗时任务在子线程操作,在主线程作更新操作等这些常见场景的功能啦。
四、其他角色
Subject
Subject在Rx系列是一个比较特殊的角色,它继承了Observable的同时也实现了Observer接口,也就是说它既可作为观察者,也可作为被观察者,他一般被用来作为连接多个不同Observable、Observer之间的纽带。可能你会奇怪,我们不是已经有了像map()
、flatMap()
这类的操作符去变化 Observable数据流了吗,为什么还要引入Subject这个东西呢?这是因为Subject所承担的工作并非是针对Observable数据流内容的转换连接,而是数据流本身在Observable、Observer之间的调度。光这么说可能还是很模糊,我们举个《RxJava Essentials》中的例子:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
我们通过create()
创建了一个PublishSubject
,观察者成功订阅了这个subject
,然而这个subject
却没有任何数据要发送,我们只是知道他未来会发送的会是String
值而已。之后,当我们调用subject.onNext()
时,消息才被发送,Observer
的onNext()
被触发调用,输出了"Hello World"
。
这里我们注意到,当订阅事件发生时,我们的subject
是没有产生数据流的,直到它发射了"Hello World"
,数据流才开始运转,试想我们如果将订阅过程和subject.onNext()
调换一下位置,那么Observer
就一定不会接受到"Hello World"
了(这不是废话吗- -|||),因而这也在根本上反映了Observable
的冷热区别。
一般而言,我们的Observable都属于Cold Observables,就像看视频,每次点开新视频我们都要从头开始播放;而Subject则默认属于Hot Observables,就像看直播,视频数据永远都是新的。
基于这种属性,Subject
自然拥有了对接收到的数据流进行选择调度等的能力了,因此,我们对于Subject
的使用也就通常基于如下的思路:
//接收数据流
Observable.create(...).subscribe(mSubject);
//选择调度并发射数据给观察者
mSubject.subscribe(...);
在前面的例子里我们用到的是PublishSubject
,它只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。等一下,这功能听起来是不是有些似曾相识呢?
没错,就是EventBus和Otto。
(RxJava的出现慢慢让Otto退出了舞台,现在Otto的Repo已经是Deprecated状态了,而EventBus依旧坚挺)
基于RxJava的观察订阅取消的能力和
PublishSubject
的功能,我们十分容易就能写出实现了最基本功能的简易事件总线框架:
public class RxBus {
private static volatile RxBus mInstance;
//为了保证线程安全,包装为SerializedSubject
private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());
private final CompositeSubscription mSubscriptions = new CompositeSubscription();
private RxBus() {}
public static RxBus getDefault() {
if (mInstance == null) {
synchronized (RxBus.class) {
if (mInstance == null) {
mInstance = new RxBus();
}
}
}
return mInstance;
}
//发送事件
public void post(Object o) {
mBus.onNext(o);
}
//接收事件
public void subscribe(Action1 action) {
mSubscriptions.add(mBus.subscribe(action));
}
//取消所有订阅事件
public void unSubscribe() {
mSubscriptions.unsubscribe();
}
}
当然Subject还有其他如BehaviorSubject
、ReplaySubject
、AsyncSubject
等类型,大家可以去看官方文档,写得十分详细,这里就不介绍了。
后记
前面相信最近这段日子里,提到RxJava,大家就会想到Google最近刚刚开源的Agera。Agera作为专门为Android打造的Reactive Programming框架,难免会被拿来与RxJava做对比。本文前面RxJava的主体流程分析已近尾声,现在我们再来看看Agera这东东又是怎么一回事。
首先先上结论:RxJava和Agera的关系
Agera最初是为了Google Play Movies而开发的一个内部框架,现在开源出来了,它虽然是在RxJava之后才出现,但是完全独立于RxJava,与它没有任何关系(只不过开源的时间十分微妙罢了- -)。 与RxJava比起来,Agera更加专注于Android的生命周期,而RxJava则更加纯粹地面向Java平台而非Android。
也许你可能会问:“那么RxAndroid呢,不是还有它吗?”事实上,RxAndroid早在1.0版本的时候就进行了很大的重构,很多模块被拆分到其他的项目中去了,同时也删除了部分代码,仅存下来的部分多是和Android线程相关的部分,比如
AndroidSchedulers
、MainThreadSubscription
等。鉴于这种情况,我们暂且不去关注RxAndroid,先把目光放在Agera上。
同样也是基于观察者模式,Agera和RxJava的角色分类大致相似,在Agera中,主要角色有两个:Observable
(被观察者)、Updatable
(观察者)。
public interface Observable {
/**
* Adds {@code updatable} to the {@code Observable}.
*
* @throws IllegalStateException if the {@link Updatable} was already added or if it was called
* from a non-Looper thread
*/
void addUpdatable(@NonNull Updatable updatable);
/**
* Removes {@code updatable} from the {@code Observable}.
*
* @throws IllegalStateException if the {@link Updatable} was not added
*/
void removeUpdatable(@NonNull Updatable updatable);
}
public interface Updatable {
/**
* Called when an event has occurred.
*/
void update();
}
是的,相较于RxJava中的Observable
,Agera中的Observable
只是一个简单的接口,也没有泛型的存在,Updatable
亦是如此,这样我们要如何做到消息的传递呢?这就需要另外一个接口了:
public interface Supplier<T> {
/**
* Returns an instance of the appropriate type. The returned object may or may not be a new
* instance, depending on the implementation.
*/
@NonNull
T get();
}
终于看到了泛型T,我们的消息的传递能力就是依赖于此接口了。所以我们将这个接口和基础的Observable
结合一下:
public interface Repository<T> extends Observable, Supplier<T> {}
这里的Repository<T>
在一定程度上就是我们想要的RxJava中的Observable<T>
啦。
类似地,Repository<T>
也有两种类型的实现:
- Direct - 所包含的数据总是可用的或者是可被同步计算出来的;一个Direct的
Repository
总是处于活跃(active)状态下 - Deferred - 所包含的数据是异步计算或拉去所得;一个Deffered的
Repository
直到有Updatable
被添加进来之前都会是非活跃(inactive)状态下
是不是感到似曾相识呢?没错,Repository
也是有冷热区分的,不过我们现在暂且不去关注这一点。回到上面接着看,既然现在发数据的角色有了,那么我们要如何接收数据呢?答案就是Receiver
:
/**
* A receiver of objects.
*/
public interface Receiver<T> {
/**
* Accepts the given {@code value}.
*/
void accept(@NonNull T value);
}
相信看到这里,大家应该也隐约感觉到了:在Agera的世界里,数据的传输与事件的传递是相互隔离开的,这是目前Agera与Rx系列的最大本质区别。Agera所使用的是一种push event, pull data的模型,这意味着event并不会携带任何data,Updatable
在需要更新时,它自己会承担起从数据源拉取数据的任务。这样,提供数据的责任就从Observable
中拆分了出来交给了Repository
,让其自身能够专注于发送一些简单的事件如按钮点击、一次下拉刷新的触发等等。
那么,这样的实现有什么好处呢?
当这两种处理分发逻辑分离开时,Updatable
就不必观察到来自Repository
的完整数据变化的历史,毕竟在大多数场景下,尤其是更新UI的场景下,最新的数据往往才是有用的数据。
但是我就是需要看到变化的历史数据,怎么办?
不用担心,这里我们再请出一个角色Reservoir
:
public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}
顾名思义,Reservoir
就是我们用来存储变化中的数据的地方,它继承了Receiver
、Repository
,也就相当于同时具有了接收数据,发送数据的能力。通过查看其具体实现我们可以知道它的本质操作都是使用内部的Queue
实现的:通过accept()
接收到数据后入列,通过get()
拿到数据后出列。若一个Updatable
观察了此Reservoir
,其队列中发生调度变化后即将出列的下一个数据如果是可用的(非空),就会通知该Updatable
,进一步拉取这个数据发送给Receiver
。
(在Agera的Github的Sample中有一个有关Reservoir
使用场景的例子:NotesStore.java,建议读完全文后再来看此例)
现在,我们已经大概了解了这几个角色的功能属性了,接下来我们来看一段官方示例代码:
public class AgeraActivity extends Activity
implements Receiver<Bitmap>, Updatable {
private static final ExecutorService NETWORK_EXECUTOR =
newSingleThreadExecutor();
private static final ExecutorService DECODE_EXECUTOR =
newSingleThreadExecutor();
private static final String BACKGROUND_BASE_URL =
"http://www.gravatar.com/avatar/4df6f4fe5976df17deeea19443d4429d?s=";
private Repository<Result<Bitmap>> background;
private ImageView backgroundView;
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// Set the content view
setContentView(R.layout.activity_main);
// Find the background view
backgroundView = (ImageView) findViewById(R.id.background);
// Create a repository containing the result of a bitmap request. Initially
// absent, but configured to fetch the bitmap over the network based on
// display size.
background = Repositories.repositoryWithInitialValue(Result.<Bitmap>absent())
.observe() // Optionally refresh the bitmap on events. In this case never
.onUpdatesPerLoop() // Refresh per Looper thread loop. In this case never
.getFrom(new Supplier<HttpRequest>() {
@NonNull
@Override
public HttpRequest get() {
DisplayMetrics displayMetrics = getResources().getDisplayMetrics();
int size = Math.max(displayMetrics.heightPixels,
displayMetrics.widthPixels);
return httpGetRequest(BACKGROUND_BASE_URL + size)
.compile();
}
}) // Supply an HttpRequest based on the display size
.goTo(NETWORK_EXECUTOR) // Change execution to the network executor
.attemptTransform(httpFunction())
.orSkip() // Make the actual http request, skip on failure
.goTo(DECODE_EXECUTOR) // Change execution to the decode executor
.thenTransform(new Function<HttpResponse, Result<Bitmap>>() {
@NonNull
@Override
public Result<Bitmap> apply(@NonNull HttpResponse response) {
byte[] body = response.getBody();
return absentIfNull(decodeByteArray(body, 0, body.length));
}
}) // Decode the response to the result of a bitmap, absent on failure
.onDeactivation(SEND_INTERRUPT) // Interrupt thread on deactivation
.compile(); // Create the repository
}
@Override
protected void onResume() {
super.onResume();
// Start listening to the repository, triggering the flow
background.addUpdatable(this);
}
@Override
protected void onPause() {
super.onPause();
// Stop listening to the repository, deactivating it
background.removeUpdatable(this);
}
@Override
public void update() {
// Called as the repository is updated
// If containing a valid bitmap, send to accept below
background.get().ifSucceededSendTo(this);
}
@Override
public void accept(@NonNull Bitmap background) {
// Set the background bitmap to the background view
backgroundView.setImageBitmap(background);
}
}
是不是有些云里雾里的感觉呢?多亏有注释,我们大概能够猜出到底上面都做了什么:使用需要的图片规格作为参数拼接到url中,拉取对应的图片并用ImageView显示出来。我们结合API来看看整个过程:
- Repositories.repositoryWithInitialValue(Result.<Bitmap>absent())
创建一个可运行(抑或说执行)的repository
。
初始化传入值是Result
,它用来概括一些诸如apply()
、merge()
的操作的结果的不可变对象,并且存在两种状态succeeded()
、failed()
。
返回REventSource
- observe()
用于添加新的Observable
作为更新我们的图片的Event source,本例中不需要。
返回RFrequency
- onUpdatesPerLoop()
在每一个Looper Thread loop中若有来自多个Event Source的update()
处理时,只需开启一个数据处理流。
返回RFlow
- getFrom(new Supplier(...))
忽略输入值,使用来自给定Supplier
的新获取的数据作为输出值。
返回RFlow
- goTo(executor)
切换到给定的executor
继续数据处理流。 - attemptTransform(function())
使用给定的function()
变换输入值,若变换失败,则终止数据流;若成功,则取新的变换后的值作为当前流指令的输出。
返回RTermination
- orSkip()
若前面的操作检查为失败,就跳过剩下的数据处理流,并且不会通知所有已添加的Updatable
。 - thenTransform(function())
与attemptTransform(function())
相似,区别在于当必要时会发出通知。
返回RConfig
- onDeactivation(SEND_INTERRUPT)
用于明确repository
不再active
时的行为。
返回RConfig
- compile()
执行这个repository
。
返回Repository
整体流程乍看起来并没有什么特别的地方,但是真正的玄机其实藏在执行每一步的返回值里:
初始的REventSource<T, T>
代表着事件源的开端,它从传入值接收了T initialValue
,这里的<T, T>
中,第一个T
是当前repository
的数据的类型,第二个T
则是数据处理流开端的时候的数据的类型。
之后,当observe()
调用后,我们传入事件源给REventSource
,相当于设定好了需要的事件源和对应的开端,这里返回的是RFrequency<T, T>
,它继承自REventSource
,为其添加了事件源的发送频率的属性。
之后,我们来到了onUpdatesPerLoop()
,这里明确了所开启的数据流的个数(也就是前面所讲的频率)后,返回了RFlow
,这里也就意味着我们的数据流正式生成了。同时,这里也是流式调用的起点。
拿到我们的RFlow
之后,我们就可以为其提供数据源了,也就是前面说的Supplier
,于是调用getFrom()
,这样我们的数据流也就真正意义拥有了数据“干货”。
有了数据之后我们就可以按具体需要进行数据转换了,这里我们可以直接使用transform()
,返回RFlow
,以便进一步进行流式调用;也可以调用attemptTransform()
来对可能出现的异常进行处理,比如orSkip()
、orEnd()
之后继续进行流式调用。
经过一系列的流式调用之后,我们终于对数据处理完成啦,现在我们可以选择先对成型的数据在做一次最后的包装thenTransform()
,或是与另一个Supplier
合并thenMergeIn()
等。这些处理之后,我们的返回值也就转为了RConfig
,进入了最终配置和repository
声明结束的状态。
在最终的这个配置过程中,我们调用了onDeactivation()
,为这个repository
明确了最终进入非活跃状态时的行为,如果不需要其他多余的配置的话,我们就可以进入最终的compile()
方法了。当我们调用compile()
时,就会按照前面所走过的所有流程与配置去执行并生成这个repository
。到此,我们的repository
才真正被创建了出来。
以上就是repository
从无到有的全过程。当repository
诞生后,我们也就可以传输需要的数据啦。再回到上面的示例代码:
@Override
protected void onResume() {
super.onResume();
// Start listening to the repository, triggering the flow
background.addUpdatable(this);
}
@Override
protected void onPause() {
super.onPause();
// Stop listening to the repository, deactivating it
background.removeUpdatable(this);
}
我们在onResume()
、onPause()
这两个生命周期下分别添加、移除了Updatable
。相较于RxJava中通过Subscription
去取消订阅的做法,Agera的这种写法显然更为清晰也更为整洁。
我们的Activity
实现了Updatable
和Receiver
接口,直接看其实现方法:
@Override
public void update() {
// Called as the repository is updated
// If containing a valid bitmap, send to accept below
background.get().ifSucceededSendTo(this);
}
@Override
public void accept(@NonNull Bitmap background) {
// Set the background bitmap to the background view
backgroundView.setImageBitmap(background);
}
可以看到这里repository
将数据发送给了receiver
,也就是自己,在对应的accept()
方法中接收到我们想要的bitmap
后,这张图片也就显示出来了,示例代码中的完整流程也就结束了。
总结一下上述过程:
- 首先
Repositories.repositoryWithInitialValue()
生成原点REventSource
。 - 配置完
Observable
之后进入RFrequency
状态,接着配置数据流的流数。 - 前面配置完成后,数据流
RFlow
生成,之后通过getFrom()
、mergeIn()
、transform()
等方法可进一步进行流式调用;也可以使用attemptXXX()
方法代替原方法,后面接着调用orSkip()
、orEnd()
进行error handling处理。当使用attemptXXX()
方法时,数据流状态会变为RTermination
,它代表此时的状态已具有终结数据流的能力,是否终结数据流要根据failed check触发,结合后面跟着调用的orSkip()
、orEnd()
,我们的数据流会从RTermination
再次切换为RFlow
,以便进行后面的流式调用。
- 经过前面一系列的流式处理,我们需要结束数据流时,可以选择调用
thenXXX()
方法,对数据流进行最终的处理,处理之后,数据流状态会变为RConfig
;也可以为此行为添加error handling处理,选择thenAttemptXXX()
方法,后面同样接上orSkip()
、orEnd()
即可,最终数据流也会转为Rconfig
状态。 - 此时,我们可以在结束前按需要选择对数据流进行最后的配置,例如:调用
onDeactivation()
配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。 - 一切都部署完毕后,我们
compile()
这个RConfig
,得到最终的成型的Repository
,它具有添加Updatable
、发送数据通知Receiver
的能力。 - 我们根据需要添加
Updatable
,repository
在数据流处理完成后会通过update()
发送event通知Updatable
。 -
Updatable
收到通知后则会拉取repository
的成果数据,并将数据通过accept()
发送给Receiver
。完成 Push event, pull data 的流程。
以上就是一次Agera的流式调用的内部基本流程。可以看到,除了 Push event, pull data 这一特点、goLazy的加载模式(本文未介绍)等,依托于较为精简的方法,Agera的流式调用过程同样也能够做到过程清晰,并且上手难度相较于RxJava也要简单一些,开源作者是Google的团队也让一些G粉对其好感度提升不少。不过Agera目前版本则是 agera-1.0.0-rc2,未来的版本还有很多不确定因素,相比之下Rx系列发展了这么久,框架已经相对成熟。究竟用Agera还是RxJava,大家按自己的喜好选择吧。
新人处女作,文章中难免会有错误遗漏以及表述不清晰的地方,希望大家多多批评指正,谢谢!
参考&拓展:
RxJava Wiki
Agera Wiki
给 Android 开发者的 RxJava 详解
Google Agera vs. ReactiveX
When Iron Man becomes reactive
Top 7 Tips for RxJava on Android
How to Keep your RxJava Subscribers from Leaking
RxJava – the production line