- no Subject
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
System.out.println(Thread.currentThread());
subscriber.onNext(null);
subscriber.onCompleted();
}
})
.map(new Func1<Object, Object>() {
@Override
public Object call(Object o) {
System.out.println(Thread.currentThread());
return null;
}
})
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(Thread.currentThread());
}
});
- with Subject
mVoidPublishSubject
.map(new Func1<Void, Void>() {
@Override
public Void call(Void aVoid) {
System.out.println("map" + Thread.currentThread());
return null;
}
})
.asObservable()
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
System.out.println(Thread.currentThread());
}
});
- with flatMap
mVoidPublishSubject
.map(new Func1<Void, Void>() {
@Override
public Void call(Void aVoid) {
System.out.println("map" + Thread.currentThread());
return null;
}
})
.flatMap(new Func1<Void, Observable<Void>>() {
@Override
public Observable<Void> call(Void aVoid) {
return Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
System.out.println("flatMap" + Thread.currentThread());
subscriber.onNext(null);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Func1<Void, Void>() {
@Override
public Void call(Void aVoid) {
System.out.println("map2" + Thread.currentThread());
return null;
}
});
}
})
.asObservable()
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
System.out.println(Thread.currentThread());
}
});
总结
- 在没有任何observeOn调用的情况下,下游操作必然与上游操作在同一个线程,也就是说影响下游操作执行线程的只有上游调用onNext或者return情况的执行线程,并非必然归subscribeOn所管理,subscribeOn某些情况下无法控制上游操作的订阅线程,如subject
引发问题:
PublishSubject的onNext如果在主线程,则导致下游的map操作在主线程
2.组合操作符可以结合上述理解,由于组合操作符是在多个值传递的情况下进行组合操作,则影响其Func操作线程的必然是最后一个数值的线程,
引发问题:
如果combineLatest中的某个操作在主线程,则必然导致下游的map也在主线程
- subscribeOn只能影响observeOn之前的下游操作,一旦有某个操作使用了observeOn,则底下的操作将无视subscribeOn线程,再次调用observeOn将转换下游操作线程
4.flatMap操作可以被新的subscribeOn影响,如果不指定subscribeOn,则沿用上述规则