第一版
封装一个包含 DialogFragment 的 Subscriber:
public class ProgressDialogSubscriber<T> implements FlowableSubscriber<T> {
private static final String TAG = "ProgressSubscriber";
private MyDialogFragment myDialogFragment;
private FragmentManager fragmentManager;
private Consumer<T> onNext;
private Consumer<Throwable> onError;
public ProgressDialogSubscriber(FragmentManager fragmentManager, Consumer<T> onNext,
Consumer<Throwable> onError) {
myDialogFragment = new MyDialogFragment();
this.fragmentManager = fragmentManager;
this.onNext = onNext;
this.onError = onError;
}
@Override public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
myDialogFragment.show(fragmentManager, "dialog");
s.request(1);
}
@Override public void onError(Throwable t) {
Log.i(TAG, "onError: ");
myDialogFragment.dismiss();
try {
onError.accept(t);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override public void onComplete() {
Log.i(TAG, "onComplete: ");
myDialogFragment.dismiss();
}
@Override public void onNext(T o) {
Log.i(TAG, "onNext: " + o);
myDialogFragment.dismiss();
try {
onNext.accept(o);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyDialogFragment extends DialogFragment {
@Nullable @Override
public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container,
@Nullable Bundle savedInstanceState) {
return inflater.inflate(R.layout.layout_dialog_fragment, container, false);
}
}
}
P层调用
// Callback for a successful login: logs the user name of the returned entity.
final Consumer<UserEntity> onLoginSuccess = new Consumer<UserEntity>() {
    @Override
    public void accept(UserEntity userEntity) throws Exception {
        Log.i(TAG, "accept: userEntity=" + userEntity.getUserName());
    }
};
// Callback for a failed login: credential errors get their own log prefix.
final Consumer<Throwable> onLoginFailure = new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        final boolean badCredentials = throwable instanceof LoginBiz.LoginInfoException;
        final String prefix = badCredentials ? "accept:登录信息错误 " : "accept: ";
        Log.i(TAG, prefix + throwable.getMessage());
    }
};
// Run the login on the io scheduler and deliver the signals on the Android main thread.
loginBiz.login(userName, pw)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new ProgressDialogSubscriber<UserEntity>(
        loginView.getSupportFragmentManager(), onLoginSuccess, onLoginFailure));
P层调用的时候,需要View层暴露getSupportFragmentManager方法(也有传入Context的做法)。而且Subscriber和P层都依赖了特定的Android平台代码,对纯业务逻辑有侵入,这样并不好。
第二版
隔离Android平台特有的context和getSupportFragmentManager。
ILoadingView 弹框控制接口
/**
 * Control interface for a loading indicator. It declares only show/dismiss operations and
 * references no other types.
 */
public interface ILoadingView {
/**
 * Shows the loading dialog.
 */
void show();
/**
 * Closes the loading dialog.
 */
void dismiss();
}
LoadingSubscriber 定义的Subscriber只依赖控制ILoadingView
/**
 * {@link FlowableSubscriber} that drives an {@link ILoadingView}: the view is shown on
 * subscription and dismissed on the first item, on failure, or on completion, whichever comes
 * first. The item and the failure are forwarded to the optional callbacks.
 *
 * <p>Exactly one item is requested from upstream.
 *
 * @param <T> type of the item delivered to the {@code onNext} callback
 */
public class LoadingSubscriber<T> implements FlowableSubscriber<T> {
    private final ILoadingView iLoadingView;
    private final Consumer<? super T> onNext;
    private final Consumer<? super Throwable> onError;

    /**
     * Whether the loading view is currently shown by this subscriber. Only touched from the
     * subscriber callbacks, which Reactive Streams delivers serially.
     */
    private boolean loadingVisible;

    /**
     * @param iLoadingView loading indicator to control; must not be {@code null}
     * @param onNext callback receiving the emitted item, or {@code null} to ignore it
     * @param onError callback receiving the failure, or {@code null} to ignore it
     * @throws NullPointerException if {@code iLoadingView} is {@code null}
     */
    public LoadingSubscriber(ILoadingView iLoadingView, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        if (iLoadingView == null) {
            throw new NullPointerException("ILoadingView 不能为空");
        }
        this.iLoadingView = iLoadingView;
        this.onNext = onNext;
        this.onError = onError;
    }

