前面说过了RXJava中的Observable,本次来说说Subject。
附上前文的链接。喜欢的兄弟麻烦点个喜欢,关注啥的吧。
RXJava Observable详解 (RXJava Part 1)
Subject
Subject有两种用途:
- 做为observable向其他的observable发送事件
- 做为observer接收其他的observable发送的事件。
最后的一个例子会使用一个subject监听一个observable,并将observable发射的事件转发给一个observer。
Subject做为Observable
- PublishSubject
该Subject不会改变事件的发送顺序。
如果在已经发送了一部分事件之后注册的observer,
是不会收到之前发送的事件。
重点跟下面三个类作对比
private void doPublishSubject() {
//将事件发送到observer,如果先前已经漏掉的事件,不会重新发送到后注册的observer上
PublishSubject<String> publish = PublishSubject.create();
publish.subscribe(new PublishObserver<String>("first"));
publish.onNext("1");
publish.onNext("2");
publish.subscribe(new PublishObserver<String>("seconde"));
publish.onNext("3");
publish.onCompleted();
}
class SubjectObserver<T extends String> implements Observer<String> {
String name;
public SubjectObserver(String name) {
this.name = name;
}
@Override
public void onCompleted() {
LogUtils.d("publishObserver %s is completed", name);
}
@Override
public void onError(Throwable e) {
LogUtils.d("publishObserver %s error msg %s", name, e.getStackTrace());
}
@Override
public void onNext(java.lang.String s) {
LogUtils.d("publishObserver %s receive msg %s", name, s);
}
}
- BehaviorSubject
该类有创建时需要一个默认参数,该默认参数会在subject未发送过其他的事件时,向注册的observer发送。
注意看代码注释。
private void doBehaviorSubject() {
//将事件发送到observer,如果先前已经漏掉的事件,除了最近的一个事件以外,
//其他相关事件不会重新发送到后注册的observer上。所以需要带默认值,
//第一次被observer注册时,observable中没有内容的时候,就会将默认值发给observer
BehaviorSubject<String> behavior = BehaviorSubject.create("创建beahavior时候带的消息");
behavior.subscribe(new SubjectObserver<String>("first"));
behavior.onNext("1");
behavior.onNext("2");
behavior.subscribe(new SubjectObserver<String>("seconde"));
behavior.onNext("3");
behavior.onCompleted();
}
- ReplaySubject
将事件发送到observer,无论什么时候注册observer,
无论何时通过该observable发射的所有事件,均会发送给新的observer。
private void doReplaySubject() {
//将事件发送到observer,无论什么时候注册observer,
//无论何时通过该observable发射的所有事件,均会发送给新的observer。
ReplaySubject<String> replay = ReplaySubject.create();
replay.subscribe(new SubjectObserver<String>("first"));
replay.onNext("1");
replay.onNext("2");
replay.subscribe(new SubjectObserver<String>("seconde"));
replay.onNext("3");
replay.onCompleted();
}
- AsyncSubject
只有当subject调用onComplete方法时,才会将subject中的最后一个事件传递给observer。
如果不调用onComplete方法,则不会给observer发送任何事件。
private void doAsyncSubject() {
//只会有当subject调用onComplete方法时,才会将subject中的最后一个事件传递给observer。
//如果不调用onComplete方法,则不会向observer中发送任何事件
AsyncSubject async = AsyncSubject.create();
async.subscribe(new SubjectObserver<String>("first"));
async.onNext("1");
async.onNext("2");
async.onNext("3");
async.onCompleted();
async.subscribe(new SubjectObserver<String>("seconde"));
async.onCompleted();
}
Subject做为observable
文章开头说了,subject及可以做observer也可以做observable。
示例会将subject注册为一个observer来接收创建的observable中的事件。
并将observable中的事件再发送给在该Subject中注册的observer中。
private void doObserverSubject() {
//将Subject当作Observer使用,并将另外的observer注册到subject上,来监听原始的observable发出的事件
List<String> items = new ArrayList<>();
items.add("100");
items.add("103");
items.add("107");
Observable<String> observable = Observable.from(items);
ReplaySubject<String> replay = ReplaySubject.create();
observable.subscribe(replay);
replay.subscribe(new SubjectObserver<String>("first"));
}