Demo 地址
相信现在大家都已近在使用 Retrofit + RxJava 框架进行开发,我们也不例外,这里我们不会讲如何使用这套框架,而是会讲述我在开发过程中遇到的一个优化需求:自定义 Retrofit 的请求接口返回类型,即下面 GitHubService 接口中 listRepos 的返回类型
//官方示例
public interface GitHubService {
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
}
现有的使用方式
在我们的代码中现在是这样来定义的:
public interface RestClientV1_0 {
@GET("insurance/month_card/")
Flowable<ResponseBody> getInsuranceCard();
}
ResponseBody 是我们和 API 约定好的数据结构,大概是这种形式:
public class ResponseBody {
//定义业务成功或者失败
private String status;
//Json 格式的字符串,可以反序列化成定义的 Java Bean
private String content;
private String errorCode;
private String errorMsg;
//省略大部分代码
}
对于 getInsuranceCard() 方法我们使用的形式如下:
DadaApplication.getInstance().getApiV1().getInsuranceCard()
.compose(RxSchedulers.<ResponseBody>io_main(getView(), false))
.as(getView().<ResponseBody>bindAutoDisposable())
.subscribeWith(new ProgressSubscriber<ResponseBody>(getView()) {
@Override
public void onSuccess(ResponseBody response) {
InsuranceCard insuranceCard = response.getContentAs(InsuranceCard.class);
setInsuranceData(insuranceCard);
}
});
在这里我们不关注 Retrofit 和 RxJava 的使用,可以看见在 onSuccess 回调方法之前,我们声明的泛型类型全部为 ResponseBody 类型,在 onSuccess 回调中我们将 content 这个 Json 字符串解析成 InsuranceCard 对象。
现存问题
由上可知我们现在的使用方式存在两种问题:
- 在定义接口方法的时候,全部声明为 ResponseBody 类型,实际上 Api Response 会被解析成什么类型,无法从代码声明中得知,而需要去查阅 API 文档
- 对 Response 的解析是放在 onSuccess() 方法中的,然而我们大部分的 onSuccess() 方法回调都是在主线程执行,当解析数据比较大的时候就会造成卡顿
解决思路
最终我们期望对于这款框架的使用变成如下这种形式:
//接口方法的定义
@GET("insurance/month_card/")
DadaFlowable<InsuranceCard> getInsuranceCard();
//接口方法的调用
DadaApplication.getInstance().getApiV1().getInsuranceCard()
.toFlowable()
.compose(RxSchedulers.<InsuranceCard>io_main(getView(), false))
.as(getView().<InsuranceCard>bindAutoDisposable())
.subscribeWith(new DadaProgressSubscriber<InsuranceCard>(getView()) {
@Override
public void onDadaSuccess(InsuranceCard insuranceCard) {
setInsuranceData(insuranceCard);
}
});
- ResponseBody 对使用者隐藏,只需要看到具体的业务类型
- 数据的解析应该放在子线程中
实际做了哪些
- 自定义
DadaFlowable<T>
在定义接口方法时替代Flowable<T>
类型 - 重新定义
ApiResponse<T>
用来替代原先的ResponseBody
类型 - 自定义
Converter
用来将 API 返回的 Response 转换成我们需要的ApiResponse<T>
类型 - 自定义
CallAdapter
来提取ApiResponse<T>
中实际的业务类型 T
大致通过以上四步就可以实现我们的需求,下面我们来具体看一看这四步分别都做了些什么
自定义 DadaFlowable<T>
由于现阶段我们只会对新的接口采用这种新的方式,原有的 Flowable<ResponseBody>
的形式仍然保留,因此我们需要自定义一个 DadaFlowable<T>
对象,其内部仍然是生成一个 Flowable<T>
对象,如果依然在定义接口方法时使用 Flowable<T>
类型的话,它将会匹配到官方的 RxJava2CallAdapter (有关于 Retrofit 如何选择 CallAdapter 以及 Convert 请自行阅读 ServiceMethod 类的源码,这里我也附上一篇 Retrofit 非常好的源码解析 Android:手把手带你 深入读懂 Retrofit 2.0 源码) 而无法匹配到我们接下来自定义的 CallAdapter
DadaFlowable<T>
的代码目前十分简单:
public class DadaFlowable<T> {
private final Flowable<T> flowable;
public DadaFlowable(Flowable<T> flowable) {
this.flowable = flowable;
}
public Flowable<T> toFlowable() {
return flowable;
}
}
自定义 ApiResponse<T>
ApiResponse 的定义就更简单了,几乎算是对 ResponseBody 代码的 Copy,只不过我们不在采用字符串的方式来声明 content 属性,而是采用泛型的方式:
public class ApiResponse<T> {
public static final String OK = "ok";
private static final String UNKNOWN_ERROR = "unknown_error";
/**
* api 响应状态 ok 标识成功
*/
private String status;
/**
* api 业务数据
*/
private T content;
/**
* api 响应错误码
*/
private String errorCode;
/**
* errorCode 对应错误信息
*/
private String errorMsg;
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public T getContent() {
return content;
}
public void setContent(T content) {
this.content = content;
}
public String getErrorCode() {
return errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public boolean isOk() {
return OK.equals(status);
}
public static <T> ApiResponse<T> unknownError(Throwable error) {
ApiResponse<T> apiResponse = new ApiResponse<>();
apiResponse.setStatus(UNKNOWN_ERROR);
apiResponse.setErrorMsg(error.getMessage());
return apiResponse;
}
}
自定义 Converter
converter 的作用比较简单,我们可以认为是它将接口返回的数据解析成我们需要的 Java Bean 对象:
public class FastJsonResponseBodyConverter<T> implements Converter<ResponseBody, ApiResponse<T>> {
private final Type type;
public FastJsonResponseBodyConverter(Type type) {
this.type = type;
}
@Override
public ApiResponse<T> convert(ResponseBody value) throws IOException {
try {
ApiResponse apiResponse = JSON.parseObject(value.string(), ApiResponse.class);
Object content = apiResponse.getContent();
if (apiResponse.isOk() && JSONObject.class != type && JSONArray.class != type && null != content) {
apiResponse.setContent(JSON.parseObject(content.toString(), type));
}
return apiResponse;
} catch (Throwable e) {
e.printStackTrace();
return ApiResponse.unknownError(e);
}
}
}
我们需要关注的是其中的 convert(ResponseBody value)
方法,它会将接口返回的 Response 解析成 ApiResponse<T>
对象并返回,之后我们会讲述在何处使用到了这个返回对象
自定义 CallAdapter
相关
我们定义了一个用于生产 CallAdapter 的工厂,我只贴出这个工厂类里面的核心方法:
@Override
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
if (rawType != DadaFlowable.class) {
return null;
}
//省略...
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
//省略...
//一般走到这里 responseType 就是我们声明的业务类型
responseType = observableType;
return new DadaRxJava2CallAdapter<>(responseType);
}
这里的代码也比较简单,我将官方提供的 RxJava2CallAdapterFactory 代码进行了一些修改和删减,只有在声明返回类型为 DadaFlowable<T>
的时候才会匹配到这个工厂,并且生成对应的 DadaRxJava2CallAdapter 对象:
public class DadaRxJava2CallAdapter<R> implements CallAdapter<ApiResponse<R>, Object> {
private final Type responseType;
DadaRxJava2CallAdapter(Type responseType) {
this.responseType = responseType;
}
@Override
public Type responseType() {
return responseType;
}
@Override
public Object adapt(Call<ApiResponse<R>> call) {
Observable<Response<ApiResponse<R>>> responseObservable = new DadaCallExecuteObservable<>(call);
DadaBodyObservable<R> bodyObservable = new DadaBodyObservable<>(responseObservable);
return new DadaFlowable<>(bodyObservable.toFlowable(BackpressureStrategy.LATEST));
}
}
自定义 DadaCallExecuteObservable<T>
DadaCallExecuteObservable 是照搬官方的 CallExecuteObservable 代码仅仅换了个名字而已,我们主要看它的 subScribeActual 方法:
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone ait for each new observer.
Call<T> call = originalCall.clone();
//省略...
try {
Response<T> response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
//省略...
}
}
省略了部分代码,当我们的下游 Subscriber 订阅了 Observe 之后,将会调用 subscribeActual 方法,我们来看看该方法中的几段重要代码:
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
//省略...
return parseResponse(call.execute());
}
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
//省略...
ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
try {
T body = serviceMethod.toResponse(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
//省略...
}
}
/** Builds a method return value from an HTTP response body. */
R toResponse(ResponseBody body) throws IOException {
return responseConverter.convert(body);
}
在执行 retrofit 中的 call 对象(实际上是 OkHttpCall 对象)的 execute 方法的时候,实际上最终它会调用的 okhttp3.Call 对象的 execute 方法帮我们执行网络请求,并且调用 parseResponse 方法对返回的 response 进行解析,最终调用到的是我们上面自定义 Converter 对象的 convert 方法,返回了具体的 ApiResponse<T>
对象(这里是对上面介绍自定义 Converter 的应用)。
由此可知 Response<T> response = call.execute();
中的 response 对象其实就是 Response<ApiResponse<某种业务类型>>
对象
在获取到 response 对象之后,我们将调用 observer.onNext(response);
方法
自定义 DadaBodyObservable<T>
由 DadaRxJava2CallAdapter 的 adapt 方法可知,我们实际上是用 DadaBodyObservable 来构造出一个 DadaFlowable 对象并且返回的,DadaBodyObservable 的代码很简单,它其实就一个代理,当我们在最外层使用 DadaFlowable.toFlowable()...这一套调用流程的时候会先调用 DadaBodyObservable 的 subscribeActual
方法,然后将该方法传入的参数(实际上就是在上面解决思路段落中的 DadaProgressSubscriber
对象)包装成 BodyObserver 对象然后对 DadaCallExecuteObservable 进行订阅,代码如下:
final class DadaBodyObservable<T> extends Observable<T> {
private final Observable<Response<ApiResponse<T>>> upstream;
DadaBodyObservable(Observable<Response<ApiResponse<T>>> upstream) {
this.upstream = upstream;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
upstream.subscribe(new BodyObserver<>(observer));
}
}
自定义 BodyObserver<R>
在 DadaCallExecuteObservable 中提到的 observer.onNext(response);
方法中的 observer 对象实际上就是 BodyObserver 对象,代码如下:
private static class BodyObserver<R> implements Observer<Response<ApiResponse<R>>> {
private final Observer<? super R> observer;
private boolean terminated;
BodyObserver(Observer<? super R> observer) {
this.observer = observer;
}
@Override
public void onNext(Response<ApiResponse<R>> response) {
if (response.isSuccessful()) {
ApiResponse<R> apiResponse = response.body();
if (apiResponse.isOk()) {
//业务 OK
observer.onNext(apiResponse.getContent());
} else {
String apiErrorCode = apiResponse.getErrorCode();
String apiErrorMessage = apiResponse.getErrorMsg();
//业务失败
Throwable t = new DadaThrowable(apiErrorCode, apiErrorMessage);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
我们仅关注 onNext 方法,代码比较简单,对接口的请求状态和业务状态进行状态,然后回调给最外外外层 的 Subscriber 对象实际上是 DadaProgressSubscriber 对象的 onNext 或者 onError 方法
总结
其实你只要能了解的 RxJava2 的使用,并且阅读掌握 Retrofit 当中关于类型转换的源码,就可以实现这个定制的功能。