    /** Shows the loading view and requests the single expected item. */
    @Override
    public void onSubscribe(Subscription s) {
        loadingVisible = true;
        iLoadingView.show();
        s.request(1);
    }

    /** Hides the loading view and forwards the item to the result callback, if any. */
    @Override
    public void onNext(T t) {
        dismissLoading();
        if (onNext != null) {
            try {
                onNext.accept(t);
            } catch (Exception e) {
                // This class is kept free of platform logging; report on stderr.
                e.printStackTrace();
            }
        }
    }

    /** Hides the loading view and forwards the failure to the error callback, if any. */
    @Override
    public void onError(Throwable t) {
        dismissLoading();
        if (onError != null) {
            try {
                onError.accept(t);
            } catch (Exception e) {
                // This class is kept free of platform logging; report on stderr.
                e.printStackTrace();
            }
        }
    }

    /**
     * Hides the loading view when upstream completes without having emitted an item; without
     * this an empty source would leave the loading view showing forever.
     */
    @Override
    public void onComplete() {
        dismissLoading();
    }

    /** Dismisses the loading view at most once per subscription. */
    private void dismissLoading() {
        if (loadingVisible) {
            loadingVisible = false;
            iLoadingView.dismiss();
        }
    }
}
View层 实现ILoadingView 接口,在P层调用如下
// Success handler: logs the user name of the logged-in entity.
final Consumer<UserEntity> handleUser = new Consumer<UserEntity>() {
    @Override
    public void accept(UserEntity userEntity) throws Exception {
        Log.i(TAG, "accept: userEntity=" + userEntity.getUserName());
    }
};
// Failure handler: credential errors are logged with a dedicated prefix.
final Consumer<Throwable> handleFailure = new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        if (!(throwable instanceof LoginBiz.LoginInfoException)) {
            Log.i(TAG, "accept: " + throwable.getMessage());
            return;
        }
        Log.i(TAG, "accept:登录信息错误 " + throwable.getMessage());
    }
};
// The view itself is passed as the ILoadingView, so no Android type leaks into this call.
loginBiz.login(userName, pw)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new LoadingSubscriber<UserEntity>(loginView, handleUser, handleFailure));
但是这个感觉还是有点问题,在订阅的时候使用了我自己特定的Subscriber,感觉还是不够干净清爽。能不能用链式的方式把弹框的控制逻辑加进去?
第三版
使用compose+doOnXXX
仍然使用上面的ILoadingView 接口。
/**
 * Builds a transformer that shows {@code iLoadingView} when the stream is subscribed and
 * dismisses it when the stream fails, completes, or is cancelled by downstream.
 *
 * <p>Author's guidance: apply this directly before {@code subscribe}. Operators placed after it
 * may end the stream by cancelling upstream, in which case {@code doOnComplete} is not invoked;
 * the {@code doOnCancel} hook below dismisses the loading view in that situation.
 *
 * <p>NOTE(review): when a terminal signal and a cancellation both reach this point,
 * {@code dismiss()} may be called more than once; this assumes {@link ILoadingView#dismiss()}
 * tolerates repeated calls. Confirm against the implementations.
 *
 * @param iLoadingView loading indicator to show and dismiss
 * @param <T> element type of the transformed stream
 * @return a transformer that leaves the items untouched and only adds the loading side effects
 */
