前言
在上一篇文章中给小伙伴们介绍了 Repository 创建流程和基本使用。如果你对 Agera 和 Repository 还不了解,建议先看一下前两篇文章:
Android官方响应式框架Agera详解:一、相关概念和基本使用
Android官方响应式框架Agera详解:二、Repository的创建和操作符
上一篇文章中内容比较多,而且偏向理论。所以,这篇文章的目的就是把前两篇文章中所学的知识付诸实践
纸上得来终觉浅,绝知此事要躬行
目录
一、Repository 的更新规则
二、实现一个简单的计算器功能
三、实现下载图片的功能
四、Agera + Retrofit + Okhttp 的结合
五、总结
六、相关代码
七、预告
一、Repository 的更新规则
为什么先讲这个呢?因为在前两篇文章中并没有提及到这一块的内容,而且这一部分也不能算是一个知识点,只能说对于不了解或者刚接触的小伙伴,可能会有坑。我刚接触的时候就是在这个坑里呆了好久,迷迷糊糊的,一直不理解,所以特别在这里说明一下
我先问大家两个问题,MutableRepository 和 Repository 到底什么时候触发更新呢?它俩触发更新发的机制一样么?
Repository 指的是上篇文章提到的复杂的Repository,即用数据流创建的Repository,以后默认都是指复杂的Repository
先给出答案,然后在解释一下为什么
1. MutableRepository 的更新规则
MutableRepository 触发更新有一种:
MutableRepository 在活动状态下(即至少注册一个Updatable),当 MutableRepository 中的数据值改变的时候(数据改变的判断依据是 eqauls(object) 方法),触发更新
2. Repository 的更新规则
Repository 触发更新有两种:
1. 当 Repository 由非活动状态(即没有注册任何Updatable)变为活动状态时(第一次注册Updatable),触发一次更新
2. Repository 通过 observe(Observable... observables) 方法监听了其他的 Observable,当其监听的 Observable 有更新并且 Repository 处于活动状态时并且新值与 Repository 中的旧值不相等时,触发更新(这个是默认规则,我们通过设置notifyIf方法去改变)
3. 为什么
下面介绍一下为什么会是这样,由于涉及到了源码,所以我尽可能的简单介绍,如果感兴趣的话可以自己去翻阅一下源码
先说 MutableRepository ,当我们通过 Repositories.mutableRepository(0) 创建一个 MutableRepository 对象时,实际上创建的是一个 SimpleRepository 对象
我们看一下 SimpleRepository 的源码
private static final class SimpleRepository<T> extends BaseObservable
implements MutableRepository<T> {
@NonNull
private T reference;
SimpleRepository(@NonNull final T reference) {
this.reference = checkNotNull(reference);
}
@NonNull
@Override
public synchronized T get() {
return reference;
}
@Override
public void accept(@NonNull final T reference) {
synchronized (this) {
//1. 这里用于判断新值与旧值是否相同
if (reference.equals(this.reference)) {
// Keep the old reference to have a slight performance edge if GC is generational.
return;
}
this.reference = reference;
}
//2.这个方法的作用就是通知Updatable更新
dispatchUpdate();
}
}
SimpleRepository 非常简单,它继承了 BaseObservable ,实现了 MutableRepository 接口
当我们使用 mutableRepository.accetp() 方法接收一个数据的时候,它会先使用 equals 方法判断新值与旧值是否相同,即上面代码注释1的地方
当新值与旧值相同的话,不做任何处理。否则的话,调用 dispatchUpdate() 方法去通知所有注册的 Updatable 对象,即上面注释2的地方
dispatchUpdate() 方法是 BaseObservable 类中的方法,用于事件通知
到这里你应该明白了,当 MutableRepository 中的数据值改变的时候,触发更新
下面我们来看一下 Repository 的更新规则,当我们通过 .compile() 创建一个 Repository 对象时,实际上创建的是一个 CompiledRepository 对象
final class CompiledRepository extends BaseObservable
implements Repository, Updatable, Runnable {
//1. 当Repository变为活跃状态时
@Override
protected void observableActivated() {
eventSource.addUpdatable(this);
maybeStartFlow(); // 启动数据流,即数据流执行完后更新(可能)
}
//2. 当Repository变为不活跃的状态时
@Override
protected void observableDeactivated() {
eventSource.removeUpdatable(this);
maybeCancelFlow(deactivationConfig, false); //取消数据流(可能)
}
//3. 当收到其他Obaservable的更新通知时,
@Override
public void update() {
maybeCancelFlow(concurrentUpdateConfig, true); //取消正在运行的数据流(可能)
maybeStartFlow();//重新开始运行数据流(可能)
}
}
//4. 更新此 Repository 中的值,并根据notifyChecker对象判断是否要通知Updatable更新
private void setNewValueLocked(@NonNull final Object newValue) {
final boolean shouldNotify = notifyChecker.merge(currentValue, newValue);
currentValue = newValue;
if (shouldNotify) {
dispatchUpdate();//这个方法的作用就是通知Updatable更新
}
}
CompiledRepository 继承 BaseObservable,实现了 Updatable 接口
当 CompiledRepository 变为活跃状态时,会调用 maybeStartFlow() 方法,启动数据流,数据流执行完毕后会更新通知到注册的 Updatable,对应于上面注释1的地方
当 CompiledRepository 收到其他 Obaservable 的更新通知时,可能取消正在运行的数据流,重新开始运行数据流(可能),对应于上面注释3的地方
这个可能取消正在运行的数据流是根据我们的设置觉定的,还记得第二篇里讲的 onConcurrentUpdate 方法吗?
设置成 RepositoryConfig.CANCEL_FLOW 的话,这个地方就会取消正在运行的数据流
当 CompiledRepository 中数据流执行完毕,更新仓库中的值的时候,会进行一个判断,如果旧值与新值相同的话,不通知 Updatable 更新,否则的话通知
注意,这个规则我们是可以通过 notifyIf 进行更改
RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);
这个方法接收一个 Merger 对象,Merger 中有两个输入值,一个输入值,第一个输入值是旧数据,第二个输入值是新数据,输出值是一个布尔型的数据,true 代表通知 Updatable 更新,false 代表不通知
通过上面的分析,我们了解到,
1. 当 Repository 由非活动状态变为活动状态时,启动数据流,数据流执行完毕后触发一次更新
2. 当 Repository 收到其监听的 Observable 的更新事件时,重新运行数据流,数据流执行完毕后如果新旧值不同,触发更新(这个规则我们可以通过 notifyIf
方法进行修改)
由于这部分内容涉及到了源码,感兴趣的同学可以去翻阅一下。不想去扒源码的话也没关系,记住上面的结论,正确使用就行
二、实现一个简单的计算器功能
这一部分,我们实现一个计算器功能,用于巩固加深所学的知识
1. 需求
提供两个0~100的整数,进行加减乘除计算(除数为0时程序应正常运行不崩溃)
2. 实现思路
- 使用 MutableRepository 进行数值和操作符(加减乘除)的监听,数值/操作符改变的时候更新对应 MutableRepository 中的值
- 使用 Repository 数据流进行计算操作,返回结果
- 将数组变化和计算结果展示在页面上
3. 实现代码
public class CalculateActivity extends Activity {
TextView tvNum1; //数值1
TextView tvNum2; //数值2
TextView tvResult; //计算结果
TextView tvNumTip1; //SeekBar上的数值提示
TextView tvNumTip2; //SeekBar上的数值提示
TextView tvOperator; //操作符
RadioGroup rgOperator;
SeekBar sb1;
SeekBar sb2;
ExecutorService mExecutor = Executors.newSingleThreadExecutor(); //计算线程
//1,定义MutableRepository,分别用于保存数值1,数值2,操作符和计算结果
MutableRepository<Integer> mValue1Repo = Repositories.mutableRepository(0);
MutableRepository<Integer> mValue2Repo = Repositories.mutableRepository(0);
MutableRepository<String> mOperatorRepo = Repositories.mutableRepository("+");
MutableRepository<String> mResultRepo = Repositories.mutableRepository("0");
//2. 定义Repository,用于计算
Repository<Result<Integer>> mTaskRepo = Repositories.repositoryWithInitialValue(Result.<Integer>absent())
.observe(mValue1Repo, mValue2Repo, mOperatorRepo)// 4. 监听数值1,数值2和操作符,即当它们有变化时启动数据流重新计算
.onUpdatesPerLoop()
.goTo(mExecutor) //5. 切换到子线程
.getFrom(new Supplier<Integer>() { //6,拿到数值1
@NonNull
@Override
public Integer get() {
return mValue1Repo.get();
}
})
.mergeIn(new Supplier<Integer>() { //7. 将数值1与数值2合并,返回一个Pair对象
@NonNull
@Override
public Integer get() {
return mValue2Repo.get();
}
}, new Merger<Integer, Integer, Pair<Integer, Integer>>() {
@NonNull
@Override
public Pair<Integer, Integer> merge(@NonNull Integer integer, @NonNull Integer tAdd) {
return new Pair<>(integer, tAdd);
}
})
.thenTransform(new Function<Pair<Integer, Integer>, Result<Integer>>() {//8.进行计算操作,最后返回一个Result对象
@NonNull
@Override
public Result<Integer> apply(@NonNull Pair<Integer, Integer> input) {
int result = 0;
switch (mOperatorRepo.get()) {
case "+":
result = input.first + input.second;
break;
case "-":
result = input.first - input.second;
break;
case "*":
result = input.first * input.second;
break;
case "/": //9. 判断除数是否为0
if (input.second != 0) {
result = input.first / input.second;
} else {
return Result.failure(new Throwable("除数不能为0")); //返回失败的Result
}
break;
}
return Result.success(result); //返回成功的Result
}
})
.onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW) //10,当正在执行时监听到更新,取消当前数据流
.compile();//11,编译成 Repository
//数值1的 Updatable
Updatable mValue1Updatable = () -> {
tvNum1.setText(mValue1Repo.get() + "");
tvNumTip1.setText(mValue1Repo.get() + "");
};
//数值2的 Updatable
Updatable mValue2Updatable = () -> {
tvNum2.setText(mValue2Repo.get() + "");
tvNumTip2.setText(mValue2Repo.get() + "");
};
//操作符的 Updatable
Updatable mOperatorUpdatable = () -> {
tvOperator.setText(mOperatorRepo.get());
};
//计算结果的 Updatable
Updatable mResultUpdatable = () -> {
tvResult.setText(mResultRepo.get() + "");
};
//计算任务的 Updatable
Updatable mTaskUpdatable = () -> {
mTaskRepo.get()//11,get方法得到的是一个Result对象
.ifSucceededSendTo(new Receiver<Integer>() { //当Result成功时
@Override
public void accept(@NonNull Integer value) {
mResultRepo.accept(value + "");
}
})
.ifFailedSendTo(new Receiver<Throwable>() {//当Result失败时
@Override
public void accept(@NonNull Throwable value) {
mResultRepo.accept(value.getMessage());
}
});
};
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_cal);
tvNum1 = findViewById(R.id.tvNum1);
tvNum2 = findViewById(R.id.tvNum2);
tvResult = findViewById(R.id.tvResult);
tvNumTip1 = findViewById(R.id.tvNumTip1);
tvNumTip2 = findViewById(R.id.tvNumTip2);
tvOperator = findViewById(R.id.tvOperator);
rgOperator = findViewById(R.id.rgOperator);
sb1 = findViewById(R.id.sb1);
sb2 = findViewById(R.id.sb2);
rgOperator.setOnCheckedChangeListener((group, checkedId) -> {
switch (checkedId) {
case R.id.rbAdd:
mOperatorRepo.accept("+"); //更新操作符仓库中的值
break;
case R.id.rbSub:
mOperatorRepo.accept("-");
break;
case R.id.rbMult:
mOperatorRepo.accept("*");
break;
case R.id.rbDiv:
mOperatorRepo.accept("/");
break;
}
});
sb1.setOnSeekBarChangeListener(new SeekBar.OnSeekBarChangeListener() {
@Override
public void onProgressChanged(SeekBar seekBar, int progress, boolean fromUser) {
mValue1Repo.accept(progress); //更新数值1符仓库中的值
}
@Override
public void onStartTrackingTouch(SeekBar seekBar) {
}
@Override
public void onStopTrackingTouch(SeekBar seekBar) {
}
});
sb2.setOnSeekBarChangeListener(new SeekBar.OnSeekBarChangeListener() {
@Override
public void onProgressChanged(SeekBar seekBar, int progress, boolean fromUser) {
mValue2Repo.accept(progress); ////更新数值2仓库中的值
}
@Override
public void onStartTrackingTouch(SeekBar seekBar) {
}
@Override
public void onStopTrackingTouch(SeekBar seekBar) {
}
});
}
@Override
protected void onResume() {
super.onResume();
//注册 Updatable
mValue1Repo.addUpdatable(mValue1Updatable);
mValue2Repo.addUpdatable(mValue2Updatable);
mOperatorRepo.addUpdatable(mOperatorUpdatable);
mResultRepo.addUpdatable(mResultUpdatable);
mTaskRepo.addUpdatable(mTaskUpdatable);
}
@Override
protected void onPause() {
super.onPause();
//移除 Updatable
mValue1Repo.removeUpdatable(mValue1Updatable);
mValue2Repo.removeUpdatable(mValue2Updatable);
mOperatorRepo.removeUpdatable(mOperatorUpdatable);
mResultRepo.removeUpdatable(mResultUpdatable);
mTaskRepo.removeUpdatable(mTaskUpdatable);
}
}
上面的代码中都有相应的注释,这里对重要的点在讲解一下:
注释4,作用是监听数值1,数值2和操作符,当我们改变其中的任意一个的值的时候,整个 Repository 会重新运行数据流,以保证拿到最新的计算结果
注释9,在进行除法运算的时候,需要对除数为0的情况进行处理,当除数为0时我们返回一个封装了失败信息的 Result 对象。其他情况返回封装正常计算结果的成功的 Result 对象
注释10,设定当数据流正在执行时收到了更新事件,取消当前数据流的执行
这个与第一部分讲解 Repository 的更新规则时对应
注释11,当计算任务的 Updatable 收到更新时,我们从仓库中得到的是一个 Result 对象。因为我们并不知道 Result 是成功的还是失败的,所以我们使用 ifSucceededSendTo 和 ifFailedSendTo 两个方法分别用于接收成功和失败的结果
4. 运行效果
完整代码在文章底部给出
三、实现下载图片的功能
在上面的例子中,我们并没有涉及到到网络请求,而我们在实际的工作当中,网络相关的必不可少。这个例子就更加的贴近实际工作,一起来看一下吧
1. 需求
当我们点击按钮时,从网络上下载一张图片,压缩后显示出来。要求下载操作在子线程1上执行,压缩图片操作在子线程2上执行
2. 实现思路
- 分别定义两个子线程
- 定义一个Repository 用于下载图片操作,数值的类型是Result<Bitmap>
- 在不同的阶段使用 goto 函数切换到不同的线程
- 进行网络请求和图片压缩时,对异常情况进行处理
- 点击按钮时注册 Updatable,启动数据流
- Updatable 收到更新后,移除 Updatable
3.实现代码
public class LoadImageActivity extends Activity {
Button btnStart;
TextView tvState;
ImageView ivImage;
//下载图片的地址(注,图片来自网络,仅供学习使用)
private static final String IMAGE_URL = "http://dingyue.ws.126.net/S8bZlsJBtFYOf0xrTJfjPrpIPVtjL5MawSVqYPkey3KCd1548725430403compressflag.jpg";
private static final ExecutorService NETWORK_EXECUTOR = Executors.newSingleThreadExecutor();//下载图片的线程
private static final ExecutorService COMPRESS_EXECUTOR = Executors.newSingleThreadExecutor();//压缩图片的线程
//网络请求客户端
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY))
.build();
//1.定义一个Repository,数值的类型是Result<Bitmap>,因为从网络下载图片可能会失败,所以用Result对象封装
Repository<Result<Bitmap>> repository = Repositories.repositoryWithInitialValue(Result.<Bitmap>absent())
.observe() //2. 这里我们不需要监听其他事件源
.onUpdatesPerLoop()
.goTo(NETWORK_EXECUTOR)//3.切换到现在图片的子线程
.attemptGetFrom(() -> {
runOnUiThread(() -> tvState.setText("正在子线程进行图片下载...")); //4. 在主线程更新提示语
//5.进行网络请求
Request request = new Request.Builder()
.url(IMAGE_URL)
.get()
.build();
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
Thread.sleep(1000);//此处故意休眠1s,用于模拟网络延迟
return Result.success(response.body().byteStream()); //6.当网络请求成功时,返回一个成功的Result
} else {
//7. 网络请求失败时,返回一个失败的Result
return Result.failure(new Throwable("下载图片失败!" + response.code() + response.message()));
}
} catch (Exception e) {
e.printStackTrace();
//8. 网络请求异常时,返回一个失败的Result
return Result.failure(new Throwable("下载图片异常!" + e.getMessage()));
}
})
.orEnd(new Function<Throwable, Result<Bitmap>>() { //9.当上一步的指令执行失败时,走这个方法
@NonNull
@Override
public Result<Bitmap> apply(@NonNull Throwable input) {
/**
* 10,输入值是上一步失败的原因,输出值我们根据具体需求返回
*比如,这里我们也可以返回一个默认的图片
* return Result.success(BitmapFactory.decodeResource(getResources(),R.mipmap.ic_launcher));
*
*/
return Result.failure(input); //把上一步的失败原因封装成一个失败的Result对象,返回(数据流结束)
}
})
.goTo(COMPRESS_EXECUTOR)//11. 切换到压缩图片的线程
.thenTransform(input -> { //12. 输入值是一个 InputStream对象,这里我用了lambda表达式减少代码量
runOnUiThread(() -> tvState.setText("正在子线程进行图片压缩..."));
//13. 压缩图片
try {
BitmapFactory.Options options = new BitmapFactory.Options();
options.inSampleSize = 2;
Bitmap bitmap = BitmapFactory.decodeStream(input, new Rect(0, 0, 0, 0), options);
Thread.sleep(1500); //此处故意休眠,用于模拟网络延迟
return Result.present(bitmap); //14. 压缩图片成功,返回Result对象
} catch (Exception e) {
e.printStackTrace();
//15.压缩图片异常,返一个失败的Result对象
return Result.failure(new Throwable("压缩图片异常!" + e.getMessage()));
}
})
.compile();
Updatable updatable = new Updatable() {
@Override
public void update() {
repository.get()
.ifSucceededSendTo(value -> { //17.当Repository中的Result是成功时
tvState.setText("加载图片完成");
ivImage.setImageBitmap(value);
})
.ifFailedSendTo(value -> tvState.setText(value.getMessage()));//18.当Repository中的Result是失败时
repository.removeUpdatable(updatable);//21,接收到更新后,注销Updatable
}
};
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_loadimage);
btnStart = findViewById(R.id.btnStart);
tvState = findViewById(R.id.tvState);
ivImage = findViewById(R.id.ivImage);
btnStart.setOnClickListener(v -> {
ivImage.setImageBitmap(null);
repository.addUpdatable(updatable); //20,注册Updatable ,启动数据流
});
}
@Override
protected void onResume() {
super.onResume();
// repository.addUpdatable(updatable);//22,不在此处注册
}
@Override
protected void onPause() {
super.onPause();
// repository.removeUpdatable(updatable);//22,不在此处注销
}
}
下面我对于代码中比较重要的地方再一个讲解:
注释1,这个要讲的是什么呢,在实际的工作中,我们的数据流操作一般来说都可能在某一环节出现异常,所以我们在声明 Repository 中的数据类型的时候一般都是 Result<T>,这样才能保证正确的处理异常情况
注释5到注释8,这一部分进行了网络请求操作。关于网络请求部分,我们需要处理的情况比较多:
当网络请求成功时,即 response.isSuccessful() 为 true,我们返回一个成功的 Result ,里面的值是一个字节流(后续操作需要将其转换成Bitmap)
当网络请求失败时,即 response.isSuccessful() 为 false,我们返回一个失败的 Result ,并可以根据自己的需求去设置提示的内容
当网络请发生异常的时候,我们返回一个失败的 Result ,并可以根据自己的需求去设置提示的内容
注释9,注意注意注意,这个地方很重要!我们在上一步操作,即网络请求的时候,返回的值可能是成功(Result.sucess),也可能是失败(Result.failure)
当成功的时候,数据处理流会继续进行后续操作,并且数据流中的值就是 Result 中封装的值
当失败的时候,数据流会走到这个方法中,后续的数据流将不再执行
在这个对异常处理的方法中(即 orEnd,当然也可能是orSkip等),我们可以根据异常做最后的处理。这个方法代表的意思是,当上一步的操作发生异常的时候,接下来要怎么处理
再直白一点,反正异常已经发生了,你说接下来怎么办吧!
我们可以把异常原封不动的返回,即上面代码中的 return Result.failure(input);
也可以对异常进行修改,比如 return Result.failure(new Throwable("这里是发生异常后的处理"));
甚至,我们可以提供一个失败时的值,比如,return Result.success(BitmapFactory.decodeResource(getResources(),R.mipmap.ic_launcher))。 这样写的意思就是,当从网络上下载图片发生异常后,我就返回一个默认的图片
以上就是对网络请求时异常的处理步骤,可能会不太好理解,这一部分一定一定要理解。如果没有搞清楚的话会特别晕,感觉是那样写的,但是又不明白为啥是那样(我最开始的时候就不理解,所以用起来就很费劲...orz)
注释20到22,还记得上一个例子么,我们是在 onResume() 和 onPause() 进行注册/移除 Updatable的,这里并没有这么做,为什么呢?
当然可以这样做,但是由于我们没有监听其他的 Observable,所以整个例子就变成了打开页面的时候下载图片(这样做是没有任何问题的,只是这个例子我想在每次点击按钮的时候都重新下载一次图片)
所以呢,我就把注册的步骤放到了点击事件中,注销的步骤放到了收到更新事件的 Updatable 中
注意:Updatable 的注册/注销 必须是成对出现的
4. 运行效果
5. 思考题
上面的例子是我们每次点击事件就加载一次图片,因为我们把注册 Updatable 放到了点击数据中
假如现在要求还把注册/注销事件放到 onResume() 和 onPause() 中,并且呢还要求每次点击按钮的时候重新加载一次图片,该如果实现呢?你可以尝试实现一下,如果有疑问的话,可以留言交流讨论
给个小提示,回想一下第一部分讲解的内容
完整代码在文章底部给出
四、Agera + Retrofit + Okhttp 的结合
如何使用
从第一篇到这之前,我们了解了 Agera 的原理及如何使用。但是,在实际的工作中,我们更多的会接触到网络请求。虽然上一个例子用了 Agera 和 Okhttp 进行请求,但是这并不是我们想要的,我们想要的是更简洁更方便的请求和处理网络
这就是接下来要讲的 Agera + Retrofit + Okhttp。在这之前你可能了解过 RxJava,是的 Agera 和 RxJava 很类似,它和另外两个框架的结合也是大同小异
注,当你看到这里的时候,默认你已经掌握了 Retrofit 和 Okhttp 的结合使用,所以此处不会过多的去介绍这两个框架
下面我们来看一下具体步骤:
1. 添加相关的依赖
implementation 'com.google.android.agera:agera:1.4.0'
implementation 'me.drakeet.retrofit2:adapter-agera:2.2.0-beta'
implementation "com.squareup.okhttp3:okhttp:3.14.2"
implementation "com.squareup.okhttp3:logging-interceptor:3.14.2"
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'org.ligboy.retrofit2:converter-fastjson-android:2.1.0'
implementation 'com.alibaba:fastjson:1.2.57'
我们使用阿里的 fastjson 用于解析网络数据
Agera 和 Retrofit 的结合是通过 retrofit-agera-call-adapter 这个库来实现的,它的作用就是把 Retrofit 中网络接口的返回值 Call<T> 转换为 Agera 中的 Supplier<T>
2. 创建 Retrofit 对象时添加 AgeraCallAdapterFactory
private static Retrofit.Builder builder = new Retrofit.Builder()
.addConverterFactory(FastJsonConverterFactory.create())
.addCallAdapterFactory(AgeraCallAdapterFactory.create()) //它把 Call<T> 转换为 Supplier<T>
.client(client)
.baseUrl(BASE_URL)
3. 编写网络接口时改变返回值为 Supplier<T>
改变之前默认是这个样子的:
@GET("project/tree/json")
Call<ResponseBody> getProjectTree();
现在我们这样写:
@GET("project/tree/json")
Supplier<Result<JSONObject>> getProjectTree();
@GET("project/list/1/json")
Supplier<Result<JSONObject>> getProjectList(@Query("cid") int cid);
通过上面的几步,我们就把 Agera 和 Retrofit 结合了起来。接下来,我们会用一个实例去演示这三者是如何配合使用的
需求
假设现在有这么一个需求:
- 先从网络上下载一张图片,并把图片显示出来
- 紧接着请求一个接口,拿到接口中的数据并显示出来
- 把上一个接口中的数据作为参数,请求另一个接口并显示出返回数据
实现思路
- 定义个 Repository 用于处理上面3个任务,它的泛型是 Result<JSONObject>
- 通过 attemptGetFrom 等相关方法获取数据时,使用的 Suppiler 是 Retrofit 接口中返回的 Suppiler
- 3个任务依次执行,并对可能发生的异常做处理
- 每个任务执行完毕后分别进行数据展示
实现代码
public interface Api {
@GET()
Supplier<Result<ResponseBody>> getAgeraPic(@Url String url); //获取网络图片
@GET("project/tree/json")
Supplier<Result<JSONObject>> getProjectTree(); //获取项目列表
@GET("project/list/1/json")
Supplier<Result<JSONObject>> getProjectList(@Query("cid") int cid); //获取列表详情
}
首先是接口的定义,注意,接口的返回值都是 Supplier<T>
注,项目中使用的接口数据来自于鸿洋大神的https://wanandroid.com/
仅供学习使用
public class HttpActivity extends Activity {
Button btnStart;
TextView tvState;
ImageView ivImage;
TextView tvProjectTree;
TextView tvProjectList;
//1. 定义一个 MutableRepository ,当点击按钮时,改变其值,监听它的 Repository 就会重新执行
MutableRepository<Long> mutableRepository = Repositories.mutableRepository(System.currentTimeMillis());
//2. 定义一个 Repository 用于执行3个任务
Repository<Result<JSONObject>> repository = Repositories.repositoryWithInitialValue(Result.<JSONObject>absent())
.observe(mutableRepository) //3. 监听mutableRepository
.onUpdatesPerLoop()
.goTo(Http.THREAD_POOL) //4. 切换到子线程执行
.attemptGetFrom(Http.createService(Api.class).getAgeraPic(Http.IMAGE_URL)) //5.从接口中拿到数据
.orEnd(new Function<Throwable, Result<JSONObject>>() {
@NonNull
@Override
public Result<JSONObject> apply(@NonNull Throwable input) {
return Result.failure(input);//6. 当上一步骤(即网络请求)发送异常时,返回错误信息
}
})
.sendTo(new Receiver<ResponseBody>() {//7. 将拿到的数据进行处理
@Override
public void accept(@NonNull ResponseBody value) {
try {
Thread.sleep(1500); //模拟网络延迟,便于观察
} catch (InterruptedException e) {
e.printStackTrace();
}
//8. 压缩图片
BitmapFactory.Options options = new BitmapFactory.Options();
options.inSampleSize = 2;
Bitmap bitmap = BitmapFactory.decodeStream(value.byteStream(), new Rect(0, 0, 0, 0), options);
runOnUiThread(() -> { //9. 在主线程中更新UI
ivImage.setImageBitmap(bitmap);
tvState.setText("正在请求项目列表...");
});
}
})
.attemptGetFrom(Http.createService(Api.class).getProjectTree())//10,请求项目列表接口
.orEnd(new Function<Throwable, Result<JSONObject>>() {
@NonNull
@Override
public Result<JSONObject> apply(@NonNull Throwable input) {
return Result.failure(); //11. 当上一步骤(即网络请求)发送异常时,返回错误信息
}
})
.sendTo(new Receiver<JSONObject>() { //12. 将拿到的数据进行更新UI
@Override
public void accept(@NonNull JSONObject value) {
try {
Thread.sleep(1500); //模拟网络延迟,便于观察
} catch (InterruptedException e) {
e.printStackTrace();
}
runOnUiThread(() -> { //13. 在主线程中更新UI
tvProjectTree.setText(value.toJSONString());
tvState.setText("正在请求项目详情...");
});
}
})
.attemptTransform(new Function<JSONObject, Result<JSONObject>>() { //13.将拿到的数据进行下一次接口请求
@NonNull
@Override
public Result<JSONObject> apply(@NonNull JSONObject input) {
if ("0".equals(input.getString("errorCode"))) {
JSONArray objects = input.getJSONArray("data");
JSONObject object0 = objects.getJSONObject(0);
// 14. 接口获取成功,拿到其中的id字段,进行下一个接口请求
return Http.createService(Api.class).getProjectList(object0.getInteger("id")).get();
} else {
//15. 接口获取数据失败
return Result.failure(new Throwable("获取项目分类列表失败!" + input.getString("errorCode")));
}
}
})
.orEnd(new Function<Throwable, Result<JSONObject>>() {
@NonNull
@Override
public Result<JSONObject> apply(@NonNull Throwable input) {
return Result.failure(input); //16. 上一步操作发生失败时的处理
}
})
.thenTransform(new Function<JSONObject, Result<JSONObject>>() {//17.对于数据流的最后一步处理
@NonNull
@Override
public Result<JSONObject> apply(@NonNull JSONObject input) {
if ("0".equals(input.getString("errorCode"))) {
JSONArray objects = input.getJSONObject("data").getJSONArray("datas");
JSONObject object0 = objects.getJSONObject(0);
return Result.success(object0); //18. 请求成功,返回项目详情
} else {
//19. 请求失败的处理
return Result.failure(new Throwable("获取项目详情失败!" + input.getString("errorCode")));
}
}
})
.onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)//20.设定等数据流正在执行时收到更新请求时,取消当前数据流执行
.notifyIf(new Merger<Result<JSONObject>, Result<JSONObject>, Boolean>() {
@NonNull
@Override
public Boolean merge(@NonNull Result<JSONObject> jsonObjectResult, @NonNull Result<JSONObject> jsonObjectResult2) {
return true; //21,设定永远通知 Updatable 更新
}
})
.compile();
Updatable updatable = new Updatable() {
@Override
public void update() {
repository.get()
.ifSucceededSendTo(new Receiver<JSONObject>() {//22.收到成功的结果,更新UI
@Override
public void accept(@NonNull JSONObject value) {
try {
Thread.sleep(1000); //模拟网络延迟,便于观察
} catch (InterruptedException e) {
e.printStackTrace();
}
tvState.setText("任务执行完毕");
tvProjectList.setText(value.toJSONString());
}
})
.ifFailedSendTo(new Receiver<Throwable>() {//23,收到失败的结果,更新UI
@Override
public void accept(@NonNull Throwable value) {
tvState.setText(value.getMessage());
}
});
}
};
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_http);
ivImage = findViewById(R.id.ivImage);
btnStart = findViewById(R.id.btnStart);
tvState = findViewById(R.id.tvState);
ivImage = findViewById(R.id.ivImage);
tvProjectTree = findViewById(R.id.tvProjectTree);
tvProjectList = findViewById(R.id.tvProjectList);
btnStart.setOnClickListener(v -> {
mutableRepository.accept(System.currentTimeMillis()); //24,当我们点击按钮时,改变mutableRepository中的值
tvState.setText("正在下载图片...");
ivImage.setImageBitmap(null);
tvProjectTree.setText("");
tvProjectList.setText("");
});
}
@Override
protected void onResume() {
super.onResume();
repository.addUpdatable(updatable); //25.注册 updatable
tvState.setText("正在下载图片...");
}
@Override
protected void onPause() {
super.onPause();
repository.removeUpdatable(updatable); //26. 注销updatable
}
}
下面对部分注释进行一下讲解:
注释1,定义这个 MutableRepository 的作用就是 被执行任务 Repository 监听,当我们点击按钮时改变 MutableRepository 的值,这样 Repository 就会重新执行一遍数据流
注, 这个MutableRepository 作用纯粹是为了让数据流多次执行,如果具体的需求不需要多次执行的话,就不需要它了
注释5,10,14,我们之前在获取数据的时候都是 new 一个 Suppiler ,然后在 Suppiler 的 get()
方法中返回提供的值。现在我们是需要从网络接口中获得数据,在结合了 Retrofit 之后, Retrofit 的请求接口的返回值就是一个 Suppiler 对象,所以就是上面的写法
注释21,由于我们的get请求返回的值是一样的,如果不设置的 notifyIf 方法的话,数据流执行完毕后,如果 Repository 中已有的值和新值相同的话,就不会通知 Updatable 更新。所以我们这里设定永远返回true,即每次数据流执行完毕都都通知 Updatable 更新
运行效果
简化代码
在前面所有的例子中,为了让大家更好更清楚的了解 Agera 的使用,我都是把完整的代码写了出来,所以整体看起来就会显得特别臃肿
实际上,当 Agara 使用熟练后,我们可以通过 lambda 表达式对它进行代码简化。上面这个例子,把它的注释,还有无关代码去掉,简化完的 Repository 就是下面这样:
Repository<Result<JSONObject>> repository = Repositories.repositoryWithInitialValue(Result.<JSONObject>absent())
.observe(mutableRepository)
.onUpdatesPerLoop()
.goTo(Http.THREAD_POOL)
.attemptGetFrom(Http.createService(Api.class).getAgeraPic(Http.IMAGE_URL))
.orEnd(Result::failure)
.sendTo(value -> {
BitmapFactory.Options options = new BitmapFactory.Options();
options.inSampleSize = 2;
Bitmap bitmap = BitmapFactory.decodeStream(value.byteStream(), new Rect(0, 0, 0, 0), options);
runOnUiThread(() -> {
ivImage.setImageBitmap(bitmap);
tvState.setText("正在请求项目列表...");
});
})
.attemptGetFrom(Http.createService(Api.class).getProjectTree())
.orEnd(Result::failure)
.sendTo(value ->
runOnUiThread(() -> {
tvProjectTree.setText(value.toJSONString());
tvState.setText("正在请求项目详情...");
})
)
.attemptTransform(input -> {
if ("0".equals(input.getString("errorCode"))) {
JSONArray objects = input.getJSONArray("data");
JSONObject object0 = objects.getJSONObject(0);
return Http.createService(Api.class).getProjectList(object0.getInteger("id")).get();
} else {
return Result.failure(new Throwable("获取项目分类列表失败!" + input.getString("errorCode")));
}
})
.orEnd(Result::failure)
.thenTransform(input -> {
if ("0".equals(input.getString("errorCode"))) {
JSONArray objects = input.getJSONObject("data").getJSONArray("datas");
JSONObject object0 = objects.getJSONObject(0);
return Result.success(object0);
} else {
return Result.failure(new Throwable("获取项目详情失败!" + input.getString("errorCode")));
}
})
.onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)
.notifyIf((newValue, oldValue) -> true)
.compile();
Updatable updatable = () -> {
repository.get()
.ifSucceededSendTo(value -> {
tvState.setText("任务执行完毕");
tvProjectList.setText(value.toJSONString());
})
.ifFailedSendTo(value -> tvState.setText(value.getMessage()));
};
完整代码在文章底部给出
五、总结
这篇文章在第一部分讲解了 Repository 更新相关的部分,在第二、三、四部分分别给大家列举了几个Agera 的使用例子,重点是第四部分,Agera 与 Retrofit 和 Okhttp 的结合使用
MutableRepository 在活动状态下(即至少注册一个Updatable),当 MutableRepository 中的数据值改变的时候(数据改变的判断依据是 eqauls(object) 方法),触发更新
当 Repository 由非活动状态(即没有注册任何Updatable)变为活动状态时(第一次注册Updatable),触发一次更新
Repository 通过 observe(Observable... observables) 方法监听了其他的 Observable,当其监听的 Observable 有更新并且 Repository 处于活动状态时并且新值与 Repository 中的旧值不相等时,触发更新(这个是默认规则,我们通过设置notifyIf方法去改变)
重点掌握 Agera 在多个任务顺序执行下的使用方法和思路
重点掌握 Agera + Retrofit+ Okhttp 对网络请求的封装和异常处理
六、相关代码
https://github.com/smashinggit/Study
注:此工程包含多个module,本文所用代码均在AgeraDemo下
注:由于本人水平有限,所以难免会有理解偏差或者使用不正确的问题。如果小伙伴们有更好的理解或者发现有什么问题,欢迎留言批评指正~