这个页面展示的操作符可用于过滤和选择Observable发射的数据序列。
Debounce — 仅在过了一段指定的时间还没发射数据时才发射一个数据。
Distinct — 过滤掉重复数据。
ElementAt — 发射第 N 项数据。
Filter — 过滤数据。
First 只发射第一项数据。
IgnoreElements — 丢弃所有的正常数据,只发射错误或完成通知。
Last — 只发射最后的一项数据。
Sample 定期发射 Observable 最近的数据。
Skip — 跳过开始的 N 项数据。
SkipLast — 跳过最后的 N 项数据。
Take — 只发射开始的 N 项数据。
TakeLast — 只发射最后的 N 项数据。
3.1 Debounce
仅在过了一段指定的时间还没发射数据时才发射一个数据,过滤掉发射速率过快的数据项。
通俗一点来说,N 个事件的结点发生的时间太接近,debounce 过滤掉前 N-1 个事件。
实践:常用于过滤实时的搜索、防止按钮重复点击。
示例代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 7; i++) {
emitter.onNext(i);
Thread.sleep(i * 100);
}
emitter.onComplete();
}
}).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
4
5
6
next 发送时与上一个 next 间隔少于 300 毫秒所以不发送0,1,2,3。
操作符 throttleWithTimeout 和 debounce 效果一致。
3.1 Distinct
过滤掉重复数据,只允许还没有发射过的数据项通过。
示例代码1:
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
1 2 3
3.1.1 Distinct(Function)
这个操作符有一个变体接受一个函数。这个函数根据原始 Observable 发射的数据项产生一个 Key,然后,比较这些 Key 而不是数据本身,来判定两个数据是否是不同的。
示例代码:
Observable.just(-1, -2, 0, 1, 2, 1).distinct(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return Math.abs(integer);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
-1 , -2 , 0
3.1.2 DistinctUntilChanged
过滤掉连续重复的数据,判定一个数据和它的直接前驱(相邻)是否是不同的。
示例代码:
Observable.just(-1, 1, 1, 2, 1, 2, 2)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
-1,1,2,1,2
3.1.3 DistinctUntilChanged(Function)
过滤掉连续重复的数据,通过一个 key 判定一个数据和它的直接前驱(相邻)是否是不同的。
示例代码:
Observable.just(-1, -2, -1, 1, 2, 1).distinctUntilChanged(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return Math.abs(integer);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
-1 , -2 , -1 , 2 , 1
3.3 ElementAt
发射第 N 项数据,操作符获取原始 Observable 发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
示例代码:
Observable.just(2,3,4,5).elementAt(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
4
ElementAt(long index, T defaultItem)
发射第 N 项数据,如果 Observable 数据少于N项就发射默认值,原始 Observable 的数据项数小于 index,使用默认的 defaultItem 数据。
示例代码:
Observable.just("h", "e", "l", "l", "0")
.elementAt(6, "dafalut")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "");
}
});
输出结果:
dafalut
3.4 Filter
Filter 操作符使用你指定的一个函数过滤数据项,只有满足条件的数据才会被发射。
示例代码:
Observable.just(1,2,3,4,5,6,7)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer > 2 && integer < 6;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer+"");
}
});
输出结果:
3,4,5
3.5 First
返回一个 Single,它仅发出 Observable 发出的第一个项目,或者如果 Observable 不发送任何项目,则返回一个默认项目。first = elementAt(0L, defaultItem);
示例代码1:
Observable.just(1,2).first(100).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer+"");
}
});
输出结果:
1
示例代码2:
//创建一个不发射任何数据但是正常终止的 Observable ,则返回默认的字符“ empty default ”
Observable.empty().first("empty default").subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
Log.e(TAG, o.toString());
}
});
输出结果:
empty default
3.6 IgnoreElements
不发射任何数据,只发射 Observable 的终止通知
示例代码:
Observable.just(1, 2, 3).ignoreElements().subscribe(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "什么意思呢");
}
});
输出结果:
什么意思呢
3.7 Last
只发出一个 Observable 发出的最后一个项目(或者满足某些条件的最后一个项目)(PS:与 First 对应)
如果您只对 Observable 发出的最后一个项目或满足某些条件的最后一个项目感兴趣,则可以使用 Last 操作符过滤 Observable。
在某些实现中,Last 不是作为返回 Observable 的过滤运算符实现的,而是作为阻止函数,当 Observable 终止时返回特定项。 在这些实现中,如果您想要一个过滤操作符,那么建议你使用操作符 TakeLast(1)。
示例代码 1:
// 发射倒数第一个元素
Observable.just(-1, -2, 0, 1, 2).last(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
2
3.8 Sample
定期发射 Observable 发射的最后一条数据。
Sample 操作符定时查看一个 Observable,然后发射自上次采样以来它最近发射的数据。
在某些实现中,有一个 ThrottleFirst 操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。
RxJava 将这个操作符实现为 Sample 和 ThrottleLast。
注意:如果自上次采样以来,原始 Observable 没有发射任何数据,这个操作返回的 Observable 在那段时间内也不会发射任何数据。
3.8.1 Sample
示例代码:
//取300毫秒内,发送的最后一次事件
Observable.interval(100, TimeUnit.MILLISECONDS)
.sample(330, TimeUnit.MILLISECONDS)
.take(3)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
});
输出结果:
accept:2
accept:5
accept:8
3.8.2 throttleLast
总是发射原始 Observable 的第一项数据,而不是最近的一项。
实践:常于防止按钮重复点击。
示例代码:
//取300毫秒内,发送的最后一次事件
Observable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(330, TimeUnit.MILLISECONDS)
.take(3)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
});
输出结果:
accept:1
accept:5
accept:7
3.8.3 throttleFirst
总是发射原始 Observable 的第一项数据,而不是最近的一项。
实践:常于防止按钮重复点击。
示例代码:
//取300毫秒里第一次发出的事件
Observable.interval(100, TimeUnit.MILLISECONDS)
.throttleFirst(330, TimeUnit.MILLISECONDS)
.take(3)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept" + aLong);
}
});
输出结果:
accept:0
accept:4
accept:8
3.9 Skip
抑制 Observable 发射的前 N 项数据,只保留之后的数据。
示例代码:
Observable.just(1, 2, 3, 4, 5).skip(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
4
5
3.10 SkipLast
与 skip 相反,抑制 Observable 发射的前 N 项数据,只保留前面的数据。
示例代码:
Observable.just(13, 8, 5, 3, 2, 1).skipLast(4).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
13
8
3.11 Take
只发射开始的 N 项数据。
示例代码:
//从1开始计时,输出前五项
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, aLong + "");
}
});
输出结果:
1,2,3,4,5
3.12 TakeLast
发射 Observable 发射的最后 N 项数据.
示例代码:
//只输出最后三项
Observable.just(13, 8, 5, 3, 2, 1).takeLast(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, integer + "");
}
});
输出结果:
3,2,1