Rxjava2 可谓是日常开发中的利器,特别是在异步任务中更能发挥作用。响应式编程以及流式api的良好支持,给予了更好的编码体验。越来越多开发者渐渐用起来了。学习rxjava2最好的地方无外乎官方文档,详细且完整。以下结合官方文档和我自己的理解以及例子,解释各个操作符的用法,给各位以及我自己作一篇参考。
怎么用Rxjava2
要使用RxJava,需要先创建Observables(发出数据项),以各种方式转换这些Observable以获取所需要的精确数据项(通过使用Observable运算符),然后观察并响应这些需要的项目序列(通过实现观察者)
或者订阅者,然后将它们订阅到最终的变换后的Observables)。
Creating Observables 创建操作符
just
通过获取预先存在的对象并在订阅时将该特定对象发布给下游使用者来构造反应类型。为方便起见,存在2到9个参数的重载,这些对象(具有相同的常见类型)将按指定的顺序发出。就像From类似,但请注意From将传入一个数组或一个iterable或类似的东西来取出要发出的项目,而Just只是简单地发出数组或者迭代器。
请注意,如果将null传递给Just,它将返回一个Observable,它将null作为项发出。不要错误地假设这将返回一个空的Observable(一个根本不发出任何项目)。为此,需要使用Empty运算符。
fun testOpJust() {
val arr = arrayOf("mary", "tom", "ben", "lisa", "ken")
Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)
val list = arrayListOf("mary", "tom", "ben", "lisa", "ken")
Observable.just(list).forEach { it -> System.out.println(it + "s") }
list.stream().filter { it -> it.length > 3 }.map { "$it s" }.forEach(System.out::println)
}
from
根据预先存在的源或生成器类型构造序列。当使用Observable时,如果使用的所有数据都可以表示为Observables,而不是Observables和其他类型的混合,则可以更方便。这允许使用一组运算符来控制数据流的整个生命周期。例如,Iterables可以被认为是一种的Observable;作为一种始终只发出单一项目的Observable。通过将这些对象显式转换为Observable,可以将它们作为对等体与其他Observable进行交互。因此,大多数ReactiveX实现都具有允许将特定于语言的对象和数据结构转换为Observable的方法。
注意:这些静态方法使用后缀命名约定(即,在方法名称中重复参数类型)以避免重载解析模糊。
fromIterable
从java.lang.Iterable源(例如Lists,Sets或Collections或custom Iterables)发出信号,然后完成序列。
可用于 Flowable ,Observable
fromArray
发信号通知给定数组的元素,然后完成序列。
可用于Flowable,Observable
注意:RxJava不支持原始数组,只支持(通用)引用数组。
fun testOpFrom(){
val list = arrayListOf<Int>(1,2,3,4,5,6)
Observable.fromIterable(list).subscribe(System.out::println)
Observable.fromArray(1,2,3,4,5,6).subscribe(System.out::println)
}
fromCallable
当消费者订阅时,调用给定的java.util.concurrent.Callable并将其返回值(或抛出的异常)转发给该使用者。
可用于:Observable,Flowable,Maybe,Single,Completable
备注:在Completable中,忽略实际返回值,并且Completable完成。
Observable.fromCallable<String> {
"hello"
}.subscribe(System.out::println)
Completable.fromCallable{
"complatable from callable"
}.subscribe {
System.out.println("complete")
}
fromAction
当消费者订阅时,调用给定的io.reactivex.function.Action并且消费者完成或接收Action抛出的异常。
可用于: Maybe,Completable
Maybe.fromAction<String>{
System.out.println("maybe from action")
}.subscribe(System.out::println)
以下标星先不多做解释,用得不多
*fromRunnable
*fromFuture
*from{reactive type}
将另一种反应类型包裹或转换为目标反应类型。具有以下签名模式的各种反应类型中提供以下组合:targetType.from {sourceType}()
*注意:并非所有可能的转换都是通过from {reactive type}方法系列实现的。查看to {reactive type}方法系列以获得进一步的转换可能性。
注意:fromAction和fromRunnable之间的区别在于Action接口允许抛出已受检的异常,而java.lang.Runnable则不然。
error
可用于Observable,Flowable,Maybe,Single,Completable
通过java.util.concurrent.Callable向消费者发出预先存在或生成的错误信号。
fun testOpError(){
Observable.error<Throwable>(IOException(""))
.subscribe({
System.out.print("不会打印吧")
},{
it.printStackTrace()
},{
System.out.println("也不会打印")
})
}
一个典型的用例是使用onErrorResumeNext有条件地映射或抑制链中的异常:
/**
* 抑制链上发生的异常
*/
@Test
fun testOpOnErrorResumeNext() {
val observable = Observable.fromCallable {
if (Math.random() < 0.5f) {
throw IllegalArgumentException()
}
throw IOException()
}
observable.onErrorResumeNext(Function {
if (it is IllegalArgumentException) {
Observable.empty()
} else {
Observable.error(it)
}
}).subscribe({
System.out.println("nothing")
},{
it.printStackTrace()
},{
System.out.println("empty")
})
}
这个onErrorResumeNext 厉害了,可以说之前一直不太明白怎么很好的处理。通过此操作符可以抑制错误的传递,本来如果subscribe发生了错误会触发onError回调。事实上可能发生了错误,需要不处理或者抑制产生。在onErrorResumeNext的function参数中,可以根据错误类型返回处理流程。
- empty 这种类型的源在订阅后立即表示完成。
可用于Observable,Flowable,Maybe,Single,Completable
示例可见onErrorResumeNext的例子
empty发送直接表示完成,就是订阅者直接调用onComplete回调。onNext 不会执行
- never 这种类型的源不会发出任何onNext,onSuccess,onError或onComplete的信号。这种类型的反应源可用于测试或“禁用”组合子操作符中的某些源。
可用于Observable,Flowable,Maybe,Single,Completable
不会对订阅者的任何回调进行调用。禁用也可理解,比如发送了错误,都不往下执行
- interval 定期生成无限的,不断增加的数字(Long类型)。intervalRange变体生成有限数量的此类数字。
可用于Observable,Flowable
fun testOpInterval(){
Observable.interval(1,TimeUnit.SECONDS)
.onErrorResumeNext(Function {
Observable.error(it)
})
.subscribe({
if (it.rem(5) == 0L) {
System.out.println("tick")
} else {
System.out.println("tock")
}
},{
it.printStackTrace()
},{
System.out.println("interval complete")
})
}
-
Timer运算符创建一个Observable,在指定的一段时间后发出一个特定项。
也就是说在给定的时间之后发送事件
- range 为每个消费者生成一系列值。range()方法生成Integers,rangeLong()生成Longs。Range运算符按顺序发出一系列顺序整数,您可以在其中选择范围的起点及其长度。
可用于 Observable,Flowable
fun testOpRange(){
val s = "test range operation now"
Observable.range(0,s.length- 3)
.map { "${s[it]} in range"}
.subscribe {
System.out.println(it)
}
}
发出一系列值,参数为起点,和长度。
- generate 创建一个冷,同步和有状态的值生成器。
可用于Observable,Flowable
@Test
fun testOpGenerate(){
val start = 1
val increaseValue = 2
Observable.generate<Int,Int>(Callable<Int> {
start
}, BiFunction<Int, Emitter<Int>,Int> {
t1, t2 ->
t2.onNext(t1 + increaseValue)
t1 + increaseValue
}).subscribe {
System.out.println("generate value : $it")
}
}
不太明白干啥的,具体应用场景。只是一直不间断的产生值
Filtering Observables 过滤Observable
过滤操作是非常常用且重要的,而且相关的操作符也很多
Debounce
可用于Observable,Flowable
删除响应源发出的项目,在给定的超时值到期之前,这些项目后面跟着更新的项目。计时器重置每次发射。此运算符会跟踪最近发出的项目,并且仅在有足够的时间过去而没有源发出任何其他项目时才会发出此项目。
按照我得理解就是debounde传入了超时值,在该时间之内如果多次发射,取离超时值最近得值。既然又超时那么也应该又开始时间,开始时间就是一组发射最开始值得时间,这一组发射得值的时的差是在debounce超时时间之内。
// Diagram:
// -A--------------B----C-D-------------------E-|---->
// a---------1s
// b---------1s
// c---------1s
// d---------1s
// e-|---->
// -----------A---------------------D-----------E-|-->
fun testOpDebounce(){
Observable.create<String>{
it.onNext("A")
Thread.sleep(1_500)
it.onNext("B")
Thread.sleep(500)
it.onNext("C")
Thread.sleep(250)
it.onNext("D")
Thread.sleep(2_000)
it.onNext("E")
}.debounce(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
}
distinct
可用于Observable Flowable
通过仅发出与先前项目相比不同的项目来过滤反应源。可以指定io.reactivex.functions.Function,将源发出的每个项目映射到一个新值中,该值将用于与先前的映射值进行比较。Distinct运算符通过仅允许尚未发出的项目来过滤Observable。在一些实现中,存在允许调整两个项被视为“不同”的标准的变体。在一些实施例中,存在操作符的变体,其仅将项目与其前一个项目进行比较以获得更精确的比较,从而仅过滤连续的重复项目,序列中的项目。
fun testOpDistinct(){
Observable.fromArray(1,2,3,3,4,5)
.distinct()
.subscribe(System.out::println)
// 用来过滤序列中一组值前后是否相同得值
Observable.fromArray(1,1,2,3,2)
.distinct { "呵呵" }
.subscribe(System.out::println)
}
重载的方法,传入keySelectro ,作用是对每个元素应用方法得到得新得值,再决定怎么去重
distinctUntilChanged
可用于Observable Flowable
通过仅发出与其前一个元素相比较不同的项目来过滤反应源。可以指定io.reactivex.functions.Function,将源发出的每个项目映射到一个新值中,该值将用于与先前的映射值进行比较。或者,可以指定io.reactivex.functions.BiPredicate作为比较器函数来比较前一个。
Observable.fromArray(1,2,3,3,4,5)
// .distinctUntilChanged()
.distinctUntilChanged { t1, t2 ->
t1 == t2
}
.subscribe(System.out::println)
可以说是distinct的加强版,多了一个可以传入比较器的重载方法
elementAt
课用于Flowable,Observable
在来自反应源的一系列发射的数据项中,以指定的从零开始的索引发出单个项目。如果指定的索引不在序列中,则可以指定将发出的默认项。
简单说就是按照发出项的次序获取指定的位置的元素
Observable.fromArray(1,2,3,3,4,5)
.elementAt(2)
.subscribe(System.out::println)
elementAtOrError
filter
可用于Observable,Flowable,Maybe,Single
通过仅发出满足指定函数的项来过滤由反应源发出的项。
过滤偶数
Observable.fromArray(1,2,3,3,4,5)
.filter {
it.rem(2) == 0
}
.subscribe(System.out::println)}
first
可用于Flowable,Observable
仅发出反应源发出的第一个项目,或者如果源完成而不发出项目则发出给定的默认项目。这与firstElement的不同之处在于此运算符返回Single,而firstElement返回Maybe。
Observable.fromArray(1,2,3,3,4,5)
.first(-1)
.subscribe(Consumer<Int> {
System.out.println("onNext :$it")
})
Observable.fromArray(1,2,3,3,4,5)
.firstElement()
.subscribe {
System.out.println("onNext :$it")
}
firstOrError
仅发出响应源发出的第一个项目,或者如果源完成而不发出项目则发出java.util.NoSuchElementException信号。
ignoreElement
可用于Maybe Single
忽略Single或Maybe源发出的单个项目,并返回一个Completable,它仅从源中发出错误或完成事件的信号。
Maybe.timer(1L,TimeUnit.SECONDS)
.ignoreElement()
.doOnComplete {
System.out.println("done")
}
.blockingAwait()
ignoreElements
忽略Single或Maybe源发出的单个项目,并返回一个Completable,它仅从源中发出错误或完成事件的信号。
Observable.timer(1L,TimeUnit.SECONDS)
.ignoreElements()
.doOnComplete {
System.out.println("completed")
}
.blockingAwait()
last
可用于Observable,Flowable
仅发出反应源发出的最后一个项目,或者如果源完成而不发出项目则发出给定的默认项目。这与lastElement的不同之处在于此运算符返回Single,而lastElement返回Maybe。
Observable.fromArray(1,2,3,3,4,5)
.last(-1)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
lastElement
Observable.fromArray(1,2,3,3,4,5)
.lastElement()
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
lastOnError
仅发出响应源发出的最后一项,或者如果源完成而不发出项,则发出java.util.NoSuchElementException信号。
ofType
可用于Flowable,Observable,Maybe
通过仅发出指定类型的项目来过滤反应源发出的项目。
Observable.fromArray(1,2.1f,3,3,4,5)
.ofType(Int::class.java)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
sample
可用于Observable Flowable
通过仅在周期性时间间隔内发出最近发出的项目来过滤反应源发出的项目。
Observable.create<String> {
it.onNext("A")
Thread.sleep(1_000)
it.onNext("B")
Thread.sleep(300)
it.onNext("C")
Thread.sleep(700)
it.onNext("D")
it.onComplete()
}.sample(1,TimeUnit.SECONDS)
.blockingSubscribe(System.out::println)
skip
删除响应源发出的前n个项目,并发出剩余项目。您可以通过使用Skip运算符修改Observable来忽略Observable发出的前n个项目,并仅参加之后的项目。
Observable.fromArray("hehe",2.1f,3,3,4,5)
// .ofType(String::class.java)
.skip(3)
.subscribe {
System.out.println(it)
}
skipLast
丢弃反应源发出的最后n个项目,并发出剩余的项目。
take
可用于Flowable Observable
仅发出反应源发出的前n项。
Observable.fromArray("hehe",2.1f,3,3,4,5)
.take(2)
.subscribe(System.out::println)
takeLast
可用于Flowable Observable
仅发出反应源发出的最后n个项目。
throttleFirst
可用于Flowable Observable
跟debounce有些相似,是取时间范围内第一个,在点击事件过滤很常用
在指定持续时间的连续时间窗口期间仅发出由反应源发出的第一个项目。
Observable.create<String> {
it.onNext("A")
Thread.sleep(300)
it.onNext("B")
Thread.sleep(400)
}.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
throttleLast
可用于Observable,Flowable
在指定持续时间的连续时间期间仅发出由反应源发出的最后一个项目。跟throttleFirst相反,取最后一个值
throttleWithTimeout
跟debounce的别名
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return debounce(timeout, unit);
}
timeout
从Observable或Flowable源发出项目,但如果在从上一项开始的指定超时持续时间内未发出下一项,则以java.util.concurrent.TimeoutException终止。对于Maybe,Single和Completable,指定的超时持续时间指定等待成功或完成事件到达的最长时间。如果Maybe,Single或Completable在给定时间内没有完成,将发出java.util.concurrent.TimeoutException。
Observable.create<String>{
it.onNext("A")
Thread.sleep(600)
it.onNext("B")
Thread.sleep(1_500)
it.onNext("C")
Thread.sleep(500)
}.subscribeOn(Schedulers.io())
.subscribe({
System.out.println(it)
},{
it.printStackTrace()
})
捕获处理
一下为Kotlin编写的代码,可以看到在发生错误的情况下,通过onError() 抛出了错误,并且需要在订阅者,第二个参数传入,处理错误的回调。
fun testErrorHandle() {
Observable.create<String> {
it.onNext("start")
Thread {
try {
System.out.println("start open ...")
it.onNext("start open ...")
val stream = URL("https://www.baidu.com").openStream()
System.out.println("after url ...")
it.onNext("after url")
val br = stream.bufferedReader()
if (!it.isDisposed) {
var text = br.readText()
it.onNext(text)
}
stream.close()
br.close()
it.onNext("after open ...")
if (!it.isDisposed) {
it.onComplete()
}
}catch (e : java.lang.Exception) {
System.out.println(e)
e.printStackTrace()
it.onError(e)
}
}.start()
}.subscribe(System.out::println) {
it.printStackTrace()
System.out.println("what the fuck")
}
}
Observable通常不会抛出异常。相反,它会通过使用onError通知终止Observable序列来通知任何观察者发生了不可恢复的错误。
这有一些例外。例如,如果onError()调用本身失败,Observable将不会尝试通过再次调用onError来通知观察者,但会抛出RuntimeException,OnErrorFailedException或OnErrorNotImplementedException。
从onError通知中恢复的技术
因此,不是捕获异常,而是观察者或操作者应该更通常地响应异常的onError通知。还有各种Observable运算符可用于对来自Observable的onError通知作出反应或从中恢复。例如,可以使用运算符:
- 吞下错误并切换到备份Observable以继续序列
- 吞下错误并发出默认项
- 吞下错误并立即尝试重启失败的Observable
- 吞下错误并尝试在一些退避间隔后重新启动失败的Observable
可以使用错误处理运算符中描述的运算符来实现这些策略。
吞下的意思,应该是不处理异常
RxJava特定的异常以及如何处理它们
CompositeException
这表明发生了多个异常。可以使用异常的getExceptions()方法来检索构成组合的各个异常。
MissingBackpressureException
这表示试图将过多发出数据项应用于它的Observable。有关背压(https://github.com/ReactiveX/RxJava/wiki/Backpressure)的Observable的解决方法,请参阅Backpressure。
OnErrorFailedException
这表明Observable试图调用其观察者的onError()方法,但该方法本身引发了异常。
OnErrorNotImplementedException
这表明Observable试图调用其观察者的onError()方法,但是没有这样的方法存在。可以通过修复Observable以使其不再达到错误条件,通过在观察者中实现onError处理程序,或通过使用本页其他地方描述的其中一个运算符到达观察者之前截获onError通知来消除此问题。。
OnErrorThrowable
观察者将这种类型的throwable传递给他们的观察者的onError()处理程序。此变量的Throwable包含有关错误的更多信息以及错误发生时系统的Observable特定状态,而不是标准Throwable。