public <T> FlowableTransformer waitLoadingTransformer(final ILoadingView iLoadingView) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(Subscription subscription) throws Exception {
                    Log.i(TAG, "doOnSubscribe accept: ");
                    iLoadingView.show();
                }
            }).doOnNext(new Consumer<T>() {
                @Override
                public void accept(T o) throws Exception {
                    // Items do not affect the loading view; only traced.
                    Log.i(TAG, "doOnNext accept: ");
                }
            }).doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.i(TAG, "doOnError accept: ");
                    iLoadingView.dismiss();
                }
            }).doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.i(TAG, "doOnComplete run: ");
                    iLoadingView.dismiss();
                }
            }).doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    // Downstream cancelled, so no terminal signal will pass through here.
                    Log.i(TAG, "doOnCancel run: ");
                    iLoadingView.dismiss();
                }
            });
        }
    };
}
// Demo pipeline for waitLoadingTransformer: a source that emits 1..10, pausing two seconds
// after each item, and then fails with a Throwable("TTT").
Flowable.create(new FlowableOnSubscribe<Long>() {
@Override public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
// The loop condition stays true for the whole demo; the loop is left through the
// return statements below.
for (int i=1;i>0;i++) {
Log.i(TAG, "subscribe: i=" + i +" Thread="+Thread.currentThread());
emitter.onNext((long) i);
try {
Thread.sleep(2*1000);
} catch (InterruptedException e) {
// Interrupted while sleeping: stop emitting.
e.printStackTrace();
return;
}
// After the tenth item, end the stream with an error.
if (i ==10) {
emitter.onError(new Throwable("TTT"));
return;
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// The loading transformer is applied last, immediately before subscribe.
.compose(waitLoadingTransformer(loginView)).subscribe(new FlowableSubscriber<Long>() {
@Override public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
// Request up to Integer.MAX_VALUE items.
s.request(Integer.MAX_VALUE);
}
@Override public void onNext(Long aLong) {
Log.i(TAG, "onNext: "+aLong);
}
@Override public void onError(Throwable t) {
Log.i(TAG, "onError: "+t);
}
@Override public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
这样是不是很纯粹很Rx的方式实现了对dialog的控制!
一直觉得可以用这样的方式去实现对dialog的控制,但是百度了很多,都是传入context之类的,觉得不纯粹,于是使用伟大的Google,发现了这个库RxLoading,参考了他的实现。
这是使用compose的方式实现的。AutoDispose 的链式处理思路是 ObservableConverter+as操作符,感觉也可以实现对dialog的控制,只是还没找到关于ObservableConverter 使用的介绍,看了半天,没太看懂这个怎么用,这个是2.1.7加入的实验性功能。先暂时放一放,后面再花时间好好研究AutoDispose 以及ObservableConverter的使用。
第四版
在Google的时候,发现另外一种实现方案,使用using操作符
// Demo of Flowable.using: the loading view is treated as the resource. The first argument
// shows it, the second builds the data source, the third dismisses it.
// NOTE(review): using(...) sits upstream of rxLifeCycleHelper.io_main(); if io_main() applies
// subscribeOn (not visible here), show()/dismiss() may run off the main thread. Confirm.
Flowable.using(new Callable<ILoadingView>() {
@Override public ILoadingView call() throws Exception {
Log.i(TAG, "using call: 11");
// Initialization: show the loading view and hand it on as the resource.
loginView.show();
return loginView;
}
}, new Function<ILoadingView, Publisher<?>>() {
@Override public Publisher<?> apply(ILoadingView iLoadingView) throws Exception {
Log.i(TAG, "using apply: 22");
// Data emission logic: emits 1..10 with a two second pause after each item, then
// fails with a Throwable("TTT").
return Flowable.create(new FlowableOnSubscribe<Long>() {
@Override public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
// The loop is left through the return statements below.
for (int i=1;i>0;i++) {
Log.i(TAG, "subscribe: i=" + i +" Thread="+Thread.currentThread());
emitter.onNext((long) i);
try {
Thread.sleep(2*1000);
} catch (InterruptedException e) {
// Interrupted while sleeping: stop emitting.
e.printStackTrace();
return;
}
if (i ==10) {
emitter.onError(new Throwable("TTT"));
return;
}
}
}
},BackpressureStrategy.BUFFER);
}
}, new Consumer<ILoadingView>() {
@Override public void accept(ILoadingView iLoadingView) throws Exception {
Log.i(TAG, "using accept:333 ");
// Release the resource: dismiss the loading view.
iLoadingView.dismiss();
}
}).compose(rxLifeCycleHelper.io_main()).compose(rxLifeCycleHelper.bindTolifecycle(
RxLifeCycleHelper.ActivityEvent.DESTROY)).subscribe(new FlowableSubscriber<Long>() {
@Override public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
// Request up to Integer.MAX_VALUE items.
s.request(Integer.MAX_VALUE);
}
@Override public void onNext(Long aLong) {
Log.i(TAG, "onNext: "+aLong);
}
@Override public void onError(Throwable t) {
Log.i(TAG, "onError: "+t);
}
@Override public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
可以直接在using的第一个参数中初始化Dialog,在第二个参数中,构造发射数据请求,第三个参数中释放dialog。
这种写法比较适合直接在Activity中初始化和显示Dialog,但是在MVP中显得不够友好:每次使用时写起来有点复杂,也不便于封装和复用,于是再次compose化。
using compose化
/**
 * Builds a transformer that wraps the upstream in {@code Flowable.using}, treating
 * {@code iLoadingView} as the resource: it is shown when the resource is created and dismissed
 * when the resource is released.
 *
 * @param iLoadingView loading indicator to show and dismiss
 * @param <T> element type of the transformed stream
 * @return a transformer that emits the upstream items unchanged
 */
public <T> FlowableTransformer waitLoadingTransformerUsing(final ILoadingView iLoadingView) {
    // Resource factory: show the loading view and hand it on as the resource.
    final Callable<ILoadingView> openLoading = new Callable<ILoadingView>() {
        @Override
        public ILoadingView call() throws Exception {
            Log.i(TAG, "using call: 11");
            iLoadingView.show();
            return iLoadingView;
        }
    };
    // Resource disposer: dismiss the loading view that was handed out above.
    final Consumer<ILoadingView> closeLoading = new Consumer<ILoadingView>() {
        @Override
        public void accept(ILoadingView shownView) throws Exception {
            Log.i(TAG, "using accept:333 ");
            shownView.dismiss();
        }
    };
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(final Flowable<T> upstream) {
            // Source factory: the data comes from the untouched upstream.
            final Function<ILoadingView, Publisher<? extends T>> passThrough =
                    new Function<ILoadingView, Publisher<? extends T>>() {
                        @Override
                        public Publisher<? extends T> apply(ILoadingView shownView)
                                throws Exception {
                            Log.i(TAG, "using apply: 22");
                            return upstream;
                        }
                    };
            return Flowable.using(openLoading, passThrough, closeLoading);
        }
    };
}
使用,又链式了,很Rx。
// Demo pipeline for waitLoadingTransformerUsing: a source that emits 1..10, pausing two
// seconds after each item, and then fails with a Throwable("TTT").
// NOTE(review): the loading transformer is composed before rxLifeCycleHelper.io_main(); if
// io_main() applies subscribeOn (not visible here), show()/dismiss() may run off the main
// thread. Confirm.
Flowable.create(new FlowableOnSubscribe<Long>() {
@Override public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
// The loop is left through the return statements below.
for (int i=1;i>0;i++) {
Log.i(TAG, "subscribe: i=" + i +" Thread="+Thread.currentThread());
emitter.onNext((long) i);
try {
Thread.sleep(2*1000);
} catch (InterruptedException e) {
// Interrupted while sleeping: stop emitting.
e.printStackTrace();
return;
}
// After the tenth item, end the stream with an error.
if (i ==10) {
emitter.onError(new Throwable("TTT"));
return;
}
}
}
},BackpressureStrategy.BUFFER).compose(waitLoadingTransformerUsing(loginView)).compose(rxLifeCycleHelper.io_main()).compose(rxLifeCycleHelper.bindTolifecycle(
RxLifeCycleHelper.ActivityEvent.DESTROY)).subscribe(new FlowableSubscriber<Long>() {
@Override public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
// Request up to Integer.MAX_VALUE items.
s.request(Integer.MAX_VALUE);
}
@Override public void onNext(Long aLong) {
Log.i(TAG, "onNext: "+aLong);
}
@Override public void onError(Throwable t) {
Log.i(TAG, "onError: "+t);
}
@Override public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
方式四相对于方式三,不用写很多doOn方法,但是先onSubscribe 之后才执行using的第一个参数,方式三的doOnSubscribe 是在onSubscribe 之前执行。