基础知识
角色关系(1.x)
Observable(被观察者,事件源)
Subscribe(订阅动作)
Observer(观察者)/Subscriber(订阅者)
三者关系:
现实中——观察者/订阅者 订阅 事件
RxJava中——obervable.subscribe(observer/subscriber)-->返回Subscription对象
两者相反,需注意
回调方法
在RxJava中,有三个回调方法,onNext(),onError()和onCompelete().。onNext()是最终输出及处理数据的回调,在发射数据过程中出现错误异常会回调OnError()方法,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。,OnError()和onCompleted()是互斥的。下面举一个最简单的例子:
Observable observable2 = Observable.just("也许当初忙着微笑和哭泣", "忙着追逐天空中的流星", "人理所当然的忘记", "是谁风里雨里一直默默守护在原地");
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " )
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ")
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+s )
}
};
observable.subscribe(subscriber);
运行结果为:
onNext: 也许当初忙着微笑和哭泣
onNext: 忙着追逐天空中的流星
onNext: 人理所当然的忘记
onNext: 是谁风里雨里一直默默守护在原地
onCompleted:
RxJava2.x
RxJava 2x 不再支持 null 值,如果传入一个null会抛出 NullPointerException
Observable升级为Flowable ,Flowable是Publisher的子类,解决MissingBackpressureException异常
在2.x中,添加了Flowable来支持背压,而把Observable设计成非背压的
Subscriber回调方法中新增onSubscribe(Disposable s)方法。Disposable就是1.x中的Subscription,只是为了避免冲突而改名的。
在2.x中,我们在onSubscribe()回调中必须调用s.request()方法去请求资源,参数就是要请求的数量,一般如果不限制请求数量,可以写成Long.MAX_VALUE,之后会立即触发onNext()方法!
以下粗斜体部分为引用别人文章的内容:所以当你在onSubscribe()/onStart()中做了一些初始化的工作,而这些工作是在request()后面时,会出现一些问题,在onNext()执行时,你的初始化工作的那部分代码还没有执行。为了避免这种情况,请确保你调用request()时,已经把所有初始化工作做完了。
先写一个最简单的常规写法:
//创建订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
//这一步是必须,我们通常可以在这里做一些初始化操作,调用request()方法表示初始化工作已经完成
//调用request()方法,会立即触发onNext()方法
//在onComplete()方法完成,才会再执行request()后边的代码
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String value) {
Log.e("onNext", value);
}
@Override
public void onError(Throwable t) {
Log.e("onError", t.getMessage());
}
@Override
public void onComplete() {
//由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete
Log.e("onComplete", "complete");
}
};
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("Hello,I am DoggieX!");
}
}, BackpressureStrategy.BUFFER)
.subscribe(subscriber);
BackpressureStrategy为背压,该参数需要传入一个背压模式,关于背压这个知识点,看不太懂,哈哈!背压知识点传送门
总之就是解决被观察者发送消息太快导致观察者来不及做处理的策略,这也就是Flowable存在的原因。(Observable不支持背压)
接下来分析一下新增的onSubcribe方法和request方法。
首先贴上测试代码,看着代码来分析:
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "初始化");
// s.request(Long.MAX_VALUE);
s.request(2);
Log.e(TAG, "初始化2");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext:" + s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("第一句话");
e.onNext("第二句话");
e.onNext("第三句话");
e.onComplete();
}
}, BackpressureStrategy.BUFFER)
//1.x中subscribe返回Subscription对象 2.x中返回void 若要返回值 使用subscribeWith方法
.subscribe(subscriber);
测试结果:
E/RxTest: 初始化
E/RxTest: 初始化2
E/RxTest: onNext:第一句话
E/RxTest: onNext:第二句话
根据结果并结合上文标识为粗斜体部分的引用文,我们能够得出两个结论:
- onSubscribe中的代码的确是在onNext方法之前执行的。但是与上文不同的是,request方法之后的初始化代码依然在onNext之前执行,总之最后写request是准没错的。
- request(long n) n参数代表了回调的限制次数 若n=2,flowable最多只会执行两次onNext方法,甚至不会调用onComplete方法,若不需要作此限制 可将n=Long.MaxValue。如果不调用request,onNext和onComplete方法将不会被调用。
当我们仅仅只关心onNext方法时,1.x中我们的做法是使用Action1-9来实现多个参数的简洁实现。而在2.x中使用Consumer来代替Action1,如果是两个参数,则用BiConsumer来代替Action2,并且删除了Action3-9,如果是多个参数则用Custom<Object[]>代替ActionN。
测试一下最简单的Consumer:
Flowable.fromArray(new String[]{"Hello","I","am","DoggieX"})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
结果:
E/RxTest: Hello
E/RxTest: I
E/RxTest: am
E/RxTest: DoggieX
具体使用需以后结合操作符体现。
基础操作符
- from(1.x)
该操作符是将其它种类的对象和数据类型转换为Flowable,如果当你发射的的数据是同一种类型,而不是混合使用Flowables和其它类型的数据,会非常方便。
因为在使用java8编译时,javac不能够区分功能接口类型,所以它在2.x中被拆分为:fromArray,fromIterable,fromFuture
fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)指定超时时间,如果执行的时候Future超时会回调onError()方法。
Flowable.fromArray(new Integer[]{1, 0, 2, 4})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "integer:" + integer);
}
});
结果:
E/RxTest: integer:1
E/RxTest: integer:0
E/RxTest: integer:2
E/RxTest: integer:4
- just
from会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,但也可以传入多个数据(甚至多个类型)逐个发送。
Flowable.just(1, "0", 2, "4", "1024")
.subscribe(new Consumer<Serializable>() {
@Override
//传入类型需有一个统一的返回类型 例如实现统一接口Serializable 或者有共同父类
public void accept(Serializable serializable) throws Exception {
Log.e(TAG, serializable.toString());
}
});
输出结果:
E/RxTest: 1
E/RxTest: 0
E/RxTest: 2
E/RxTest: 4
E/RxTest: 1024
- Empty/Never/Error
Empty:创建一个不发射任何数据但是正常终止的Observable,此时会回调onCompleted()
Never:创建一个不发射数据也不终止的Observable
Error:创建一个不发射数据以一个错误终止的Observable
error操作符需要一个Throwable参数,你的Observable会以此终止。这些操作符默认不在任何特定的调度器上执行,但是empty和error有一个可选参数是Scheduler,如果你传递了Scheduler参数,它们会在你指定的调度器上发送通知。
Empty操作符测试:
Flowable.empty().subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG,"初始化");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
Log.e(TAG,"onNext");
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
输出结果:
E/RxTest: 初始化
E/RxTest: onComplete
没有执行onNext直接执行onComplete方法。
另外两种就不加说明了,有时间自己试。Scheduler可以调度进程,可以使用empty来达到handler.sendEmptyMessage的作用。
- range
range是用户创建一组连续整数的操作符,传入两个参数,起始值和数量。数量为0时,不发射任何数据,数量为负数时,抛出异常。
Flowable.range(3, 4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "" + integer);
}
});
输出结果:
E/RxTest: 3
E/RxTest: 4
E/RxTest: 5
E/RxTest: 6
rangeLong()用于创建long类型的一组数。
intervalRange()创建间隔一段时间增长的一组数。
- timer
timer操作符创建一个在给定的时间段之后返回一个特殊值的Flowable。它在延迟一段给定的时间后发射一个简单的数字0 。三个参数分别为,延迟时间,时间单位,调度器Scheduler。
Flowable.timer(5, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "" + aLong);
}
});
5秒后,log输出结果:
E/RxTest: 0
返回值确实是0。
- interval
该操作符按固定的时间间隔发射一个无限递增的整数序列,它接受一个表示时间间隔的参数和一个表示时间单位的参数,当然该操作符合Timer一样,是在computation调度器上执行的,若想更新UI需要指定Scheduler 为AndroidSchedulers.mainThread()。
final CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(Flowable.interval(3, 1, TimeUnit.SECONDS)
.subscribeWith(new DisposableSubscriber<Long>() {
@Override
public void onNext(Long aLong) {
Log.e(TAG, "aLong:" + aLong);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "interval onError");
}
@Override
public void onComplete() {
Log.e(TAG, "interval complete");
}
}));
Flowable.timer(10, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//取消订阅
compositeDisposable.dispose();
}
});
在方法执行后三秒开始,输出结果:
E/RxTest: aLong:0
E/RxTest: aLong:1
E/RxTest: aLong:2
E/RxTest: aLong:3
E/RxTest: aLong:4
E/RxTest: aLong:5
E/RxTest: aLong:6
E/RxTest: aLong:7
interval操作符会隔一段时间后,返回一个值,该值从0开始递增,并且永远执行。要终止其行为必须取消订阅。于是使用timer操作符,延迟10秒后,将上面订阅关系解除。最后得到8个结果(timer在interval代码之后,因此有8个数据),并且interval取消订阅后,并不会执行onComplete方法,这点需注意。
- repeat
该操作符是重复的发射某个数据序列,并且可以自己设置重复的次数。达到次数后,执行onComplete方法。
String strs[] = new String[]{"blink-dagger", "better", "than", "butterknife"};
Flowable.fromArray(strs)
.repeat(3)
.subscribe(new DisposableSubscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出结果:
E/RxTest: blink-dagger
E/RxTest: better
E/RxTest: than
E/RxTest: butterknife
E/RxTest: blink-dagger
E/RxTest: better
E/RxTest: than
E/RxTest: butterknife
E/RxTest: blink-dagger
E/RxTest: better
E/RxTest: than
E/RxTest: butterknife
E/RxTest: onComplete
- defer
直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,该操作符能保证订阅执行时数据源是最新的数据。
不使用defer的例子:
String src = "17岁的blink-dagger";
Flowable<String> flowable = Flowable.just(src);
DisposableSubscriber subscriber = new DisposableSubscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
src = "18岁的blink-dagger";
flowable.subscribe(subscriber);
输出结果:
E/RxTest: 17岁的blink-dagger
可以看到,虽然在建立订阅关系之前,数据源src已经发生改变,但是flowable并没有随之改变,所以最后输出依然是旧数据。
使用defer的情况:
private final String TAG = "RxTest";
String src = "17岁的blink-dagger";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Flowable<String> flowable = Flowable.defer(new Callable<Publisher<? extends String>>() {
@Override
public Publisher<? extends String> call() throws Exception {
//Publisher是Flowable的父类
return Flowable.just(src);
}
});
DisposableSubscriber subscriber = new DisposableSubscriber<String>() {
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
src = "18岁的blink-dagger";
flowable.subscribe(subscriber);
}
输出结果:
E/RxTest: 18岁的blink-dagger
可以看到,结果是最后更改的数据。
进阶操作符
以后的代码会使用retrolambda表达式。相关配置,记录一下:
Project的build.gradle中:
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.2.0'
classpath 'me.tatarka:gradle-retrolambda:3.2.5'
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
}
Module的build.gradle中:
apply plugin: 'me.tatarka.retrolambda'
...
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
- map
map操作符可以将一个类型的数据源,转换成另一种类型。也可以对数据源做简单处理。
String src = "17岁的blink-dagger";
Flowable.just(src)
.map(s -> s.length())
.subscribe(integer -> {
Log.e(TAG, "length=" + integer);
});
输出结果:
E/RxTest: length=16
真的简洁了好多,爽得一匹。map当然不只是转换基本数据类型这么简单,在实际中,经常可以根据一组数据,生成一组Bean,美滋滋啊。
- cast
该操作符就是做一些强制类型转换操作的。例如,当我们在页面跳转时数据对象往往是序列化的,当我们在新的页面收到数据后就要强制转换为我们想要的类型。cast操作符也可以实现这样的功能。如下:
Observable.just(serializable).cast(FileInfo.class).subscribe(new Subscriber<FileInfo>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(FileInfo fileInfo) {
Log.e(TAG, "onNext: "+fileInfo.toString());
tv1.append("\n"+fileInfo.toString());
}
});
不过在该操作符实际用途并没有那么的广泛,很少用到,当然这个操作符也可以达到java 中instanceof相同的作用,用于类型检查,当不是该类型就会执行onError()方法。
- flatmap
该操作符与map操作符的区别是它将一个发射数据的Flowable变换为多个Flowables,然后将它们发射的数据合并后放进一个单独的Flowable.
List<String> strs = new ArrayList<>();
strs.add("blink-dagger");
strs.add("is");
strs.add("better");
strs.add("than");
strs.add("butterknife");
Flowable.just(strs)
.flatMap(new Function<List<String>, Publisher<String>>() {
@Override
public Publisher<String> apply(List<String> strings) throws Exception {
return Flowable.fromIterable(strings);
}
})
.map(s -> s.toUpperCase())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
sb = new StringBuilder();
sb.append("result: ");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
sb.append(s + " ");
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {
Log.e(TAG, sb.toString());
}
});
输出结果:
E/RxTest: result: BLINK-DAGGER IS BETTER THAN BUTTERKNIFE
先使用flatmap将一个Flowable转换为多个Flowables,再使用map将每个字符串转换为大写,最后拼接到一个SB中,输出。
- concatmap
flatMap并没有保证数据源的顺序性,但是ConcatMap操作符保证了数据源的顺序性。在应用中,如果你对数据的顺序性有要求的话,就需要使用ConcatMap。若没有要求,二者皆可使用。
- switchmap
暂时不懂 以后再研究
- doOnXXX
使用这一组操作符,我们能在使用简单Consumer回调对onNext方法作处理时,依然能对其他三种回调做处理:
doOnSubscribe
在onNext之前做初始化操作。doOnNext
允许我们在每次输出一个元素之前做一些额外的事情。doOnError
当出现错误时复写方法做处理doOnComplete
当onNext执行完时,做最后总处理
还是用上面的例子,稍作修改:
List<String> strs = new ArrayList<>();
strs.add("blink-dagger");
strs.add("is");
strs.add("better");
strs.add("than");
strs.add("butterknife");
Flowable.just(strs)
.flatMap(new Function<List<String>, Publisher<String>>() {
@Override
public Publisher<String> apply(List<String> strings) throws Exception {
return Flowable.fromIterable(strings);
}
})
.map(s -> s.toUpperCase())
.doOnSubscribe(subscription -> {
sb = new StringBuilder();
sb.append("result: ");
})
.doOnComplete(() -> {
Log.e(TAG, sb.toString());
})
.subscribe(s -> {
sb.append(s + " ");
});
结果还是相同的。
- groupby
groupBy操作符是对源Flowable产生的结果进行分组,形成一个类型为GroupedFlowable的结果集,GroupedFlowable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。
值得注意的是,由于结果集中的GroupedFlowable是把分组结果缓存起来,如果对每一个GroupedFlowable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedFlowable不进行处理,最好是对其使用操作符take(0)处理。
1-10区分奇偶的例子:
Flowable.range(1, 10)
.groupBy(integer -> integer % 2 == 0)
.subscribe(booleanIntegerGroupedFlowable -> {
booleanIntegerGroupedFlowable.subscribe(integer -> {
Log.e(TAG, (booleanIntegerGroupedFlowable.getKey() == true ? "偶数" : "奇数") + integer);
});
});
输出结果:
E/RxTest: 奇数1
E/RxTest: 偶数2
E/RxTest: 奇数3
E/RxTest: 偶数4
E/RxTest: 奇数5
E/RxTest: 偶数6
E/RxTest: 奇数7
E/RxTest: 偶数8
E/RxTest: 奇数9
E/RxTest: 偶数10
这里使用boolean值来区分结果,因为结果只有两种,当然key也可以不用boolean,当结果有多组时,使用Integer来分组:
Flowable.range(1, 10)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 3;
}
})
.subscribe(integerIntegerGroupedFlowable -> {
integerIntegerGroupedFlowable.subscribe(integer -> {
Log.e(TAG,integer + "被3整除余" + integerIntegerGroupedFlowable.getKey());
});
});
输出结果:
E/RxTest: 1被3整除余1
E/RxTest: 2被3整除余2
E/RxTest: 3被3整除余0
E/RxTest: 4被3整除余1
E/RxTest: 5被3整除余2
E/RxTest: 6被3整除余0
E/RxTest: 7被3整除余1
E/RxTest: 8被3整除余2
E/RxTest: 9被3整除余0
E/RxTest: 10被3整除余1
此时有三组,即key有三个值
当我把上面的代码稍微改变一下:
integerIntegerGroupedFlowable.toList().subscribe(integer -> {
Log.e(TAG,integer + "被3整除余" + integerIntegerGroupedFlowable.getKey());
});
也就是说,添加了一个toList()方法。让我们来看下结果:
E/RxTest: [3, 6, 9]被3整除余0
E/RxTest: [1, 4, 7, 10]被3整除余1
E/RxTest: [2, 5, 8]被3整除余2
怎么会这样呢,其实这就显示出了lambda的一个弊端,虽然简洁,但是代码可读性是真的低,不是自己写的代码根本看不懂啊。让我们还原回正常写法:
integerIntegerGroupedFlowable.toList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integer) throws Exception {
Log.e(TAG, integer + "被3整除余" + integerIntegerGroupedFlowable.getKey());
}
});
其实此时的integer和上面的并不相同,这里的integer代表的是一个list,拼接到字符串,自动调用了其toString()方法,因此看到了归类的现象。
- scan
操作符对原始Flowable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
是不是很绕很绕,想砸电脑?
一个例子就能明白什么意思:
Flowable.range(1, 9)
.scan((integer, integer2) -> integer + integer2)
.subscribe(integer -> {
Log.e(TAG, "total:" + integer);
});
输出结果:
E/RxTest: total:1
E/RxTest: total:3
E/RxTest: total:6
E/RxTest: total:10
E/RxTest: total:15
E/RxTest: total:21
E/RxTest: total:28
E/RxTest: total:36
E/RxTest: total:45
1-9求和,每次执行完scan操作符后,会在上一次结果之上继续运算。那么scan返回的值,在下一次运算时是否是作为第一个参数的呢?那我对上述代码做一下简单处理:
Flowable.range(1, 9)
.scan((integer, integer2) -> integer * 10 + integer2)
.subscribe(integer -> {
Log.e(TAG, "total:" + integer);
});
结果:
E/RxTest: total:1
E/RxTest: total:12
E/RxTest: total:123
E/RxTest: total:1234
E/RxTest: total:12345
E/RxTest: total:123456
E/RxTest: total:1234567
E/RxTest: total:12345678
E/RxTest: total:123456789
第一个参数*10则表示,每次都加在个位数,印证了返回值为第一个参数的猜想。
scan还有重载方法,能在function前面添加一个参数,也就是使用这个参数和实际第一个数做第一次处理,继续修改代码,添加默认值8(8个数,9位数超出Integer范围了):
Flowable.range(1, 8)
.scan(8, (integer, integer2) -> integer * 10 + integer2)
.subscribe(integer -> {
Log.e(TAG, "total:" + integer);
});
输出结果:
E/RxTest: total:8
E/RxTest: total:81
E/RxTest: total:812
E/RxTest: total:8123
E/RxTest: total:81234
E/RxTest: total:812345
E/RxTest: total:8123456
E/RxTest: total:81234567
E/RxTest: total:812345678
第一位是8,说明默认值8起作用了。
- buffer
buffer的作用是缓存发射结果,等到缓存池满了之后,才一起发射数据。例如11个数,缓存池为3的例子:
Flowable.range(1, 11)
.buffer(3)
.subscribe(integers -> {
Log.e(TAG, integers.toString());
});
输出结果:
E/RxTest: [1, 2, 3]
E/RxTest: [4, 5, 6]
E/RxTest: [7, 8, 9]
E/RxTest: [10, 11]
结果为三个为一组输出。
buffer还有个两个参数的重载,第二个参数skip,感觉用处不大,不去看了。
- window
window和buffer类似,不同的是,发射的是Flowable。
Flowable.range(1, 6)
.window(2)
.doOnComplete(() -> Log.e(TAG, "outter complete"))
.subscribe(integerFlowable -> {
integerFlowable.doOnComplete(() -> Log.e(TAG, "inner complete"))
.subscribe(integer -> Log.e(TAG, "inner" + integer));
});
输出结果:
E/RxTest: inner1
E/RxTest: inner2
E/RxTest: inner complete
E/RxTest: inner3
E/RxTest: inner4
E/RxTest: inner complete
E/RxTest: inner5
E/RxTest: inner6
E/RxTest: inner complete
E/RxTest: outter complete
内部flowable全部执行完后,才继续执行外部的操作。
过滤操作符
- filter
该操作符接收一个Predicate类,我们可以在其中通过运用你自己的判断条件去判断我们要过滤的数据,当数据通过判断条件后返回true表示发射该项数据,否则就不发射,这样就过滤出了我们想要的数据。如下,我们过滤出不能被2整除的数:
Flowable.range(1, 10)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(integer -> Log.e(TAG, integer + ""));
输出结果:
E/RxTest: 2
E/RxTest: 4
E/RxTest: 6
E/RxTest: 8
E/RxTest: 10
当满足上面的判断条件时,才会发射该数据,否则就不发射,最终筛选出了偶数。我们可以发现,filter的过程是不是很熟悉,是不是和groupby很像?只是filter把其他组的数据踢掉了而已。filter又牵扯出来一个Predicate类,头又大了,坑爹的rxjava,搞这么多的类,怎么记得住。其实有个小技巧,当不知道传什么的时候,按Ctrl+P 看提示什么 就打什么,一般就自动生成了,然后就看悟性啦。
- ofType
该操作符是filter操作符的一个特殊形式。它过滤一个Flowable只返回指定类型的数据,例如当数据源有字符串和int型数据时,我们想要过滤出字符串就可以使用这个操作符,如下示例代码:
Flowable.just("one", 2, 3, "four", "five", "six", 7, 8, "nine", "ten")
.ofType(String.class)
.subscribe(s -> Log.e(TAG, s.toString()));
1-10有int类型也有String类型,筛选出其中的String类型,看结果:
E/RxTest: one
E/RxTest: four
E/RxTest: five
E/RxTest: six
E/RxTest: nine
E/RxTest: ten
当然除了过滤基本类型,也可以过滤自定义类型。可以替代instanceof。
- first
当我们只对一组数据的其中一个值感兴趣时,可以使用first操作符。但是往往这种需求是很蠢的,indexof(0),get(0),[0]不就可以了,用牛刀就显得特别不合适。但是当需求稍作修改,在一组数据中选出第一个满足条件的。第一想法是for循环,在RxJava里,可以使用filter和first的结合(ps:在1.x里first()可以没有参数,2.x里,需要提供一个默认参数,若没有发射数据,则发射该默认值):
Flowable.range(51, 15)
.filter(integer -> integer % 5 == 0)
.first(8)
.subscribe(integer -> Log.e(TAG, integer + ""));
输出结果:
E/RxTest: 55
将范围缩小,导致没有过滤出数据:
Flowable.range(51, 4)
.filter(integer -> integer % 5 == 0)
.first(8)
.subscribe(integer -> Log.e(TAG, integer + ""));
输出结果:
E/RxTest: 8
输出了默认值8。
熟悉1.x的人可能会提出疑问,在1.x中first的参数可以为一个Function,类似filter的判断,直接作用于发射结果,可以查阅2.x开发文档,可以发现,该使用被废弃了,只能使用filter+first来实现了。同理,1.x的firstOrDefault也被废弃了。
- single
验证发射是否只有一个值,若不是,则会抛出一个NoSuchElementException异常。白话可以理解为发送数据是一项的话输出此项的值,若是多个数据则抛出异常执行onError()方法。示例代码:
Flowable.just(1, 2)
.single(1)
.doOnError(throwable -> Log.e(TAG, "onError" + throwable.getMessage().toString()))
.subscribe(integer -> Log.e(TAG, integer + ""));
Run一哈,崩了,意料之中。看下log,贴一下:
E/RxTest: onErrorSequence contains more than one element!
成功的代码就不贴了。
- last
同first,就不多写了。
- skip
跳过前几项,再开始发射数据。
skip有两个重载方法。skip(long time, TimeUnit unit)和skip(long time, TimeUnit unit, Scheduler scheduler),两者表达的意思是相同的,只是增加了一个调度器参数而已。
注意,此时的第一个参数,并不是跳过多少个数,而是跳过多少时间后开始发射。
示例代码:
Flowable.interval(500, TimeUnit.MILLISECONDS)
.skip(2888, TimeUnit.MILLISECONDS)
.filter(aLong -> aLong <= 10)
.subscribe(aLong -> Log.e(TAG, aLong + ""));
运行结果:
E/RxTest: 5
E/RxTest: 6
E/RxTest: 7
E/RxTest: 8
E/RxTest: 9
E/RxTest: 10
接近三秒后,开始发射数据,筛选出不大于10的数据。
这里有个坑,一般不注意的话,我们以为达到目标了,就不管他了。但是有个隐患,就是interval操作符,若是不取消订阅,那么他将一直运行下去,岂能忍,可是2.x好坑,把不能直接使用this.unsubcribe();直接取消订阅了,因为subcribe()返回值为void。
于是我们只能做如下处理:
CompositeDisposable cd = new CompositeDisposable();
cd.add(Flowable.interval(500, TimeUnit.MILLISECONDS)
.skip(2000, TimeUnit.MILLISECONDS)
.subscribeWith(new DisposableSubscriber<Long>() {
@Override
public void onNext(Long aLong) {
Log.e(TAG, aLong + "");
Log.e(TAG, cd.isDisposed() + "");
if (aLong > 10) {
cd.dispose();
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
}));
Flowable.timer(6, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e(TAG, cd.isDisposed() + ""));
输出结果:
E/RxTest: 3
E/RxTest: false
E/RxTest: 4
E/RxTest: false
E/RxTest: 5
E/RxTest: false
E/RxTest: 6
E/RxTest: false
E/RxTest: 7
E/RxTest: false
E/RxTest: 8
E/RxTest: false
E/RxTest: 9
E/RxTest: false
E/RxTest: 10
E/RxTest: false
E/RxTest: 11
E/RxTest: false
E/RxTest: true
最后使用timer,看6秒后isDispose()==true说明已经取消订阅了。
- skipLast
同skip,忽略最后几项数据。
- take
与skip相反,take是取前几个发射的数据,或者在前一段时间内发射的数据。
CompositeDisposable cd = new CompositeDisposable();
cd.add(Flowable.interval(500, TimeUnit.MILLISECONDS)
.take(2000, TimeUnit.MILLISECONDS)
.subscribeWith(new DisposableSubscriber<Long>() {
@Override
public void onNext(Long aLong) {
Log.e(TAG, aLong + "");
Log.e(TAG, cd.isDisposed() + "");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
cd.dispose();
}
}));
输出结果:
E/RxTest: 0
E/RxTest: false
E/RxTest: 1
E/RxTest: false
E/RxTest: 2
E/RxTest: false
同样不要忘了取消订阅,take可以在onComplete方法中取消订阅。