在上一小节中RxJava2_整体流程分析,有这么一个结论,那就是每一次调用 Observable 的操作符都会返回一个新的 Observable 对象,并且会通过构造的方式传入上一级创建的 Observable 对象,将其保存起来,下面是示例代码。那么接下来操作的 subscribeOn、observeOn 操作符都会分别创建新的 Observable 对象,并存储上一级创建的 observable。
//上一级创建的 observable 对象:ObservableOnSubscribe
Observable.create(new ObservableOnSubscribe<String>() {...}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//保存上一级创建的 Observable 对象 : ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
一、执行流程图
二、示例代码
下面这段代码的功能就是在 subscribe 方法内部通过调用 getBitampFormServer 去请求一个 Bitmap 对象,这个方法是耗时操作,当前的操作应该在子线程中执行,得到 bmp 之后,根据结果分别去调用 onNext() /onError() 方法。而在订阅者中若是 onNext 被回调则表示成功获取到 bmp,对应地将其设置给对应的 mImageView 对象上,如果 onError 被回调了,那么表示加载 Bitmap 是失败的,对应的再做一些其它操作,这些操作应该在主线程中进行。本次通过从源码的角度探究的是 RxJava2 内部是如何进行线程切换操作的。本小节先分析 subscribeOn 如何去实现事件源在子线程中发射事件。也就是 ObservableOnSubscribe#subscribe 在子线程中去执行。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception {
//该方法进行网络请求,是比较耗时的操作。
Bitmap bmp = getBitampFormServer("uri");
if(bmp!=null) {
//获取 bmp 成功
e.onNext(bmp);
e.onComplete();
}else{
//如果从网络加载图片不成功,回调onError 来通知订阅者
e.onError(new Exception("图片加载出错啦"));
}
}}) //事件源发射事件在子线程中运行
.subscribeOn(Schedulers.io())
//订阅者在主线程中接受事件
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("zeal", "onSubscribe");
}
@Override
public void onNext(@NonNull Bitmap bmp) {
//设置显示在 ImageView 上
mImageView.setImageBitmap(bmp);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("zeal","error:"+e.toString());
}
@Override
public void onComplete() {
Log.e("zeal", "onComplete");
}
});
2、.subscribeOn(Schedulers.io()) 源码分析
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
2.1、Scheduler
从下面的类注释可以知道,这个类是一个调度类,可以延时/周期性地去执行一个任务。可以从 Schedulers 这个类去获取 Scheduler 的实现子类对象,例如在频繁进行 io 操作就可以调用 Schedulers.io() ,如果是计算比较多的可以调用 Schedulers.computation()。
/**
* A {@code Scheduler} is an object that specifies an API for scheduling
* units of work with or without delays or periodically.
* You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
*/
public abstract class Scheduler {}
2.2、Schedulers.io()
通过下面的 Schedulers.io() 源码跟踪,最终返回的是一个 IoScheduler 对象,这个对象实际上就是 Scheduler 的子类对象。那么就符合 subscribeOn(Scheduler) 参数的要求了。
@NonNull
public static Scheduler io() {
//内部是 IO
return RxJavaPlugins.onIoScheduler(IO);
}
//-----------------------------------------------------
@NonNull
static final Scheduler IO;
static {
...
// IO 是在静态代码块中实例化的
IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
//这里返回一个 IoHolder 对象。
return IoHolder.DEFAULT;
}
});
...
}
//-----------------------------------------------------
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//-----------------------------------------------------
//IoHolder 类定义中可以知道,该类是继承至 Scheduler
public final class IoScheduler extends Scheduler {}
2.3、subscribeOn 内部实现
- subscribeOn(Scheduler scheduler)
这个方法内部会通过创建一个 ObservableSubscribeOn 对象,根据之前的经验可知道,这个类肯定也是一个 Observable 的子类对象。因此对于 subscribe(observer) 方法而言,我们就只关心它真正调用的方法 subscribeActual(observer) 。
- subscribeActual(observer)
在subscribeActual 内部首先是对 observer 进行包装成 SubscribeOnObserver 对象。这里的 SubscribeOnObserver 不仅是一个 Observer ,而且具备一个连接器的作用 Disposable 。
@Override
public void subscribeActual(final Observer<? super T> s) {
//包装 observer 对象
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//将连接器 parent 通过 onSubscribe 回调给 observer 对象
s.onSubscribe(parent);
//这里是通过 scheduler 去执行一个任务 SubscribeTask。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- SubscribeOnObserver
这个类是对 observer 的包装,内部实现了 Observer 和 Disposable 接口。也就是说它既有订阅者的功能,也实现了连接器的功能。注意 actual 这个变量,它是下一级的 Observer 对象,为什么说是下一级呢?因为每次包装的 Observer 是一级级别往上被订阅的,当前的 Observer 都会包装下一级别的 Observer 对象。例如 SubscribeOnObserver 就封装了下一级的 Observer 对象,其实就是当前 Observer 接受到事件源发送过来的事件时,再调用包装的 Observer 回调给下一级,这样一级级传递下去知道最后一级 Observer。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
//发送事件
@Override
public void onNext(T t) {
//回调给下一级
actual.onNext(t);
}
//发送事件
@Override
public void onError(Throwable t) {
//回调给下一级
actual.onError(t);
}
//发送事件
@Override
public void onComplete() {
//回调给下一级
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
- SubscribeTask(parent)
SubscribeTask 它是一个 Runnbale ,因此我们把它理解为一个任务。首先关注是它的 run 方法,它内部实现很简单,就是**通知上一级的 Observable 通过 subscribe 这个方法进行订阅当前 observer **。下面会执行一大堆代码,其实都会为创建一个线程然后交给指定的线程池取执行这个任务,先记住这个任务的使命。那么既然是一个线程,那么肯定有一个地方需要执行这个线程的,接下来关注 scheduler.scheduleDirect 方法。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//【核心代码,这段代码决定上一级observable订阅在哪个线程执行。】
//source:就是上一级创建的 observable
//parent 就是包装后的 observer
source.subscribe(parent);
}
}
开始寻找 SubscribeTask 这个线程实在哪里被执行的。
- scheduler.scheduleDirect(new SubscribeTask(parent))
刚才分析过 scheduler 就是 IoScheduler 对象了,跟踪源码发现,这个类并没有重写这个方法,因此直接进入 Scheduler 查看。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//这里的 delay = 0,也就是马上执行这个任务。
//【这个 run 就是我们的目标】
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//核心代码 createWorker() 创建一个可以可以执行 run 的 worker
final Worker w = createWorker();
//对 run 进行了包装,实际上还是 run 这个对象。【这个 run 就是我们的目标】
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//decoratedRun 交给了 worker 去执行
w.schedule(new Runnable() {
@Override
public void run() {
try {
【我们的目标在此处被执行】
decoratedRun.run();
} finally {
//事件源发射事件完毕之后,就关闭连接器。
w.dispose();
}
}
}, delay, unit);
return w;
}
- IoScheduler#createWorker();
现在我们知道我们的任务是交给 worker.schedule() 去执行的。因为 Worker 是负责去执行调度的,因此不同的子类会有不同的 Worker 的实现,在 Scheduler 中通过 createWorker() 来获取子类实现的 Worker 对象。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
- Scheduler#Worker
这个类具备延迟执行任务,周期性执行任务的功能。所有的执行都是基于 schedule() 方法,而这个方法是一个抽象方法,也就是它无法知道子类需要怎么执行这个任务,因为每一种调度器执行的方式 schedule 都不一样,因此交给子类去实现。
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
- EnentLooerWorker#schedule()
有了 Worker 之后就要开始执行【我们的任务 action 啦】
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//【任务 action 】交给 threadWorker 去执行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
- threadWorker.scheduleActual
threadWorker 是 ThreadWorker ,继承至 NewThreadWorker 。
static final class ThreadWorker extends NewThreadWorker
//NewThreadWorker 内部维护一个线程池 executor。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//最终代码会走到这里
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//对 run 进行包装
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//上面已经提到,delayTime = 0;所以这个任务会被立即执行,
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void run() {
try {
try {
//执行原始的 run 方法。
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
2.4、 结果
f = executor.submit((Callable<Object>)sr); 这里执行了 SubscribeTask#run() 方法,也就是当前的订阅者 Observer 订阅了上一级的 Observable 。也就是上一级的 ObservableCreate.subscribe(observer) 被执行了。请注意它是在子线程中被执行的。如果想要了解接下来的事件源是怎么发送事件的可以参考RxJava2_整体流程分析