在前面两篇文章RxJava2笔记(一、事件订阅流程)和RxJava2笔记(二、事件取消流程)中,我们分别了解了事件的订阅以及取消是如何进行的,接下来我们将要介绍RxJava的线程切换。
对RxJava有过了解的肯定知道,前面的代码都是运行在主线程当中,让我们来确认下,继续对代码做一些改动,添加些打印日志,打印各个方法的当前运行线程:
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--运行线程:" + Thread.currentThread().getName());
disposable = d;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer + " --运行线程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getMessage());
e.printStackTrace();
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--运行线程:" + Thread.currentThread().getName());
}
};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(observer);
输出结果如下:
I/MainActivity: onSubscribe--运行线程:main
subscribe--运行线程:main
I/MainActivity: onNext: 1 --运行线程:main
onNext: 2 --运行线程:main
onNext: 3 --运行线程:main
onComplete--运行线程:main
可以看到,所有的线程都运行在主线程当中。
而在实际开发中,我们需要为网络请求相关代码单独开启一个子线程,将网络请求代码运行在这个子线程中,当网络请求结束返回数据并开始更新UI界面时,我们需要将线程切换回主线程后才能去更新UI界面,否则就会报错(android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. at android.view.ViewRootImpl.checkThread(ViewRootImpl.java:6838))
这段话的意思其实就是我们只可以在主线程中更新UI,当我们在其他线程中更新UI时,就会抛出这个异常。
而使用RxJava切换线程也非常简单,只需要添加两个调用方法即可:
我们在订阅线程添加了一行代码TimeUnit.MILLISECONDS.sleep(1000)来模拟网络请求。如果在此处不添加这行代码,当我们把observeOn(AndroidSchedulers.mainThread())这行代码去掉时,程序仍能正常运行,有兴趣的读者可以试一下。至于原因,参考这篇文章Android中子线程真的不能更新UI吗?。
好了,按照上图的代码,我们再打印下结果(打印结果中间乱入了两行,这是因为订阅线程睡眠了1s的缘故,我们直接无视掉就行):
I/MainActivity: onSubscribe--运行线程:main
I/MainActivity: subscribe--运行线程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --运行线程:main
I/MainActivity: onNext: 2 --运行线程:main
I/OpenGLRenderer: Initialized EGL, version 1.4
W/art: Before Android 4.1, method int android.support.v7.widget.DropDownListView.lookForSelectablePosition(int, boolean) would have incorrectly overridden the package-private method in android.widget.ListView
I/MainActivity: onNext: 3 --运行线程:main
I/MainActivity: onComplete--运行线程:main
从上面的结果可以看到,我们的订阅线程(即发射数据事件所在的线程,也就是被观察者)运行在了名为RxCachedThreadScheduler-1这个线程中,而observer的四个方法(观察者)均运行在main线程中(也就是UI线程)。接下来我们就开始分析这两个线程切换代码做了哪些工作。
我们先从订阅线程开始:
将订阅线程切换到子线程运行
将订阅线程切换到子线程只需要调用方法subscribeOn(Schedulers.io())就能完成切换。那么这个方法做了哪些工作呢?我们点进去看看:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
该方法接收一个Scheduler类型的参数,这个Scheduler类就是负责线程调度的,关于这个类我们稍后再讨论。沿着上面的代码接着分析下去,subscribeOn(Scheduler scheduler)方法将传入的scheduler参数又传递到了new ObservableSubscribeOn<T>(this, scheduler)这个构造方法中,然后通过RxJavaPlugins.onAssembly将这个生成的ObservableSubscribeOn对象作为钩子返回生成一个新的Observable对象。我们接着来看ObservableSubscribeOn这个类:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>{
}
发现它继承了AbstractObservableWithUpstream这个类
/**
* Base class for operators with a source consumable.
*
* @param <T> the input source type
* @param <U> the output type
*/
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
通过源码可以发现,AbstractObservableWithUpstream继承了Observable,其内部有个ObservableSource类型成员变量source,这个ObservableSource又是什么?我们点进去看下
/**
* Represents a basic, non-backpressured {@link Observable} source base interface,
* consumable via an {@link Observer}.
*
* @param <T> the element type
* @since 2.0
*/
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
可以看出,ObservableSource是一个接口,Observable实现了这个接口,而这个接口和前面的ObservableOnSubscribe接口如出一辙。因此AbstractObservableWithUpstream中的source成员变量就是用来保存上游传递下来的observable,也就是Observable.create方法生成的数据源。
我们来看下ObservableSubscribeOn的源码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//1、线程调度器,这里是负责订阅线程切换
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//2、保存上游传递的observable
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//3、封装外部传递进来的观察者对象(将observer包装起来,这是一个装饰器模式)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//4、建立订阅关系(观察者调用自己的onSubscribe方法)
s.onSubscribe(parent);
//5、将订阅线程中的disposable赋值给parent(也就是SubscribeOnObserver对象)
//这里使用scheduler开始线程调度,将外部observer的包装对象parent用SubscribeTask构造方法包装起来并使这个SubscribeTask运行在我们指定的线程中
//这个SubscribeTask是一个Runnable,实际上真正的订阅是发生在它的run()方法里面,而这个run()方法正是运行在我们前面指定的线程中(比如Schedulers.io()指定的IO线程)。
//这样我们就完成了线程切换
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//......代码省略
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//6、订阅事件发生
source.subscribe(parent);
}
}
}
在步骤3中,这个外部observer的包装类SubscribeOnObserver跟前面所讲的CreateEmitter相比较起来,虽然它们都是observer包装类,但还是有一定的区别的,我们来分析下这个类:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
//7、保存外部传进来的observer对象
//(这里的actual用于保存下游传递过来的observer,
//当前这个类SubscribeOnObserver对象可以看做是介于上游observable和下游observer之间的一个中间observer,主要用于辅助订阅线程切换)
final Observer<? super T> actual;
//8、保存订阅发生时生成的disposable,用于后续的解除订阅
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//9、该方法在调用source.subscribe(parent)时最先调用,
//将当前订阅事件产生的disposable保存到AtomicReference<Disposable> s这个成员变量中,用于之后取消订阅
//(从上面的代码可以看出这里传入的参数实际上就是SubscribeOnObserver自身对象)
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
//10、这里调用下游observer的onNext方法,actual保存的就是下游传递过来的observer
//后面的onComplete和onError方法同理
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
//11、取消订阅时除了要终止当前订阅事件(onSubscribe方法被调用时保存的disposable(保存在其成员变量s中)),
//还要终止scheduler线程调度时保存其返回的task(保存在其自身),
//因为网络请求任务是运行在独立的线程中,终止订阅事件时,我们也需要终止相应运行的线程
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//12、这个方法是在ObservableSubscribeOn类中的subscribeActual方法内部调用的,
//用于保存scheduler进行线程调度时返回的task(实际上返回的是DisposeTask类型的对象,它实现了Disposable接口,这个后面再讨论)
//这里是将scheduler线程调度时返回的task保存到自身
//(SubscribeOnObserver继承了AtomicReference<Disposable>类(继承该类保证了线程安全)和Disposable接口)
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
我们来梳理下思路:
- 1、当我们调用subscribeOn(Schedulers.io())方法时,调用这个方法的对象实际上是Observable.create方法生成的一个新的observable对象(即ObservableCreate对象,该对象有个source成员变量,用于保存上游传递过来的observable被观察者,也就是事件源);调用subscribeOn这个方法又会产生一个新的observable对象(通过new ObservableSubscribeOn<T>(this, scheduler)得到),用于进一步的事件操作。
- 2、步骤1中提到的ObservableSubscribeOn构造方法第一个参数是ObservableSource接口类型(Observable实现了这个接口)的参数,该参数实际上接收的是步骤1中的ObservableCreate对象,并保存在其成员变量source中(类型为ObservableSource,通过继承AbstractObservableWithUpstream得到)。
- 3、在ObservableSubscribeOn类的subscribeActual(final Observer<? super T> s)方法中(这个方法接收的observer参数是从下游传递过来的),将外部传进来的observer对象用SubscribeOnObserver内部类包装起来(保存在其成员变量actual中)得到该内部类的一个对象parent(通过SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s),这个内部类主要用于辅助订阅线程切换)。
- 4、然后通过s.onSubscribe(parent)建立订阅关系,最后执行parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))),通过scheduler.scheduleDirect(new SubscribeTask(parent))完成线程切换。(备注:SubscribeTask是一个Runnable,这里是将parent传入到SubscribeTask的构造方法中,并在SubscribeTask的run()方法中执行订阅(source.subscribe(parent)))。
至此,订阅线程的切换流程就介绍完了,本文也就告一段落。下面,还有几个细节,如果只关注订阅线程切换流程的话,可以结束本文的阅读了。
留下的几个问题:
调用subscribeOn(Schedulers.io())这个传入的Schedulers.io()到底是啥?
我们点进去看下代码:
/**
* Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
* ......已省略
* @return a {@link Scheduler} meant for IO-bound work
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
看注释,我们了解到这个方法是返回一个绑定在IO操作的默认共享实例,返回类型是Scheduler类型。RxJavaPlugins我们前面提到过,它的作用是返回一个钩子,这里返回的是一个常量IO,这个IO主要负责网络通信任务。其实Schedulers类内部声明了很多个常量,我们大致看下:
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//......代码省略
}
Schedulers类一共内置了5个Scheduler类型的常量,除了IO之外,其他几个也简单介绍下:
- SINGLE:创建一个共享的单一线程,所有工作均在这个线程里面执行
- COMPUTATION:创建一个指定数量线程的线程池,主要适用于计算密集型的任务
- IO:创建一个预置一定数量线程的线程池,主要用于IO密集型的任务
- TRAMPOLINE:使用该种方式的任务会在当前线程上运行,但并不会马上执行,任务会被保存在一个队列中,等当前任务执行完后再从队列中把该任务取出来并执行。
- NEW_THREAD:直接启动一个新线程执行指定任务。
- 还有一种创建线程的方式:Scheduler from(@NonNull Executor executor),这是由我们自己指定线程创建以及调度方式。
上面六种方式常用的为IO和COMPUTATION,本文我们就分析下这个IO,COMPUTATION其实和IO是类似的,读者可以自行分析。
我们看到这个常量IO执行了静态代码初始化,初始化为一个IOTask对象:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
Callable和Runnable类似,区别是使用Callable可以得到返回值,而使用Runnable则没有返回值。
上面的代码在call()方法内返回了IoHolder.DEFAULT,最终返回一个IoScheduler类型的对象:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
有人在这里可能就有疑问,Schedulers.io()返回的是一个Scheduler对象啊,这个IOTask的call()方法是什么时候执行的呢?
我们可以看到,IO这个变量在声明的时候它的类型就是Scheduler类型,因此可以想到它在初始化的时候必然是经过了一些处理。我们看下IO的初始化代码:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
这里直接将IOTask对象作为参数传递给 RxJavaPlugins.initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler)方法,这个方法正好接收一个Callable对象,看来call()方法应该就是在这里面执行的,点进去看下:
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
我们看下这个方法,首先验证传入的参数不能为空,然后定义一个Function类型的临时变量f,并用同是类型为Function的成员变量onInitIoHandler为其赋值,然后判断f==null是否成立,也就是onInitIoHandler==null是否成立。这里f==null是成立的,原因是RxJavaPlugins内部并没有onInitIoHandler预初始化方法,只有与其相关的Getter和Setter方法,而在本文开始到目前为止,onInitIoHandler的Setter方法也并没有被调用过,因此onInitIoHandler为null,自然f==null也就为true,因此调用callRequireNonNull(defaultScheduler)方法,我们点进这个方法看下:
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
终于,我们看到了call()方法的调用。上面说到,在IOTask类中,call()方法返回的是一个IoScheduler对象,因此结合前面的分析,这个IO在静态初始化的时候就被初始化为一个IoScheduler对象。
这里还有一个问题,f==null这个条件什么时候不成立呢?当然是我们想要自己处理传入的Callable对象了。那要如何自己处理呢?我们自己声明一个继承自Function接口的处理类,并实现接口中的apply方法(当然了这里面最终肯定是要调用call()方法返回Scheduler对象的,我们可以在返回之前做一些自己的处理),然后调用onInitIoHandler的静态Setter方法(RxJavaPlugins类中的成员变量都是静态的,因此其Setter和Getter方法也都是静态的)为onInitIoHandler赋值,此时由于onInitIoHandler是不为null的,因此f==null不成立,自然就执行下面的方法,我们简要的看下代码:
@NonNull
static Scheduler applyRequireNonNull(@NonNull Function<? super Callable<Scheduler>, ? extends Scheduler> f, Callable<Scheduler> s) {
//这里调用apply(f, s),f为我们自定义的处理类对象
return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
//调用f中的apply方法,也就是我们自己定义的处理方法,t为Callable对象
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
至此,我们可以回答前面的问题了,Schedulers.io()返回的就是一个IoScheduler对象,这个IoScheduler实际上就是在IO线程调度时用来管理IO线程的。
scheduler.scheduleDirect(new SubscribeTask(parent))具体是如何完成线程调度的?
我们点进这个方法看一下:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
直接无延迟调用了scheduleDirect方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//1、创建一个worker,其实现了Disposable接口,是一个工作者类
final Worker w = createWorker();
//2、这里我们可能要对run参数做一些处理,也可能不处理,取决为我们是否为RxJavaPlugins类的成员变量onScheduleHandler赋值(自定义继承Function的实现类处理run)。
//一般情况下我们可以认为decoratedRun=run。
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//3、包装类。DisposeTask实现了Disposable, Runnable, SchedulerRunnableIntrospection这三个接口,
//主要用于管理任务Runnable及其工作Worker
DisposeTask task = new DisposeTask(decoratedRun, w);
//4、开始执行任务
w.schedule(task, delay, unit);
//5、返回这个管理者
return task;
}
上面的方法将外面传入进来的run传递给了DisposeTask,然后调用worker来进行任务调度,看起来主要的任务是在DisposeTask里面执行的,我看下DisposeTask这个类的代码:
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
//6、外部传进来的Runnable任务
final Runnable decoratedRun;
//7、调度工作者worker
final Worker w;
//8、当前线程
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
//9、保存decoratedRun执行时候所在的线程
runner = Thread.currentThread();
try {
//10、执行decoratedRun的run()方法
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
//11、当DisposeTask被取消执行任务时,如果取消任务时所在的线程和decoratedRun任务执行时所在的线程为同一个线程,
//并且调度工作者worker类型为NewThreadWorker类型,直接结束任务,关闭底层执行程序(这里是同步任务场景)
((NewThreadWorker)w).shutdown();
} else {
//12、当DisposeTask被取消执行任务时,告诉worker工作者取消调度任务(IO异步任务场景)
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
具体的步骤都在上面的代码中标注了出来,我们可以看到,在DisposeTask的run方法中最终执行了decoratedRun.run(),而这个decoratedRun也就是我们在ObservableSubscribeOn类的subscribeActual(final Observer<? super T> s)方法里面所执行的代码parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))传进来的new SubscribeTask(parent))对象,通过本文前面的分析我们知道,SubscribeTask也是一个Runnable,实际上的订阅source.subscribe(parent)正是在其run()方法中。
至此我们明白了scheduler.scheduleDirect(new SubscribeTask(parent))都做了哪些工作,还有一个问题,我们通过查看Worker类的源码会发现这是一个抽象类,而createWorker方法也是一个抽象方法,我们在上面说到在IO线程调度时实际上管理IO线程的是IoScheduler,这个类也是继承自Scheduler,我们去IoScheduler类看下createWorker方法:
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
该方法直接返回了EventLoopWorker对象(管理Worker--负责任务调度的工作者,CachedWorkerPool--管理Worker的一个队列,CompositeDisposable--批量管理订阅状态的disposable容器 ):
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
//保存订阅状态
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//取消订阅时,释放当前的threadWorker
@Override
public void dispose() {
//如果成功的将disposed状态设置为true,则取消订阅,并将释放当前的threadWorker资源,并将其添加到Worker管理池中
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//实际线程调度工作
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
虽然EventLoopWorker也继承了Scheduler的内部类Worker,但在scheduler方法中,把实际的线程调度工作转发给了ThreadWorker去进行,ThreadWorker又继承了NewThreadWorker类,我们点进scheduleActual(action, delayTime, unit, tasks)方法看下(该方法位于NewThreadWorker类中):
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//1、包装类,管理任务线程,内部实现了一系列方法,包括线程任务的执行,设置任务标志位,任务的取消等。
//这个parent参数就是我们平常使用的CompositeDisposable对象,负责订阅批量管理
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//2、将任务线程ScheduledRunnable放入线程池执行任务,根据是否有延迟时间调用相应的方法
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
//3、设置ScheduledRunnable在线程池中的执行状态
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
//4、如果ScheduledRunnable在执行过程中出错,并且有设置订阅状态管理容器,将ScheduledRunnable的订阅状态从该容器中移除
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
//5、返回任务线程包装类给外部,用于该任务的管理
return sr;
}
既然返回的是ScheduledRunnable对象,我们就来看下这个类:
//这里即实现了Runnable接口也实现了Callable接口,用于应对需要返回值和不需要返回值的情况
public final class ScheduledRunnable extends AtomicReferenceArray<Object> implements Runnable, Callable<Object>, Disposable {
private static final long serialVersionUID = -6120223772001106981L;
final Runnable actual;
/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
/** Indicates the dispose() was called from within the run/call method. */
static final Object SYNC_DISPOSED = new Object();
/** Indicates the dispose() was called from another thread. */
static final Object ASYNC_DISPOSED = new Object();
static final Object DONE = new Object();
//保存追踪该任务的外部任务状态的标识
static final int PARENT_INDEX = 0;
//保存该任务运行状态的标识
static final int FUTURE_INDEX = 1;
//保存其他线程像该任务发出的指令标识
static final int THREAD_INDEX = 2;
/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
//如果我们在外部通过一个disposable容器来管理当前任务,就保存这个disposable容器
this.lazySet(0, parent);
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
//设置任务执行线程为当前所在线程
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
//执行实际IO任务
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
//任务执行期间出现错误,重置当前执行线程为null
lazySet(THREAD_INDEX, null);
//获取追踪次任务的外部任务状态
Object o = get(PARENT_INDEX);
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
//如果追踪该任务状态的外部任务还没有被终止,将它设置为DONE状态,并将当前任务从订阅状态管理容器中删除
((DisposableContainer)o).delete(this);
}
//任务在执行出错时,确保任务的执行结果状态为SYNC_DISPOSED,ASYNC_DISPOSED以及DONE中的任意一个即可
for (;;) {
o = get(FUTURE_INDEX);
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
//设置Runnable任务的运行状态
public void setFuture(Future<?> f) {
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE) {
return;
}
if (o == SYNC_DISPOSED) {
f.cancel(false);
return;
}
if (o == ASYNC_DISPOSED) {
f.cancel(true);
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
return;
}
}
}
//结束Runnable任务运行
@Override
public void dispose() {
for (;;) {
//获取任务运行状态
Object o = get(FUTURE_INDEX);
//如果任务执行结果状态为以下三个状态中的任意一个,表示任务已被结束,什么也不做
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
break;
}
//判断任务在执行期间所在的线程以及执行终止后所在的线程是否一致
boolean async = get(THREAD_INDEX) != Thread.currentThread();
//设置该任务是在同步环境下结束的还是在异步环境下结束的
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
//取消任务的执行
((Future<?>)o).cancel(async);
}
break;
}
}
for (;;) {
//获取追踪该任务的外部任务所处状态
Object o = get(PARENT_INDEX);
if (o == DONE || o == PARENT_DISPOSED || o == null) {
//如果外部任务状态是终止状态,则什么也不做
return;
}
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
//如果外部任务状态不是终止状态,将其设置为终止状态,并将当前任务从状态容器中移除
((DisposableContainer)o).delete(this);
return;
}
}
}
//根据追踪该任务的父任务状态并根据父任务是否结束来判断该任务是否结束
@Override
public boolean isDisposed() {
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}
}
通过上面代码我们可以看到,NewThreadWorker类内部确实是通过线程池来执行我们提交的IO任务,这个线程池是什么时候创建的呢?答案是在createWorker()方法调用的时候,即我们创建worker工作者对象时。通过调用EventLoopWorker的构造方法生成该对象并返回。
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
这个构造方法接收一个CachedWorkerPool类型的参数,这个CachedWorkerPool并不是线程池,而是用来管理我们创建的Worker工作者。
static final class CachedWorkerPool implements Runnable {
//空闲Worker在队列中的存活时间
private final long keepAliveTime;
//线程安全的任务队列,用于保存处于空闲状态的Worker工作者
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
//Disposable容器,可以用于统一管理多个订阅任务状态
final CompositeDisposable allWorkers;
//可以实现循环或延时执行任务的线程池
private final ScheduledExecutorService evictorService;
//保存延迟周期任务
private final Future<?> evictorTask;
//线程创建工厂方法
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
//如果空闲队列中的Worker有存活时间,则创建线程池和相应的延迟周期任务
//该类实现了Runnable接口,并在run()方法中调用方法evictExpiredWorkers(),用于移除空闲队列中超过设置的存活时间的Worker
if (unit != null) {
//创建线程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
//并将当前Runnable任务对象(即CachedWorkerPool)添加到周期延迟执行任务中,然后启动线程池执行该周期延迟任务,最后返回执行结果
//由于run()方法中主要是为了在周期时间内检查Worker空闲队列中缓存的Worker是否超过存活时间,因此此处周期延迟任务的延迟时间与Worker的存活时间一致
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
//CachedWorkerPool内部的线程池启动后开始执行
@Override
public void run() {
evictExpiredWorkers();
}
//从空闲队列中取出一个缓存的工作者Worker
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//如果队列中没有缓存的Worker,则新建一个,并将其添加到disposable容器中,便于订阅任务的统一管理
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//释放一个ThreadWorker,并为其设置存活时间,将其添加到空闲队列中
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
//移除空闲队列中所有超过存活时间(keepAliveTime)的Worker
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
//......代码省略
}
在get()方法中,先检查空闲队列有没有缓存的Worker,如果没有则创建一个。
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//......代码省略
}
ThreadWorker继承自NewThreadWorker,虽然它们的名字都带了Thread,但它们并不是线程,而是一个调度订阅任务的工作者,NewThreadWorker继承自Scheduler内的一个内部类--Worker。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//......代码省略
}
我们看到创建线程池的代码了,继续点进去看下:
//SchedulerPoolFactory.create
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
//Executors.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
//new ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
这里创建了一个线程池,其核心线程数为1,可创建的最大线程为Integer.MAX_VALUE,非核心线程在线程池中的存活时间为10,存活时间单位为MILLISECONDS,线程管理队列为DelayedWorkQueue,线程创建的工厂类为我们外面传入的工厂类(这里是RxJava默认实现的工厂类==>RxThreadFactory)。
最后NewThreadWorker就是通过这里创建的线程池来执行具体的IO任务==>SubscribeTask。
由上面代码可以看到,每个Worker内部都自己创建了一个独立的线程池来执行IO任务,这些Worker又都是通过CachedWorkerPool来统一管理的,那么这个CachedWorkerPool又是什么时候创建的呢?答案是在我们调用Schedulers.io()时。
前面我们提到过,当调用Schedulers.io()时,最终会调用IoScheduler类的构造方法产生一个IoScheduler对象。
public final class IoScheduler extends Scheduler {
//......代码省略
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
static final CachedWorkerPool NONE;
static {
//......代码省略
//创建线程的工厂类,为每个创建的线程名添加前缀(RxCachedThreadScheduler)
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
//初始化一个没有线程池的CachedWorkerPool,该Worker管理池内空闲队列中的Worker存活时间为0,Worker内部线程池创建线程的工厂类为RxThreadFactory
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
//清空CachedWorkerPool内部状态
NONE.shutdown();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
//创建线程的工厂类对象
this.threadFactory = threadFactory;
//包装类,以保证多线程环境下CachedWorkerPool可以正常工作
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
public void start() {
//新建一个有线程池以及Worker存活时间的CachedWorkerPool
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
//这里通过AtomicReference的CAS方法来确保NONE能被正常更新为update
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//......代码省略
}
至此:订阅线程切换也就告一段落了,文章的后面还扯了很多题外话,下一篇文章RxJava2笔记(四、观察者线程切换)我们继续介绍观察者线程切换(即如何将线程由子线程切换回主线程从而进行UI更新)。