具有更精确控制订阅动态的专业观察器。
首先我们有必要来了解一下什么是 Connectable Observable: 就是一种特殊的 Observable 对象,ConnectableObservable 在被订阅时并不开始发射数据,而是只有在调用 connect 操作符的时候才开始发射数据,所以可以用来更灵活的控制数据发射的时机。
9.1 Public
Publish 操作符将普通的 Observable 转换为可连接的(ConnectableObservable)。
注意:如果一个 ConnectableObservable 已经开始发射数据,再对其进行订阅只能接受之后发射的数据,订阅之前已经发射过的数据就丢失了。
示例代码:见 9.2
9.2 Connect
指示一个 ConnectableObservable 开始发射数据。
Connect 操作符就是用来触发 ConnectableObservable 发射数据的。调用 Connect 操作符后会返回一个 Subscription 对象,通过这个 Subscription 对象,我们可以调用其 unsubscribe 方法来终止数据的发射。另外,如果还没有订阅者订阅的时候就应用 Connect 操作符也是可以使其开始发射数据的。
示例代码:
//使用 publish 操作符创建一个 ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100,
TimeUnit.MILLISECONDS).take(6).publish();
// 创建两个 Consumer 对象
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 1 -> accept:" + aLong);
}
};
Consumer<Long> consumer2 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 2 -> accept:" + aLong);
}
};
connectableObservable.subscribe(consumer1);
//延迟 300 毫秒订阅 consumer2
connectableObservable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);
//如果不调用 connect 方法,connectableObservable 则不会发射数据
connectableObservable.connect();
输出结果:
step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5
9.3 RefCount / share
让一个 ConnectableObservable 行为像普通的 Observable。
RefCount 操作符把从一个 ConnectableObservable 连接和断开的过程自动化了。调用 RefCount,返回一个普通的 Observable。当第一个订阅者订阅这个 Observable 时,RefCount 连接到下层的可连接 Observable。RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接 Observable 的连接。
示例代码:
//使用 publish 操作符创建一个 ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100,
TimeUnit.MILLISECONDS).take(6).publish();
// 创建两个 Consumer 对象
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 1 -> accept:" + aLong);
}
};
Consumer<Long> consumer2 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 2 -> accept:" + aLong);
}
};
//这两种实现方法结果一致
// Observable observable = Observable.interval(100, TimeUnit.MILLISECONDS).take(6).share();
Observable observable = connectableObservable.refCount();
observable.subscribe(consumer1);
//延迟 300 毫秒订阅 consumer2
observable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);
输出结果:
step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5
9.3.1 Share
Share 操作符作用与 refCount 相似。内部实现为:
public final Observable<T> share() {
return publish().refCount();
}
9.4 replay
保证所有的观察者收到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅。
Replay 操作符返回一个 ConnectableObservable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用 Replay 操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。
Replay 操作符能指定缓存的大小或者时间,这样能避免耗费太多内存。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一种 replay 返回一个普通的 Observable。它可以接受一个变换函数为参数,这个函数接受原始 Observable 发射的数据项为参数,返回结果 Observable 要发射的一项数据。因此,这个操作符其实是 replay 变换之后的数据项。
- Javadoc: replay(Function)
- Javadoc: replay(Function,int)
- Javadoc: replay(Function,long,TimeUnit)
- Javadoc: replay(Function,int,long,TimeUnit)
示例代码 1:
Observable<Long> ob1 = Observable.just(1L, 12L);
//缓存两次,三秒时间内有效。
ConnectableObservable<Long> connectableObservable = ob1.replay(2, 3, TimeUnit.SECONDS);
//ConnectableObservable<Long> connectableObservable = ob1.publish();
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
};
Disposable disposable = connectableObservable.subscribe(consumer1);
connectableObservable.connect();
try {
//间隔一秒取消订阅,然后重新订阅,有缓存数据
Thread.sleep(1000);
disposable.dispose();
disposable = connectableObservable.subscribe(consumer1);
//间隔两秒取消订阅后,然后重新订阅,缓存数据失效
Thread.sleep(2000);
disposable.dispose();
disposable = connectableObservable.subscribe(consumer1);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出结果:
accept:1
accept:12
accept:1
accept:12