原文地址:zjutkz's blog
Google在上周开源了一个响应式框架——agera,相信它会慢慢地被广大程序员所熟知。我个人对这样的技术是很感兴趣的,在这之前也研究过RxJava,所以在得知Google开源了这样的框架之后第一时间进行了学习,这里算是把学习的心得和大家分享。当然由于本人水平有限,这篇文章可能起的更多的作用是抛砖引玉,希望有更多的大神能加入到学习agera的大部队中,争取早日出现几篇让人信服的文章!
通过这篇文章你可能会学习到:
- agera是什么,也就是它的基本概念和大体框架
- agera的基础用法
- agera的进阶用法
- agera的源码分析
- 如何封装agera
- agera和RxJava的区别
好了,让我们正式开启agera的学习之旅吧。
agera是什么
回答agera是什么之前,我们要先了解什么是响应式编程和函数式编程,这里我不展开讲了,大家可以自行去Google或者wiki。在agera出现之前,Java上已经有了一个很著名的同类型框架,叫做RxJava,其衍生出的更适合Android的版本RxAndroid和各种层出不穷的“儿子”类似RxBus,RxBinding等等都让人眼前一亮,那Google为什么还要去写一个agera呢?这个我也不好回答,毕竟我不是写这个框架的人啊,不过有一点可以确定的是,作为Google的“亲儿子”,它在Android中拥有的潜力和发挥的威力必定是很大的,个人觉得在马上就要举行的I/O大会上,这个框架会被拿出来讲解。
好了,下面让我们具体说下agera吧,下面一段话摘自agera的GitHub主页。
agera is a set of classes and interfaces to help wirte functional,asynchronous and reactive applications for Android.Requires Android SDK version 9 or higher.
简单的翻译下,就是说agera是一个能帮助Android开发者更好的开发函数式,异步和响应式程序的框架,要求Android的SDK版本在9以上。
在了解agera是什么之后,我们还需要明白一点的就是,它和RxJava一样,是基于观察者模式开发的,所以其中会有一些概念,我在后文中会一一进行阐述。
agera的基础用法
讲完了agera是什么以后,大家有没有跃跃欲试了呢?下面就让我带大家来了解一下agera最基础的用法吧。
首先,我们要明确,既然agera是基于观察者模式的,那它其中的观察者,被观察者等是用什么来表现的呢?
在agera中,有两个基本的概念:Observable和Updatable。
Observable & Updatable
public interface Observable {
/**
* Adds {@code updatable} to the {@code Observable}.
*
* @throws IllegalStateException if the {@link Updatable} was already added or if it was called
* from a non-Looper thread
*/
void addUpdatable(@NonNull Updatable updatable);
/**
* Removes {@code updatable} from the {@code Observable}.
*
* @throws IllegalStateException if the {@link Updatable} was not added
*/
void removeUpdatable(@NonNull Updatable updatable);
}
/**
* Called when when an event has occurred. Can be added to {@link Observable}s to be notified
* of {@link Observable} events.
*/
public interface Updatable {
/**
* Called when an event has occurred.
*/
void update();
}
Updatable指代的是观察者模式中的观察者,而Observable所指代的就是观察者模式中的被观察者。整个agera就是建立在[使用Updatable去观察Observable,Observable去通知Updatable更新]的基础上进行开发的。具体到代码就是使用Observable的addUpdatable()方法去将Updatable注册到Observable中,并且在合适的实际调用Updatable的update()方法去通知Updatable更新。下面让我们看一个具体的例子。
首先界面很简单,就一个Button和一个TextView,我们的目标是点击Button之后,改变TextView的文字显示。
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:orientation="vertical"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:fitsSystemWindows="true"
tools:context="zjutkz.com.guide.MainActivity">
<Button
android:text="trigger"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:onClick="trigger"/>
<TextView
android:id="@+id/show"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:text="wait for trigger..."
android:textSize="20sp"
android:gravity="center"/>
</LinearLayout>
public class MainActivity extends AppCompatActivity implements Updatable{
private TextView show;
private Observable observable = new Observable() {
@Override
public void addUpdatable(@NonNull Updatable updatable) {
updatable.update();
}
@Override
public void removeUpdatable(@NonNull Updatable updatable) {
}
};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
show = (TextView)findViewById(R.id.show);
}
public void trigger(View view){
observable.addUpdatable(this);
}
@Override
public void update() {
show.setText("update!!");
}
}
看我们的activity的代码,首先我们要做的就是让我们的activity实现Updatable这个接口,然后在update()方法中将TextView的文字进行改变。接着,创造出一个Observable,当我们点击Button的时候,使用Observable的addUpdatable()方法,而我们前面定义的那个Observable在其addUpdatable()方法中就调用了对应Updatable实例的update(),这样,我们就完成了一个最简单的事件订阅。
但是上面的代码有一个很大的问题,不知道大家看出来没有,那就是Observable和Updatable之间的通信,完全没有数据的存在,也就是说当你的Observable想要传递一些数据给Updatable的时候,通过这样的方式是没办法实现的,而且不管你怎么搞都不行,因为对应的方法参数中就没有和数据相关的逻辑。
看到这你可能会说,”这不坑爹吗!连数据都传递不了,还谈什么观察者模式,谈什么响应式编程!“不要着急,这是Google故意而为之的,他们的想法就是要让数据从Observable和Updatable中剥离,从而达到他们所期望的“Push event,pull data model”。这个我在后面和RxJava的比较中会讲,RxJava是”Push data model”。
Repository
前文中最后一段虽然讲明白了Google为什么要这样做,但是还是没有说解决数据传递的方案,这个时候如果你兴冲冲地去GitHub上给他们提issue,他们会这样和你说:“你啊,不要老是想着搞个大新闻,你问我滋不滋辞数据传递,我当然说是滋辞的啦。“
那到底怎么滋辞,啊不是,支持数据传递呢?Google已经给我们提供了一个接口,叫做Repository。
public interface Repository<T> extends Observable, Supplier<T> {}
可以看到,它继承自Observable,说明是一个被观察者,那这个Supplier又是什么呢?
public interface Supplier<T> {
/**
* Returns an instance of the appropriate type. The returned object may or may not be a new
* instance, depending on the implementation.
*/
@NonNull
T get();
}
看这个代码,配上接口的名字大家就可以猜出来,这是一个提供数据的东西。
综上所述,Repository的作用就是——既是一个被观察者,同时也提供数据给观察者。
还是让我们用代码来说话吧。
界面还是一样,这里不贴了,一个Button一个TextView。
private Supplier<String> supplier = new Supplier() {
@NonNull
@Override
public Object get() {
return "update!!";
}
};
private Repository<String> repository = Repositories.repositoryWithInitialValue("a")
.observe()
.onUpdatesPerLoop()
.thenGetFrom(supplier)
.compile();
public void trigger(View view){
repository.addUpdatable(this);
}
@Override
public void update() {
show.setText(repository.get());
}
上面的那两个初始化代码大家可以先不用懂,具体看下面的,点击Button(进入trigger(View view)方法)之后,我们和刚才一样,使用了addUpdatable将我们继承自Updatable的activity注册到repository中,然后repository发现有东西注册到了自己这儿,经过一系列的方法执行,就会调用Updatable的update()方法,然后我们通过repository.get()去拿到对应的数据就OK了。
这里给大家捋一捋agera中几个基础但是很重要的概念:
(1) Observable:agera中的被观察者,用于在合适的时机去通知观察者进行更新。
(2) Updatable:agera中的观察者,用于观察Observable。
(3) Supplier:agera中提供数据的接口,通过范型指定数据类型,通过get()方法获取数据。
(4) Repository:agera中集成了Observable和Supplier功能的一个[提供数据的被观察者]。
说到这里,大家可能会有一个问题,前面说了agera是”Push event,pull data model”,也就是数据和事件分离的,那这个Repository的出现不是自己打自己的脸吗?
其实不是的,大家可以看GitHub上wiki里的这一句:
This does not change the push event, pull data model: the repository notifies the registered updatables to update themselves when the data changes; and the updatables pull data from the repository when they individually react to this event.
通过代码来解释就是,Repository经过一系列的方法执行之后,调用了Updatable的update()方法,这个是事件传递,也就是push event,而Updatable在接收到唤醒事件之后,通过调用Repository的get()方法,自己去获取数据而不是从updata()方法中拿到传递过来的数据,类似update(T value),这是pull data。这样的好处是可以lazy load,这个我们在后文中会讲。
agera的进阶用法
讲完了agera基础的概念,让我们来看看它的正确使用姿势。
前面我们有讲到Repository,大家通过代码肯定看的一头雾水,这里让我们来聊聊它吧。
Repository
首先看一个例子。
private Supplier<String> strSupplier = new Supplier<String>() {
@NonNull
@Override
public String get() {
return "value";
}
};
private Function<String,String> transform = new Function<String, String>() {
@NonNull
@Override
public String apply(@NonNull String input) {
return "new " + input;
}
};
private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
@NonNull
@Override
public Integer get() {
return 100;
}
};
private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {
@NonNull
@Override
public String merge(@NonNull String s, @NonNull Integer integer) {
return s + "plus " + String.valueOf(integer);
}
};
private Updatable updatable = new Updatable() {
@Override
public void update() {
Log.d("TAG", repository.get());
}
};
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.getFrom(strSupplier)
.transform(transform)
.thenMergeIn(integerSupplier,merger)
.compile();
repository.addUpdatable(updatable);
这段代码大家能看懂的部分我相信只有repository.addUpdatable(updatable);这一句。。
从大体上说,就是将一个updatable通过repository.addUpdatable(updatable);这个方法注册到对应的repository中,然后repository经过一系列的方法调用去通知updatable更新,大家可以在logcat中看到输出的结果是
那最主要的这段代码是什么意思呢?
private Supplier<String> strSupplier = new Supplier<String>() {
@NonNull
@Override
public String get() {
return "value";
}
};
private Function<String,String> transform = new Function<String, String>() {
@NonNull
@Override
public String apply(@NonNull String input) {
return "new " + input;
}
};
private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
@NonNull
@Override
public Integer get() {
return 100;
}
};
private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {
@NonNull
@Override
public String merge(@NonNull String s, @NonNull Integer integer) {
return s + " plus " + String.valueOf(integer);
}
};
private Updatable updatable = new Updatable() {
@Override
public void update() {
Log.d("TAG", repository.get());
}
};
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.getFrom(strSupplier)
.transform(transform)
.thenMergeIn(integerSupplier,merger)
.compile();
这里就不得不提一下RxJava了,大家知道在RxJava中存在很多帮助大家进行数据转换的操作符,像map,flatMap,take等等,而这里的getFrom,transform和thenMergeIn也是一样,是Google封装好了帮助大家进行数据操作的。而且从名字就可以看出来:
repositoryWithInitialValue意思是创建一个Repository并且赋一个初始值。
getFrom的意思是从一个Supplier那里获取数据。
transfrom就是进行转换,这里通过一个Function将repository从strSupplier那里得到的数据前面加上一个”new”字符串,这个操作符很像RxJava中的map。
而最后那个thenMergeIn则是将intergerSupplier中提供的数据和我们现在repository中的数据进行一个整合。
最后通过complie得到Repository实例。
是不是和RxJava很相似呢?就是一种可以看作流式的操作。
看到这里大家可能又要问了,那前面的observe()和onUpdatesPerLoop()是什么呢?为什么最后那个叫thenMergeIn()不叫mergeIn()呢?
这里要给大家讲一个概念,agera通过这样去创建一个Repository,是有一个state,也就是状态的概念的。
public interface RepositoryCompilerStates {
interface REventSource<TVal, TStart> {
@NonNull
RFrequency<TVal, TStart> observe(@NonNull Observable... observables);
}
interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {
@NonNull
RFlow<TVal, TStart, ?> onUpdatesPer(int millis);
@NonNull
RFlow<TVal, TStart, ?> onUpdatesPerLoop();
}
interface RFlow<TVal, TPre, TSelf extends RFlow<TVal, TPre, TSelf>>
extends RSyncFlow<TVal, TPre, TSelf> {
@NonNull
@Override
<TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);
@NonNull
@Override
<TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptGetFrom(
@NonNull Supplier<Result<TCur>> attemptSupplier);
@NonNull
@Override
<TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, TCur> merger);
@NonNull
@Override
<TAdd, TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptMergeIn(
@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);
@NonNull
@Override
<TCur> RFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);
@NonNull
@Override
<TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptTransform(
@NonNull Function<? super TPre, Result<TCur>> attemptFunction);
@NonNull
TSelf goTo(@NonNull Executor executor);
@NonNull
RSyncFlow<TVal, TPre, ?> goLazy();
}
interface RSyncFlow<TVal, TPre, TSelf extends RSyncFlow<TVal, TPre, TSelf>> {
@NonNull
<TCur> RSyncFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);
@NonNull
<TCur>
RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptGetFrom(
@NonNull Supplier<Result<TCur>> attemptSupplier);
@NonNull
<TAdd, TCur> RSyncFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, TCur> merger);
@NonNull
<TAdd, TCur>
RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptMergeIn(
@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);
@NonNull
<TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);
@NonNull
<TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptTransform(
@NonNull Function<? super TPre, Result<TCur>> attemptFunction);
@NonNull
RTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate);
@NonNull
<TCase> RTermination<TVal, TCase, TSelf> check(
@NonNull Function<? super TPre, TCase> caseFunction,
@NonNull Predicate<? super TCase> casePredicate);
@NonNull
TSelf sendTo(@NonNull Receiver<? super TPre> receiver);
@NonNull
<TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier,
@NonNull Binder<? super TPre, ? super TAdd> binder);
@NonNull
RConfig<TVal> thenSkip();
@NonNull
RConfig<TVal> thenGetFrom(@NonNull Supplier<? extends TVal> supplier);
@NonNull
RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptGetFrom(
@NonNull Supplier<? extends Result<? extends TVal>> attemptSupplier);
@NonNull
<TAdd> RConfig<TVal> thenMergeIn(@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, ? extends TVal> merger);
@NonNull
<TAdd> RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptMergeIn(
@NonNull Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd,
? extends Result<? extends TVal>> attemptMerger);
@NonNull
RConfig<TVal> thenTransform(
@NonNull Function<? super TPre, ? extends TVal> function);
@NonNull
RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptTransform(
@NonNull Function<? super TPre, ? extends Result<? extends TVal>> attemptFunction);
}
interface RTermination<TVal, TTerm, TRet> {
@NonNull
TRet orSkip();
@NonNull
TRet orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction);
}
interface RConfig<TVal> {
@NonNull
RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);
@NonNull
RConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig);
@NonNull
RConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);
@NonNull
Repository<TVal> compile();
@NonNull
<TVal2> RFrequency<TVal2, TVal> compileIntoRepositoryWithInitialValue(@NonNull TVal2 value);
}
我们可以看到这个接口,里面的方法很多,不过只要仔细看就会发现它里面定义的方法正是我们刚才repository中为了操作数据而使用的,不同的是,它们的返回并不是Repository,而是一些其他的东西。而这个返回值,就表示了Repository正在处理的数据的状态。
这里给大家总结一下几种代表性的状态,其他没提到的都是继承自其中的一个,表示的状态是差不多的。
REventSource:这个是最初的状态,Repositories.repositoryWithInitialValue()这个方法的返回值就是REventSource,表明事件源的开始。
RFrequency:表示事件源发送的频率。
RFlow:表示数据处理流,这里定义的方法都是和数据处理相关的,比如getFrom(),mergeIn()等等。可以看到,getFrom()这样的方法返回值都是RFlow,说明我们可以流式的调用,比如在getFrom()后面调用mergeIn(),但是其余的thenXXX()返回的都是RTermination,说明如果你调用了这样的方法,那么数据处理流也就结束了。
RTermination:表示最后终止数据处理流。
RConfig:其余各种配置,比如notifyIf()这样的是否要唤醒Updatable等等。
通过这样定义状态,我们可以很清晰的知道现在处理什么状态,也能更好的理解整个函数的调用过程。
初始化(Repositories.repositoryWithInitialValue(…))->
表示事件开始(observe())->
规定事件发送的频率(onUpdatesPerLoop()或者onUpdatesPer(…))->
处理数据流(各种处理函数)->
结束数据流->
配置一些属性(notifyIf(…)等等)->
complie()。
整个过程是不可逆的,也就是说你不能在调用了thenMergeIn()之后去调用类似getFrom()这样的函数,你调用了thenXXX()就表示你要结束这个数据处理流了。
说到这里我们就说完了整个Repository数据处理流的过程,但是我们会发现,上面看到的代码都只是一个抽象的接口,那么具体的实现在哪里呢?(这里为了让大家更好的理解agera,要看一点源码了,虽然标题是进阶使用。。)
让我们回头最开始,看一下Repositories.repositoryWithInitialValue()这个函数。
@NonNull
public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) {
return RepositoryCompiler.repositoryWithInitialValue(initialValue);
}
调用了RepositoryCompiler的同名函数。让我们看看RepositoryCompiler是个啥东西。
final class RepositoryCompiler implements
RepositoryCompilerStates.RFrequency,
RepositoryCompilerStates.RFlow,
RepositoryCompilerStates.RTermination,
RepositoryCompilerStates.RConfig {
.......
}
我们惊奇的发现,它实现了上面提到的那些接口,也就是说RepositoryCompiler就是agera用来管理Repository数据处理流状态的类。让我们看看最后compiler()方法到底生成了怎样一个Repository。
@NonNull
@Override
public Repository compile() {
Repository repository = compileRepositoryAndReset();
recycle(this);
return repository;
}
@NonNull
private Repository compileRepositoryAndReset() {
checkExpect(CONFIG);
Repository repository = CompiledRepository.compiledRepository(initialValue, eventSources, frequency, directives,
notifyChecker, concurrentUpdateConfig, deactivationConfig);
expect = NOTHING;
initialValue = null;
eventSources.clear();
frequency = 0;
directives.clear();
goLazyUsed = false;
notifyChecker = objectsUnequal();
deactivationConfig = RepositoryConfig.CONTINUE_FLOW;
concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;
return repository;
}
可以看到调用了CompiledRepository的compiledRepository方法。
@NonNull
static Repository compiledRepository(
@NonNull final Object initialValue,
@NonNull final List<Observable> eventSources,
final int frequency,
@NonNull final List<Object> directives,
@NonNull final Merger<Object, Object, Boolean> notifyChecker,
@RepositoryConfig final int concurrentUpdateConfig,
@RepositoryConfig final int deactivationConfig) {
Observable eventSource = perMillisecondObservable(frequency,
compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));
Object[] directiveArray = directives.toArray();
return new CompiledRepository(initialValue, eventSource,
directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig);
}
分析到这里我们就清楚了,我们使用的Repository,原来都是compiledRepository!
其实这些类的名字已经帮我们很好的理解了整个流程。
首先第一步是调用Repositories.repositoryWithInitialValue()。[Repositories]这个名字就是一个utils类,说明是帮助我们生成Respository的。
后面的各种状态处理都在RepositoryCompiler类中,意思是Repository的编译者,专门为了生成Repository而创造的。
最后生成的是CompiledRepository,表示编译过后的Repository,拥有完善的功能。
好了,到这里关于Repository的东西就讲完了,大家可以尝试着自己去写一下,这些个数据处理的方法能让我们像RxJava一样轻松的处理数据。当然,agera也提供了异步操作的封装,like this:
private Executor executor = Executors.newSingleThreadExecutor();
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.goTo(executor)
.thenGetFrom(new Supplier<Object>() {
@NonNull
@Override
public Object get() {
//some biz work,may be block the main thread.
return null;
}
})
.compile();
使用goTo操作符就可以了。
Attempt & Result
在上面的例子中,我们使用了Repository去代替原始的Observable,配合上操作符已经能初步完成我们的各种需求了。但是这里有一个问题,万一在Supplier的get()方法中发生了错误呢?比如这样
private Supplier<Integer> strSupplier = new Supplier<Integer>() {
@NonNull
@Override
public Integer get() {
return 1/0;
}
};
当然这种代码在实际情况下是不会产生的,但是总会有错误发生啊,对于RxJava,它有很好的error handling机制,那agera有吗?答案是有的。就是通过操作符attemptXXX()和Result类来解决。
首先看一段代码
repository = Repositories.repositoryWithInitialValue(0)
.observe()
.onUpdatesPerLoop()
.thenGetFrom(strSupplier)
.compile();
repository.addUpdatable(this);
如果使用我们刚才的方式去做,strSupplier的get()方法中return 1/0,这样就爆炸了。。程序直接退出,你一天美好的心情就此终结。但是如果这样
private Supplier<Result<Integer>> safeStrSupplier = new Supplier<Result<Integer>>() {
@NonNull
@Override
public Result<Integer> get() {
try{
return Result.success(1/ 0);
}catch (ArithmeticException e){
return Result.failure(e);
}
}
};
safeRepository = Repositories.repositoryWithInitialValue(Result.<Integer>absent())
.observe()
.onUpdatesPerLoop()
.attemptGetFrom(safeStrSupplier).orEnd(new Function<Throwable, Result<Integer>>() {
@NonNull
@Override
public Result<Integer> apply(@NonNull Throwable input) {
return Result.success(2222);
}
})
.thenTransform(new Function<Integer, Result<Integer>>() {
@NonNull
@Override
public Result<Integer> apply(@NonNull Integer input) {
return Result.absentIfNull(input);
}
})
.compile();
safeRepository.addUpdatable(this);
可以看到,我们尝试用attempGetFrom()去代替getFrom(),后面跟上了orEnd(),这里你也可以使用orSkip()两个函数的差别是如果接受到了异常,前者还是会通知Updatable去更新,而后者直接跳过。Supplier也有差别,我们在safeSupplier中使用Result类去包裹住了我们操作的数据,并且通过调用success()或者failure()去执行成功或者失败。
所以这里,如果你写了1/0这样的代码并且引发了异常,我们可以安全的捕获它并且做你想要做的操作。另外大家可以看thenTransform()中,我们return Result.absentIfNull(input);表示如果数据是空的,我们就返回缺省值。
我们在日常编码中,尽量要采用这样的方式去防止异常的发生。
Receiver
上面说了Result,这里我们可以使用Receiver去配合Result进行使用。
private Receiver<Throwable> errorReceiver = new Receiver<Throwable>() {
@Override
public void accept(@NonNull Throwable value) {
trigger.setText(value.toString());
}
};
private Receiver<Integer> successReceiver = new Receiver<Integer>() {
@Override
public void accept(@NonNull Integer value) {
trigger.setText(String.valueOf(value));
}
};
@Override
public void update() {
safeRepository.get()
.ifFailedSendTo(errorReceiver)
.ifSucceededSendTo(successReceiver);
}
看上面这段代码,和上一节的代码一样,我们safeRepository指定的范型是Result,所以在update()方法中get到的就是一个Result,它的ifFailedSendTo()和ifFailedSendTo()表示如果整个数据流成功发送给xx或者失败发送给xx,这里的xx必须要实现Receiver接口。
/**
* A receiver of objects.
*/
public interface Receiver<T> {
/**
* Accepts the given {@code value}.
*/
void accept(@NonNull T value);
}
然后我们可以在accept()方法中拿到对应的值进行操作。
Reservoir
这个东西呢,简单来说就是响应式编程中的queue,用来进行生产者/消费者操作的。
public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}
可以看到它继承自Receiver和Repository,所以它可以使用accept()去接受数据,也可以使用get()去返回数据。
我们在使用中通过调用下面的代码去获取一个Reservior。
private Reservoir<String> provider = Reservoirs.reservoir();
跟踪Reservoirs的源码看一下。
@NonNull
public static <T> Reservoir<T> reservoir(@NonNull final Queue<T> queue) {
return new SynchronizedReservoir<>(checkNotNull(queue));
}
private static final class SynchronizedReservoir<T> extends BaseObservable
implements Reservoir<T> {
@NonNull
private final Queue<T> queue;
private SynchronizedReservoir(@NonNull final Queue<T> queue) {
this.queue = checkNotNull(queue);
}
@Override
public void accept(@NonNull T value) {
boolean shouldDispatchUpdate;
synchronized (queue) {
boolean wasEmpty = queue.isEmpty();
boolean added = queue.offer(value);
shouldDispatchUpdate = wasEmpty && added;
}
if (shouldDispatchUpdate) {
dispatchUpdate();
}
}
@NonNull
@Override
public Result<T> get() {
T nullableValue;
boolean shouldDispatchUpdate;
synchronized (queue) {
nullableValue = queue.poll();
shouldDispatchUpdate = !queue.isEmpty();
}
if (shouldDispatchUpdate) {
dispatchUpdate();
}
return absentIfNull(nullableValue);
}
@Override
protected void observableActivated() {
synchronized (queue) {
if (queue.isEmpty()) {
return;
}
}
dispatchUpdate();
}
}
可以看到SynchronizedReservoir中有一个queue,accpet的时候去存放数据,get的时候去取出数据。
很惭愧,这里关于Reservio我还不是非常的明白,只知道如何用,不知道为什么这样用,所以这里就不给大家过多的介绍了,以免让产生大家错误的理解。有兴趣的同学可以去看这页wiki。
Function的使用
通过前面的学习我们知道了agera和RxJava一样存在很多使用的操作符,但是让我们想象一下,如果有一个非常复杂的操作,那我们是不是要写一堆的transform()这样的操作符呢?我相信这样做是可以的,但是再考虑一点,对于一个通用的操作,你这样去使用怎么达到复用的目的呢?难道5个页面都有想用的操作,你要每个页面都去写一遍吗?
Google显示不会让我们陷入这样的窘境,所以就有了[Functions]这个类。
看名字就知道,和之前的Repositories一样,它是一个工具类。它可以将多个Function有机地结合在一起。
private Supplier<String> supplier = new Supplier<String>() {
@NonNull
@Override
public String get() {
return "url";
}
};
private Function<String,List<Integer>> strToList = new Function<String, List<Integer>>() {
@NonNull
@Override
public List<Integer> apply(@NonNull String input) {
List<Integer> data = new ArrayList<>();
for(int i = 0;i < 10;i++){
data.add(i);
}
return data;
}
};
private Predicate<Integer> filter = new Predicate<Integer>() {
@Override
public boolean apply(@NonNull Integer value) {
return value > 5;
}
};
private Function<Integer,String> intToStr = new Function<Integer, String>() {
@NonNull
@Override
public String apply(@NonNull Integer input) {
return String.valueOf(input);
}
};
private Function<List<String>, Integer> getSize = new Function<List<String>, Integer>() {
@NonNull
@Override
public Integer apply(@NonNull List<String> input) {
return input.size();
}
};
Function<String,Integer> finalFunc = Functions.functionFrom(String.class)
.unpack(strToList)
.filter(filter)
.map(intToStr)
.thenApply(getSize);
private Repository<String> repository;
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.getFrom(supplier)
.transform(finalFunc)
.thenTransform(new Function<Integer, String>() {
@NonNull
@Override
public String apply(@NonNull Integer input) {
return String.valueOf(input);
}
})
.compile();
repository.addUpdatable(this);
其中重点关注
Function<String,Integer> finalFunc = Functions.functionFrom(String.class)
.unpack(strToList)
.filter(filter)
.map(intToStr)
.thenApply(getSize);
Functions类提供的各种操作符类似unpack(),filter()等,将一个个操作符连了起来并且生成一个最终的操作符。我们就可以拿这个操作符放到我们的Repository的数据处理状态机中,并且你还可以把这样的finalFunc保存起来,哪里要用了直接拿出来用,达到复用的目的。
到这儿关于agera的进阶使用也说完了。怎么说呢,这里我也是带大家入个门,了解怎么使用agera才是正确的,后面还是要靠大家自己啊!
agera的源码分析
说完了agera的使用,让我们来分析下它的源码,知己知彼才能百战百胜。
我们这里只分析和Repository相关的源码,一来Repository在agera现有代码中占的比重最大,而来其他的代码还是比较简单的,大家可以自行read the fucking source code。
首先,前面我们已经分析了Repository是如何产生的,既通过RepositoryCompiler产生一个CompiledRepository。让我们看看RepositoryCompiler中具体做了什么,先看它的repositoryWithInitialValue()方法。
@NonNull
static <TVal> RepositoryCompilerStates.REventSource<TVal, TVal> repositoryWithInitialValue(
@NonNull final TVal initialValue) {
checkNotNull(Looper.myLooper());
RepositoryCompiler compiler = compilers.get();
if (compiler == null) {
compiler = new RepositoryCompiler();
} else {
// Remove compiler from the ThreadLocal to prevent reuse in the middle of a compilation.
// recycle(), called by compile(), will return the compiler here. ThreadLocal.set(null) keeps
// the entry (with a null value) whereas remove() removes the entry; because we expect the
// return of the compiler, don't use the heavier remove().
compilers.set(null);
}
return compiler.start(initialValue);
}
去ThreadLocal中拿到对应线程的compiler,然后调用它的start()方法。
@NonNull
private RepositoryCompiler start(@NonNull final Object initialValue) {
checkExpect(NOTHING);
expect = FIRST_EVENT_SOURCE;
this.initialValue = initialValue;
return this;
}
首先是对expect的判断,表示现在处在一个什么状态,start对应的FIRST_EVENT_SOURCE,这个通过之前的分析很好理解。
接着,让我们看observe代码。
@NonNull
@Override
public RepositoryCompiler observe(@NonNull final Observable... observables) {
checkExpect(FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE);
for (Observable observable : observables) {
eventSources.add(checkNotNull(observable));
}
expect = FREQUENCY_OR_MORE_EVENT_SOURCE;
return this;
}
我们前面observer()方法中都没有传任何的参数,所以这里先就当参数是空,一会儿再说有参数的情况。
然后是frequency状态的操作。
@NonNull
@Override
public RepositoryCompiler onUpdatesPer(int millis) {
checkExpect(FREQUENCY_OR_MORE_EVENT_SOURCE);
frequency = Math.max(0, millis);
expect = FLOW;
return this;
}
@NonNull
@Override
public RepositoryCompiler onUpdatesPerLoop() {
return onUpdatesPer(0);
}
可以看到如果调用的是onUpdatesPerLoop()方法,则表示每次都会去触发事件,所以millis为0。
接着,就是各种flow状态的事件,这里我们以getFrom()和thenTransform()为例。
@NonNull
@Override
public RepositoryCompiler getFrom(@NonNull final Supplier supplier) {
checkExpect(FLOW);
addGetFrom(supplier, directives);
return this;
}
调用了addGetFrom()方法。
static void addGetFrom(@NonNull final Supplier supplier,
@NonNull final List<Object> directives) {
directives.add(GET_FROM);
directives.add(supplier);
}
直接将supplier和对应的GET_FROM装进了一个list。注意这里的list是之前方法传过来的。这个GET_FROM只是一个标记位。
private static final int END = 0;
private static final int GET_FROM = 1;
private static final int MERGE_IN = 2;
private static final int TRANSFORM = 3;
private static final int CHECK = 4;
private static final int GO_TO = 5;
private static final int GO_LAZY = 6;
private static final int SEND_TO = 7;
private static final int BIND = 8;
private static final int FILTER_SUCCESS = 9;
可以看到每个操作都有自己对应的标记位。
接着是thenTransform()。
@NonNull
@Override
public RepositoryCompiler thenTransform(@NonNull final Function function) {
transform(function);
endFlow(false);
return this;
}
直接调用了transform()和endFlow()。
@NonNull
@Override
public RepositoryCompiler transform(@NonNull final Function function) {
checkExpect(FLOW);
addTransform(function, directives);
return this;
}
private void endFlow(boolean skip) {
addEnd(skip, directives);
expect = CONFIG;
}
static void addEnd(boolean skip, @NonNull final List<Object> directives) {
directives.add(END);
directives.add(skip);
}
transform()中做了和getFrom()一样的事情,把标记位和function加入到list中,这样一来,我们的list的size现在就是4了。endFlow也是一样。
经过getFrom()和thenTransform()两个操作,我们得到了一个size为6的list。
这里大家知道为什么thenXXX()要调用endFlow()吗?因为我们前面说了,调用thenXXX()就表示你要终止这个数据处理流,对应的状态会进入termination,所以当然要endFlow()啦。
最后通过compile去生成我们的CompiledRepository。
到这儿我们就分析完了整个Repository生成的过程。接着就是Repository.addUpdatable()。
首先我们看一下CompiledRepository是什么。
final class CompiledRepository extends BaseObservable
implements Repository, Updatable, Runnable {
.....
}
它继承自BaseObservable,实现了Repository,Updatable和Runnable接口。
/**
* A partial implementation of {@link Observable} that adheres to the threading contract between
* {@link Observable}s and {@link Updatable}s. Subclasses can use {@link #observableActivated()} and
* {@link #observableDeactivated()} to control the activation and deactivation of this observable,
* and to send out notifications to client updatables with {@link #dispatchUpdate()}.
*
* <p>For cases where subclassing {@link BaseObservable} is impossible, for example when the
* potential class already has a base class, consider using {@link Observables#updateDispatcher()}
* to help implement the {@link Observable} interface.
*/
public abstract class BaseObservable implements Observable {
@NonNull
private final Worker worker;
protected BaseObservable() {
checkState(Looper.myLooper() != null, "Can only be created on a Looper thread");
worker = new Worker(this);
}
@Override
public final void addUpdatable(@NonNull final Updatable updatable) {
checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");
worker.addUpdatable(updatable);
}
@Override
public final void removeUpdatable(@NonNull final Updatable updatable) {
checkState(Looper.myLooper() != null, "Can only be removed on a Looper thread");
worker.removeUpdatable(updatable);
}
/**
* Notifies all registered {@link Updatable}s.
*/
protected final void dispatchUpdate() {
worker.dispatchUpdate();
}
/**
* Called from the worker looper thread when this {@link Observable} is activated by transitioning
* from having no client {@link Updatable}s to having at least one client {@link Updatable}.
*/
protected void observableActivated() {}
/**
* Called from the worker looper thread when this {@link Observable} is deactivated by
* transitioning from having at least one client {@link Updatable} to having no client
* {@link Updatable}s.
*/
protected void observableDeactivated() {}
public Updatable getUpdatable(){
return worker.getUpdatable();
}
/**
* Worker and synchronization lock behind a {@link BaseObservable}.
*/
static final class Worker {
@NonNull
private static final Object[] NO_UPDATABLES_OR_HANDLERS = new Object[0];
@NonNull
private final BaseObservable baseObservable;
@NonNull
private final WorkerHandler handler;
@NonNull
private Object[] updatablesAndHandlers;
private int size;
Worker(@NonNull final BaseObservable baseObservable) {
this.baseObservable = baseObservable;
this.handler = workerHandler();
this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;
this.size = 0;
}
public Updatable getUpdatable(){
return (Updatable)updatablesAndHandlers[0];
}
synchronized void addUpdatable(@NonNull final Updatable updatable) {
add(updatable, workerHandler());
if (size == 1) {
handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
}
}
synchronized void removeUpdatable(@NonNull final Updatable updatable) {
remove(updatable);
if (size == 0) {
handler.obtainMessage(MSG_LAST_REMOVED, this).sendToTarget();
}
}
void dispatchUpdate() {
handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
}
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
boolean added = false;
for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
if (updatablesAndHandlers[index] == updatable) {
throw new IllegalStateException("Updatable already added, cannot add.");
}
if (updatablesAndHandlers[index] == null && !added) {
updatablesAndHandlers[index] = updatable;
updatablesAndHandlers[index + 1] = handler;
added = true;
}
}
if (!added) {
final int newIndex = updatablesAndHandlers.length;
updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,
Math.max(newIndex * 2, newIndex + 2));
updatablesAndHandlers[newIndex] = updatable;
updatablesAndHandlers[newIndex + 1] = handler;
}
size++;
}
private void remove(@NonNull final Updatable updatable) {
for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
if (updatablesAndHandlers[index] == updatable) {
((WorkerHandler) updatablesAndHandlers[index + 1]).removeMessages(
WorkerHandler.MSG_CALL_UPDATABLE, updatable);
updatablesAndHandlers[index] = null;
updatablesAndHandlers[index + 1] = null;
size--;
return;
}
}
throw new IllegalStateException("Updatable not added, cannot remove.");
}
synchronized void sendUpdate() {
for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {
final Updatable updatable = (Updatable) updatablesAndHandlers[index];
final WorkerHandler handler =
(WorkerHandler) updatablesAndHandlers[index + 1];
if (updatable != null) {
if (handler.getLooper() == Looper.myLooper()) {
updatable.update();
} else {
handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
}
}
}
}
void callFirstUpdatableAdded() {
baseObservable.observableActivated();
}
void callLastUpdatableRemoved() {
baseObservable.observableDeactivated();
}
}
}
这里我特意把Observable的注释也截了出来,配合注释我们可以清楚的了解到其实它就相当于一个基类,定义了一个Worker工作着,封装了一些通用的操作。我们Repository的addUpdatable()方法也是在这里调用的。
@Override
public final void addUpdatable(@NonNull final Updatable updatable) {
checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");
worker.addUpdatable(updatable);
}
首先会去判断当前线程的Looper是否为空,这是因为agera中的Push event都是基础Android的handler机制的,对于handler机制不了解的同学可以去看我的这篇博客。
之后调用了worker的同名函数。
synchronized void addUpdatable(@NonNull final Updatable updatable) {
add(updatable, workerHandler());
if (size == 1) {
handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
}
}
首先调用了add()方法。
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
boolean added = false;
for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
if (updatablesAndHandlers[index] == updatable) {
throw new IllegalStateException("Updatable already added, cannot add.");
}
if (updatablesAndHandlers[index] == null && !added) {
updatablesAndHandlers[index] = updatable;
updatablesAndHandlers[index + 1] = handler;
added = true;
}
}
if (!added) {
final int newIndex = updatablesAndHandlers.length;
updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,
Math.max(newIndex * 2, newIndex + 2));
updatablesAndHandlers[newIndex] = updatable;
updatablesAndHandlers[newIndex + 1] = handler;
}
size++;
}
将对应的Updatable和handler存放在updatablesAndHandlers这个数组中。而handler则是通过workerHandler()方法创建的。
private static final ThreadLocal<WeakReference<WorkerHandler>> handlers = new ThreadLocal<>();
@NonNull
static WorkerHandler workerHandler() {
final WeakReference<WorkerHandler> handlerReference = handlers.get();
WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;
if (handler == null) {
handler = new WorkerHandler();
handlers.set(new WeakReference<>(handler));
}
return handler;
}
通过弱引用是为了防止内存泄露,毕竟是handler,可能会有一些延时操作。
在add()方法的最后,将size++。然后让我们回到addUpdatable()方法中。
synchronized void addUpdatable(@NonNull final Updatable updatable) {
add(updatable, workerHandler());
if (size == 1) {
handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
}
}
如果size是1,则使用handler发送消息。从这个Message的名字也可以看出来,MSG_FIRST_ADDED,肯定是只有第一次addUpdatable的时候才会触发。
static final class WorkerHandler extends Handler {
static final int MSG_FIRST_ADDED = 0;
static final int MSG_LAST_REMOVED = 1;
static final int MSG_UPDATE = 2;
static final int MSG_CALL_UPDATABLE = 3;
static final int MSG_CALL_MAYBE_START_FLOW = 4;
static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
static final int MSG_CALL_LOW_PASS_UPDATE = 6;
@Override
public void handleMessage(final Message message) {
switch (message.what) {
case MSG_UPDATE:
((Worker) message.obj).sendUpdate();
break;
case MSG_FIRST_ADDED:
((Worker) message.obj).callFirstUpdatableAdded();
break;
case MSG_LAST_REMOVED:
((Worker) message.obj).callLastUpdatableRemoved();
break;
case MSG_CALL_UPDATABLE:
((Updatable) message.obj).update();
break;
case MSG_CALL_MAYBE_START_FLOW:
((CompiledRepository) message.obj).maybeStartFlow();
break;
case MSG_CALL_ACKNOWLEDGE_CANCEL:
((CompiledRepository) message.obj).acknowledgeCancel();
break;
case MSG_CALL_LOW_PASS_UPDATE:
((LowPassFilterObservable) message.obj).lowPassUpdate();
break;
default:
}
}
}
接着我们来看这个WorkerHandler,其中定义了一些Message对应着操作,比如MSG_FIRST_ADDED表示第一次addUpdatable,MSG_UPDATE表示要通知Updatable更新等等。我们这里会进入MSG_FIRST_ADDED这个case,调用了Worker的callFirstUpdatableAdded()。
void callFirstUpdatableAdded() {
baseObservable.observableActivated();
}
调用了BaseObservable的observableActivated()方法。
protected void observableActivated() {}
这是一个空方法,在继承BaseObservable的子类中重写,也就是CompiledRepository。
@Override
protected void observableActivated() {
eventSource.addUpdatable(this);
maybeStartFlow();
}
这个eventSource我们姑且不管,可以看到调用了maybeStartFlow()。
void maybeStartFlow() {
synchronized (this) {
if (runState == IDLE || runState == PAUSED_AT_GO_LAZY) {
runState = RUNNING;
lastDirectiveIndex = -1; // this could be pointing at the goLazy directive
restartNeeded = false;
} else {
return; // flow already running, do not continue.
}
}
intermediateValue = currentValue;
runFlowFrom(0, false);
}
这里会调用runFlowFrom()。
private void runFlowFrom(final int index, final boolean asynchronously) {
final Object[] directives = this.directives;
final int length = directives.length;
int i = index;
while (0 <= i && i < length) {
int directiveType = (Integer) directives[i];
if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {
// Check cancellation before running the next directive. This needs to be done while locked.
// For goTo and goLazy, because they need to change the states and suspend the flow, they
// need the lock and are therefore treated specially here.
synchronized (this) {
if (checkCancellationLocked()) {
break;
}
if (directiveType == GO_TO) {
setPausedAtGoToLocked(i);
// the actual executor delivery is done below, outside the lock, to eliminate any
// deadlock possibility.
} else if (directiveType == GO_LAZY) {
setLazyAndEndFlowLocked(i);
return;
}
}
}
// A table-switch on a handful of options is a good compromise in code size and runtime
// performance comparing to a full-fledged double-dispatch pattern with subclasses.
switch (directiveType) {
case GET_FROM:
i = runGetFrom(directives, i);
break;
case MERGE_IN:
i = runMergeIn(directives, i);
break;
case TRANSFORM:
i = runTransform(directives, i);
break;
case CHECK:
i = runCheck(directives, i);
break;
case GO_TO:
i = runGoTo(directives, i);
break;
case SEND_TO:
i = runSendTo(directives, i);
break;
case BIND:
i = runBindWith(directives, i);
break;
case FILTER_SUCCESS:
i = runFilterSuccess(directives, i);
break;
case END:
i = runEnd(directives, i);
break;
// Missing GO_LAZY but it has already been dealt with in the synchronized block above.
}
}
}
又是一大堆的case。
int directiveType = (Integer) directives[i];
其中switch的是这个,还记得directives这个list吗?存的是我们刚才的那些数据处理流的方式。我们刚才调用了getFrom()和thenTransform(),对应的case是GET_FROM, TRANSFORM和END,调用了runGetFrom(),runTransform()和runEnd()。
private int runGetFrom(@NonNull final Object[] directives, final int index) {
Supplier supplier = (Supplier) directives[index + 1];
intermediateValue = checkNotNull(supplier.get());
return index + 2;
}
private int runTransform(@NonNull final Object[] directives, final int index) {
Function function = (Function) directives[index + 1];
intermediateValue = checkNotNull(function.apply(intermediateValue));
return index + 2;
}
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
这下大家明白了吧,在对应的函数中调用了对应的方法,比如runGetFrom()调用了supplier的get(),而这个supplier就是我们在初始化的时候传递进去的!
这就是说:
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.getFrom(supplier)
.thenTransform(function)
.compile();
执行的时候,CompiledRepository会在directives这个list中存储相应的操作,而在
repository.addUpdatable(this);
这段代码执行的时候,CompiledRepository会去directives取出相应的操作并执行。
然后我们来看最后的runEnd()。
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
这里我们没有调用skip操作,所以直接到了setNewValueAndEndFlow()。
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {
boolean wasRunningLazily = runState == RUNNING_LAZILY;
runState = IDLE;
intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null.
if (wasRunningLazily) {
currentValue = newValue; // Don't notify if this new value is produced lazily
} else {
setNewValueLocked(newValue); // May notify otherwise
}
checkRestartLocked();
}
这里我们也没有调用goLazy操作,所以会调用setNewValueLocked()。
private void setNewValueLocked(@NonNull final Object newValue) {
boolean shouldNotify = notifyChecker.merge(currentValue, newValue);
currentValue = newValue;
if (shouldNotify) {
dispatchUpdate();
}
}
这里notifyChecker.merge(currentValue, newValue);这个操作就是去判断newValue和currentValue是否一致,newValue是我们经过一系列runXXX()得到的,而currentValue是CompiledRepository缓存的上一次的值,如果是第一次就直接是缺升值。如果两个值不同,则调用dispatchUpdate();
protected final void dispatchUpdate() {
worker.dispatchUpdate();
}
调用的是worker的同名函数。
void dispatchUpdate() {
handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
}
同样的,通过handler传递消息。
case MSG_UPDATE:
((Worker) message.obj).sendUpdate();
break;
在handler的case中调用了Worker的sendUpdate()。
synchronized void sendUpdate() {
for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {
final Updatable updatable = (Updatable) updatablesAndHandlers[index];
final WorkerHandler handler =
(WorkerHandler) updatablesAndHandlers[index + 1];
if (updatable != null) {
if (handler.getLooper() == Looper.myLooper()) {
updatable.update();
} else {
handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
}
}
}
}
还记得updatablesAndHandlers这个数组吗?我们在调用addUpdatable()这个方法的时候把对应的Updatable和[Updatable的handler]添加到了这个数组中。这里为什么要强调时Updatable的handler呢?因为很容易把它和Repository的handler搞混了。其实这里存在两个handler,一个是Repository的handler,也就是Worker中持有的,它的作用是分发各种事件。
static final int MSG_FIRST_ADDED = 0;
static final int MSG_LAST_REMOVED = 1;
static final int MSG_UPDATE = 2;
static final int MSG_CALL_UPDATABLE = 3;
static final int MSG_CALL_MAYBE_START_FLOW = 4;
static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
static final int MSG_CALL_LOW_PASS_UPDATE = 6;
Worker(@NonNull final BaseObservable baseObservable) {
this.baseObservable = baseObservable;
this.handler = workerHandler();
this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;
this.size = 0;
}
可以看到它在Worker的构造函数中初始化。
而这里我们从updatablesAndHandlers中取到的handler是Updatable的handler。
synchronized void addUpdatable(@NonNull final Updatable updatable) {
add(updatable, workerHandler());
if (size == 1) {
handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
}
}
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
boolean added = false;
for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
if (updatablesAndHandlers[index] == updatable) {
throw new IllegalStateException("Updatable already added, cannot add.");
}
if (updatablesAndHandlers[index] == null && !added) {
updatablesAndHandlers[index] = updatable;
updatablesAndHandlers[index + 1] = handler;
added = true;
}
}
........
}
是在addUpdatable()中创建的。大家千万不要把两者搞混了!
回到sendUpdate()。
if (updatable != null) {
if (handler.getLooper() == Looper.myLooper()) {
updatable.update();
} else {
handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
}
}
这里判断,如果Updatable的handler的looper和Looper.myLooper(),也就是Repository的handler的looper是一样的,则直接调用updatable.update(),否则使用Updatable的handler发送MSG_CALL_UPDATABLE。
case MSG_CALL_UPDATABLE:
((Updatable) message.obj).update();
break;
虽然两者同样都会调用Updatable的update(),但是意义是不同的。首先我们来理解if (handler.getLooper() == Looper.myLooper())这个if判断。它的意思其实就是判断Updatable和Repository是否在同一个线程中。我们现在当然是,但是如果我们把代码改成这样:
new Thread(new Runnable() {
@Override
public void run() {
repository.addUpdatable(this);
}
}).start();
这表明我们在子线程中调用了addUpdatable。对应的:
synchronized void addUpdatable(@NonNull final Updatable updatable) {
add(updatable, workerHandler());
if (size == 1) {
handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
}
}
@NonNull
static WorkerHandler workerHandler() {
final WeakReference<WorkerHandler> handlerReference = handlers.get();
WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;
if (handler == null) {
handler = new WorkerHandler();
handlers.set(new WeakReference<>(handler));
}
return handler;
}
这些代码就是在子线程中进行的,通过ThreadLocal获取到handler当然也是子线程的。所以在这种情况下,那个if判断就不成立,Repository调用Updatable的handler去分发事情。大家都知道handler在哪个线程创建的就会在哪个线程执行handleMessage()。
这样的机制就保证了我们的Updatable在哪个线程被注册的,对应的update()函数就会在哪个线程被执行。这也是agera为什么要用handler作为整个事件传递框架的原因。
好了,到这儿整个创建Repository和注册Updatable的过程就分析完了。不过不知道大家发现什么问题没有,首先这样讲下来,observe()和onUpdatesPerLoop()方法完全没用嘛,其次,刚才分析Repository的代码也提到,如果currentValue()和newValue相等,它是不会去调用sendUpdate()函数的,也就是说如果我们想在不同的时间点注册两个不同的Updatable到一个Repository中并且获取一样的数据(有点绕口。。),以现在看来是不可以的。对于第一个问题呢,我这里不做讲解,想让大家自己去探索,因为这里面牵扯到很多Repository,Observable和Updatable之间的关系,我觉得如果你自己走通了,会对agera这样的模式有比较深刻的理解。至于第二个问题,我来告诉大家解决方案——使用goLazy()函数。
Like this:
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.goLazy()
.thenGetFrom(supplier)
.compile();
这样,对于同样的一个Result,Repository还是会进行事件发送的。至于原因,我们还是来看源码。
@NonNull
@Override
public RepositoryCompiler goLazy() {
checkExpect(FLOW);
checkGoLazyUnused();
addGoLazy(directives);
goLazyUsed = true;
return this;
}
一样的,调用了addGoLazy(directives)
static void addGoLazy(@NonNull final List<Object> directives) {
directives.add(GO_LAZY);
}
然后让我们看看具体执行逻辑的runFlowFrom()对GO_LAZY是怎么处理的。
private void runFlowFrom(final int index, final boolean asynchronously) {
final Object[] directives = this.directives;
final int length = directives.length;
int i = index;
while (0 <= i && i < length) {
int directiveType = (Integer) directives[i];
if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {
// Check cancellation before running the next directive. This needs to be done while locked.
// For goTo and goLazy, because they need to change the states and suspend the flow, they
// need the lock and are therefore treated specially here.
synchronized (this) {
if (checkCancellationLocked()) {
break;
}
if (directiveType == GO_TO) {
setPausedAtGoToLocked(i);
// the actual executor delivery is done below, outside the lock, to eliminate any
// deadlock possibility.
} else if (directiveType == GO_LAZY) {
setLazyAndEndFlowLocked(i);
return;
}
}
}
// A table-switch on a handful of options is a good compromise in code size and runtime
// performance comparing to a full-fledged double-dispatch pattern with subclasses.
switch (directiveType) {
case GET_FROM:
i = runGetFrom(directives, i);
break;
case MERGE_IN:
i = runMergeIn(directives, i);
break;
case TRANSFORM:
i = runTransform(directives, i);
break;
case CHECK:
i = runCheck(directives, i);
break;
case GO_TO:
i = runGoTo(directives, i);
break;
case SEND_TO:
i = runSendTo(directives, i);
break;
case BIND:
i = runBindWith(directives, i);
break;
case FILTER_SUCCESS:
i = runFilterSuccess(directives, i);
break;
case END:
i = runEnd(directives, i);
break;
// Missing GO_LAZY but it has already been dealt with in the synchronized block above.
}
}
}
重点看这段:
else if (directiveType == GO_LAZY) {
setLazyAndEndFlowLocked(i);
return;
}
如果是GO_LAZY,那么后面一切和数据操作相关的方法都不会执行!看到这里,相信大家是这样的
go lazy就lazy成这样??不要急,我们看下去。
rivate void setLazyAndEndFlowLocked(final int resumeIndex) {
lastDirectiveIndex = resumeIndex;
runState = PAUSED_AT_GO_LAZY;
dispatchUpdate();
checkRestartLocked();
}
首先缓存这个index,lastDirectiveIndex = resumeIndex;然后dispatchUpdate();但是这并没有什么用,因为我们的数据流根本没有执行。。这样就完了,别说解决问题了,这不是创造问题了吗?!连数据都获取不到了!
真的获取不到吗?上面我们说,执行了dispatchUpdate(),也就是说会调用了对应的Updatable的udate()方法。而我们在update()方法中一般会调用Repository的get()方法,秘密就在这个get()中。
@NonNull
@Override
public synchronized Object get() {
if (runState == PAUSED_AT_GO_LAZY) {
int index = lastDirectiveIndex;
runState = RUNNING_LAZILY;
runFlowFrom(continueFromGoLazy(directives, index), false);
}
return currentValue;
}
get()中判断,如果runState是PAUSED_AT_GO_LAZY,就执行逻辑否则直接返回currentValue。而我们这里由于调用了goLazy(),所以是PAUSED_AT_GO_LAZY。逻辑中会重新执行runFlowFrom()。
而这个时候,由于我们的runState是RUNNING_LAZILY而并非GO_LAZY,所以会执行下面的数据流操作,直到runEnd()。
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
同样的,调用setNewValueAndEndFlow()。
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {
boolean wasRunningLazily = runState == RUNNING_LAZILY;
runState = IDLE;
intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null.
if (wasRunningLazily) {
currentValue = newValue; // Don't notify if this new value is produced lazily
} else {
setNewValueLocked(newValue); // May notify otherwise
}
checkRestartLocked();
}
由于我们现在处于RUNNING_LAZILY,所以进的是if直接把newValue赋值给了currentValue。最后在get()中返回。回想一下,如果不调用goLazy,我们在第一次runFlowFrom()就会因为currentValue和newValue相等而返回,根本不会执行setNewValueLocked(newValue)!
到这儿相信大家都明白了,但是其实goLazy这个方法的作用主要是字面意思,就是“懒加载”,如果我们不是用goLazy,当我们调用addUpdatable()方法的时候就会去做数据流的操作,而如果我们使用了goLazy(),所有的数据流操作会延迟到get()中去操作。这是agera”Push event,pull data model”的特点。
agera的封装
看完了源码,大家是不是有点累了呢,这里为了让大家振奋起来,我决定说一些干货,不过其实所谓干货也不是我写的,我只是带大家看看Google是怎么使用agera封装一些功能的,告诉大家正确的使用姿势。
首先先说个简单的,封装click事件。
public class MainActivity extends AppCompatActivity implements Updatable{
private Button observableBtn;
private TextView show;
private ClickObservable clickObservable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observableBtn = (Button)findViewById(R.id.observable_btn);
show = (TextView)findViewById(R.id.show);
clickObservable = new ClickObservable();
clickObservable.addUpdatable(this);
observableBtn.setOnClickListener(clickObservable);
}
@Override
public void update() {
show.setText("update!!");
}
public static class ClickObservable extends BaseObservable implements View.OnClickListener{
@Override
public void onClick(View v) {
dispatchUpdate();
}
}
}
很简单,这就是所有的代码。我们的ClickObservable继承自BaseObservable,说明它是一个被观察者,实现了View.OnClickListener,说明它具有click的功能。所以我们要做的就是让我们的activity继承自Updatable,并且注册到ClickObservable中,在onClick的时候去分发事件就好了。
这里核心的概念是继承和实现的不同点。如果你要去使用agera封装一个功能。你可以考虑像上面一样,去继承BaseObservable,表示它可以[是]一个被观察者,并且实现对应功能的接口,表示它[拥有]这样的功能,最后在功能需要分发事件的地方去使用agera的Observable/Updatable逻辑完成事件传递就可以了。
下面我们考虑另外一个场景,如果我们先有的这个类已经继承自了一个基类怎么办,由于java不支持多重继承,所以我们没办法继承BaseObservable了,这是不是意味着我们没办法使用agera框架了呢?答案是否定的,我们可以通过封装Broadcast来讲解。
public static final class BroadcastObservable extends BroadcastReceiver
implements ActivationHandler, Observable {
@NonNull
private final UpdateDispatcher updateDispatcher;
@NonNull
private final Context context;
@NonNull
private final IntentFilter filter;
BroadcastObservable(@NonNull final Context applicationContext,
@NonNull final String... actions) {
this.context = checkNotNull(applicationContext);
this.updateDispatcher = Observables.updateDispatcher(this);
this.filter = new IntentFilter();
for (final String action : actions) {
this.filter.addAction(action);
}
}
@Override
public void observableActivated(@NonNull final UpdateDispatcher caller) {
context.registerReceiver(this, filter);
}
@Override
public void observableDeactivated(@NonNull final UpdateDispatcher caller) {
context.unregisterReceiver(this);
}
@Override
public void onReceive(final Context context, final Intent intent) {
updateDispatcher.update();
}
@Override
public void addUpdatable(@NonNull final Updatable updatable) {
updateDispatcher.addUpdatable(updatable);
}
@Override
public void removeUpdatable(@NonNull final Updatable updatable) {
updateDispatcher.removeUpdatable(updatable);
}
}
这个就是封装完的BroadcastObservable了。由于它必须要继承BroadcastReceiver,所以Google相出了一个方法,那就是创造了ActivationHandler接口。
public interface ActivationHandler {
/**
* Called when the the {@code caller} changes state from having no {@link Updatable}s to
* having at least one {@link Updatable}.
*/
void observableActivated(@NonNull UpdateDispatcher caller);
/**
* Called when the the {@code caller} changes state from having {@link Updatable}s to
* no longer having {@link Updatable}s.
*/
void observableDeactivated(@NonNull UpdateDispatcher caller);
}
方法是不是很熟悉。它要配合updateDispatcher一起使用,下面看看我们如何使用updateDispatcher。
this.updateDispatcher = Observables.updateDispatcher(this);
我们在BroadcastObservable的构造函数中有这么一句话,看看updateDispatcher()做了什么。
@NonNull
public static UpdateDispatcher updateDispatcher(
@NonNull final ActivationHandler activationHandler) {
return new AsyncUpdateDispatcher(activationHandler);
}
private static final class AsyncUpdateDispatcher extends BaseObservable
implements UpdateDispatcher {
@Nullable
private final ActivationHandler activationHandler;
private AsyncUpdateDispatcher(@Nullable ActivationHandler activationHandler) {
this.activationHandler = activationHandler;
}
@Override
protected void observableActivated() {
if (activationHandler != null) {
activationHandler.observableActivated(this);
}
}
@Override
protected void observableDeactivated() {
if (activationHandler != null) {
activationHandler.observableDeactivated(this);
}
}
@Override
public void update() {
dispatchUpdate();
}
}
UpdateDispatcher继承自了BaseObservable,实现了UpdateDispatcher接口。
public interface UpdateDispatcher extends Observable, Updatable {}
回到我们BroadcastObservable类中,看看它的onReceive()方法。
@Override
public void onReceive(final Context context, final Intent intent) {
updateDispatcher.update();
}
直接调用了updateDispatcher.update()。
而在AsyncUpdateDispatcher类中
@Override
public void update() {
dispatchUpdate();
}
直接调用了dispatchUpdate(),这样就会通知注册到其中的Updatable。下面让我们看看如何进行注册。
public class MainActivity extends AppCompatActivity implements Updatable{
private static final String ACTION = "action";
private TextView trigger;
private ContentObservables.BroadcastObservable observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
trigger = (TextView)findViewById(R.id.trigger);
observable = (ContentObservables.BroadcastObservable) ContentObservables.broadcastObservable(this,ACTION);
observable.addUpdatable(this);
}
public void send(View view){
Intent intent = new Intent();
intent.setAction(ACTION);
sendBroadcast(intent);
}
@Override
public void update() {
trigger.setText("update!!");
}
@Override
protected void onDestroy() {
super.onDestroy();
observable.removeUpdatable(this);
}
}
注册就是直接调用了observable.addUpdatable(this)。
@Override
public void addUpdatable(@NonNull final Updatable updatable) {
updateDispatcher.addUpdatable(updatable);
}
在调用了updateDispatcher的addUpdatable()后,我们知道会调用observableActivated()方法。
@Override
protected void observableActivated() {
if (activationHandler != null) {
activationHandler.observableActivated(this);
}
}
而其中activationHandler就是我们的BroadcastObservable!
@Override
public void observableActivated(@NonNull final UpdateDispatcher caller) {
context.registerReceiver(this, filter);
}
直接注册了广播,之后我们sendBroadcast()就会回调到它的onReceive()中。
@Override
public void onReceive(final Context context, final Intent intent) {
updateDispatcher.update();
}
调用了updateDispatcher.update(),而我们知道这就会调用dispatchUpdate()从而通知到注册进来的Updatable。
这里的核心思想是,既然你的这个类已经继承自了一个基类A,那么它[就是]基类A了,不可能在[是]Observable了,那怎么办呢?我们通过实现ActivationHandler, Observable这两个接口让它[拥有]对应的功能,并且通过[组合]的方式将UpdateDispatcher放置到其中,万事大吉。如果大家想要封装类似的功能,不妨按这样的思路和方式试一试。
agera和RxJava的比较
好了,终于到了最后一关了,我们来说点轻松的话题。
agera和RxJava,同为响应式框架,它们的不同点在哪里呢?
首先,agera[更轻],这里我从方法数上来看:
忽略水印。。这是从我微博上截取的。可以看到agera的方法数比RxJava少很多,当然这也有agera刚刚开源,还在迭代的原因。不过从目前来看agera确实[更专注于Android]。
第二点,也就是最重要,我反复说的一点,agera是”Push event,pull data model”,而RxJava是”Push data model”的,由于agera将event和data分离,所以我们可以看到,存在所谓的goLazy,知道get()方法执行的时候才去处理数据。
更多的大家可以去看这个issue。