0. 概述
感谢:对RxJava中.repeatWhen()和.retryWhen()操作符的思考
本文章讲解的是RxJava1.0版本中的错误处理的操作符,
其操作符主要有两个:
Catch:从onError通知中恢复发射数据
Retry:原始Observable遇到错误,重新订阅它期望它能正常终止,
1. Retry
Retry,顾名思义,是重试。当原始调用链抛出了onError事件,则,就会
触发Retry。
在RxJava中,关于Retry,有几个版本:
- retry() 没有传入参数,若有onError事件,则一直重试。
- retry(long) 传入重试的次数。
- retry(Func) 这个函数的两个参数是:重试次数和导
致发射 onError 通知的 Throwable 。这个函数返回一个布尔值,如果返回 true , retry 应该
再次订阅和镜像原始的Observable,如果返回 false , retry 会将最新的一个 onError 通知
传递给它的观察者。
我们来看具体的例子:
- retry():没有传入参数,若有onError事件,则一直重试。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"getInt:"+integer);
int a = 0;
int b = 1;
int c = b/a;
return integer + 10;
})
.retry()
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
},
() -> {
LogUtils.e(OpenLog,TAG,"onComplete");
});
这里,我们发生了一个整数1,在Map里面故意制造了一个onError事件,
如果,没有onError,retry()不会起效果,把接收到的数据继续往下传。
在这里,出现了onError,因为是retry(),所以会不断地重试。
- retry(long):传入重试的次数。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"getInt:"+integer);
int a = 0;
int b = 1;
int c = b/a;
return integer + 10;
})
.retry(3)
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
},
() -> {
LogUtils.e(OpenLog,TAG,"onComplete");
});
这里的输出是:
12-18 14:38:05.434 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:38:05.437 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:38:05.437 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:38:05.438 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:38:05.438 11348-11348/testmodules.chestnut E/MainActivity: error:divide by zero
可以看到,重试了3次。
-
retry(Func): 这个函数的两个参数是:integer, throwable,当前重试的次数和错误体。
返回值:true:进入重试,false:结束重试,并把最新一次的throwable传递。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"getInt:"+integer);
int a = 0;
int b = 1;
int c = b/a;
return integer + 10;
})
.retry((integer, throwable) -> {
LogUtils.e(OpenLog,TAG,"retry:"+integer);
if (integer>=3)
return false;
else
return true;
})
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
},
() -> {
LogUtils.e(OpenLog,TAG,"onComplete");
});
在retry里面,我们自定义了一个规则,当重试的次数达到3次,我们就返回false,也就是停止重试了。
输出:
12-18 14:40:20.950 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:40:20.951 11348-11348/testmodules.chestnut E/MainActivity: retry:1
12-18 14:40:20.951 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: retry:2
12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: retry:3
12-18 14:40:20.953 11348-11348/testmodules.chestnut E/MainActivity: error:divide by zero
-
retryWhen(Func1)
**retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)**
我们先明确:
1 .Func1像个工厂类,用来实现你自己的重试逻辑。
2 .输入的是一个Observable<Throwable>。
3 .输出的是一个Observable<?>。
如果发送的是onCompleted或者onError事件,将不会触发重订阅。相对的,如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。
其实这个跟retry差不多,只是每次把throwable用Obserable包裹了起来,当收到onError事件的时候,你可以直接在里面使用链式去判断,是否重试。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"getInt:"+integer);
int a = 1/0;
return integer + 10;
})
.retryWhen(observable -> {
return Observable.error(new Throwable("终止重试条件")); //终止重试条件:发射onComplete或者error
})
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
},
() -> {
LogUtils.e(OpenLog,TAG,"onComplete");
});
以上,在遇到onError后,直接是把Throwable再度抛出结束。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"getInt:"+integer);
int a = 1/0;
return integer + 10;
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<String>>() {
@Override
public Observable<String> call(Observable<? extends Throwable> observable) {
return observable.map(o -> {
LogUtils.e(OpenLog,TAG,"retryWhen:"+(o.getMessage()!=null?o.getMessage():"null"));
return "retryWhen,error";
});
}
})
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
},
() -> {
LogUtils.e(OpenLog,TAG,"onComplete");
});
上面这个,直接对错误链,用map操作符输出了一下,然后重试,其效果跟无参数的retry()差不多。
-
retryWhen(Func1,Scheduler)
这个版本,可以制定其运行的调度器。默认的是:trampoline
。
2. Catch
catch和程序中的catch是一样的,都是捕抓异常。
在链式的传递中,当其发生错误时onError的时候,我们希望不终止链式的传递,而是希望根据
异常去恢复链式。在Java中,Catch被实现成:onErrorReturn.
-
onErrorReturn
**Observable<T> onErrorReturn(Func1<? super Throwable, ? extends T> resumeFunction)**
- 由方法的定义可以看到,其传入异常体:throwable,然后返回一个异常前的对象体:? extends T
- onErrorReturn 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者
的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观
察者的 onCompleted 方法。
例子:
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"map:"+integer);
int a = 1;
int b = 0;
int c = a/b;
return integer;
})
.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
LogUtils.e(OpenLog,TAG,"onErrorReturn:"+throwable.getMessage());
return 12321;
}
})
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"onError:"+throwable.getMessage());
},
() -> {
LogUtils.e(OpenLog,TAG,"onCompleted:");
});
Log输出如下:
12-19 16:47:10.686 8522-8522/testmodules.chestnut E/MainActivity: map:1
12-19 16:47:10.690 8522-8522/testmodules.chestnut E/MainActivity: onErrorReturn:divide by zero
12-19 16:47:10.691 8522-8522/testmodules.chestnut E/MainActivity: onNext:12321
12-19 16:47:10.691 8522-8522/testmodules.chestnut E/MainActivity: onCompleted:
-
onErrorResumeNext
和onErrorReturn一样,只不过,它返回的是一个新的Observable。
Observable.just(1)
.map(integer -> {
LogUtils.e(OpenLog,TAG,"map:"+integer);
int a = 1;
int b = 0;
int c = a/b;
return integer;
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(123);
}
})
.subscribe(
integer -> {
LogUtils.e(OpenLog,TAG,"onNext:"+integer);
},
throwable -> {
LogUtils.e(OpenLog,TAG,"onError:"+throwable.getMessage());
},
() -> {
LogUtils.e(OpenLog,TAG,"onCompleted:");
});
-
onExceptionResumeNext
和 onErrorResumeNext 类似, onExceptionResumeNext 方法返回一个镜像原有Observable行为
的新Observable,也使用一个备用的Observable,不同的是,如果 onError 收到
的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用
的Observable。