ObservableOn()
直接查看实现,会发现onSubscribe()
中做了一些判断,比如82
104
等几行都是做了一些同步
异步
等的判断,然后初始化Disposable
,onSubscribe()
是上游Observable
完成了整条订阅链之后调用的,所以这些操作是在开始订阅之后才初始化操作,然后106
行可以看出把一个包装处理过的Disposable
传递给下游
和之前的一样,subscribeActual
方法里会将observer
进行包装,然后传递给source也就是上游进行订阅
* `40`行进行了判断所传进来的`scheduler`是否跟原本的线程一致,如果是一样的就直接传递不用进行处理
* `43`行创建了一个对应`scheduler`的`worker`,`worker`在后续负责把数据在对应的线程进行发射操作
发射数据onNext
处理
@Override
public void onNext(T t) {
...
// 前面的都先忽略掉,会发现最后会调用这个方法
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 在这个可以看到,上面根据schedule的worker执行了schedule(),并且把自身传进去,this其实实现了runnable,所以可以理解为传了一个runnable进去
worker.schedule(this);
}
}
接着上面的以AndroidSchedulers.mainThread()
这个scheduler为例,这里实际上是将主线程的looper
传进去了
查看一下这个scheduler的worker,会发现worker的基类schedule()方法是相同的互相调用的,所以可以直接看多个参数的schedule(),可以看到73
行创建了一个ScheduledRunnable
对象,并且把主线程的handler
以及外面的Observer
传递过去,接着82
行用主线程的handler发送消息,119
行ScheduledRunnable
里的run
被调用,接着Observer也就是runnable
也调用run
方法
到这里可以看出,实际上当切换线程的时候,observer(也实现了Runnable
)的onNext往scheduler里发送自身,让scheduler来决定自身应该在什么线程执行run方法,接下来看回observer的run方法,就是判断了一下要执行哪个方法
可以看到最后是调用了onNext方法,到这里就完成了指定线程发射数据的功能
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
值得注意的是可以看到的是v
也就是我们要发射的数据,是通过poll
方法获取的,查看代码可以发现
queue
实际上就是一个Disposable
也就是说是上游Observable
,通过上游的poll方法去获取要onNext的数据
查看Observable
其中一个实现ObservableMap
的poll方法,可以看到这里实际上也是调用上游的poll方法,并且对数据的格式也就是不允许为null做了一层判断
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
poll方法操作的对象实际上是下图104
行的时候new出来的,具体查看其实就是缓存数据,类似一个容量池的作用
[图片上传失败...(image-a0f00c-1551301878533)]
用一段伪代码来展示切换线程之后的observer,其实相当于onNext等方法都被放在指定的线程里去发射数据
public class Observer {
Observer oldObserver;
public Observer(Observer observer) {
oldObserver = observer;
}
public void onNext(T t) {
// 一些其他操作
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onNext(t);
}
} .start();
}
public void onError(Throwable e) {
// 一些其他操作
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onError(e);
}
} .start();
}
public void onComplete() {
// 一些其他操作
new Thread("Android mainThread") {
@Override
public void run() {
oldObserver.onComplete();
}
} .start();
}
}