RxJava源码分析-线程切换
接着上篇分析,本篇我们来揭开RxJava线程切换的神秘面试,先上一段代码
Observable.just("hello,world!")
.map { res->
Log.d("Observable", "thread:" + Thread.currentThread().name)
res+"1234"
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ res ->
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "length:" + res.length)
}, { e ->
Log.d("Observable", "thread:" + Thread.currentThread().name)
e.printStackTrace()
}, {
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "onComplete")
},{
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "onSubscribe")
})
这段代码执行玩打印的log如下
05-21 10:45:06.109 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
onSubscribe
05-21 10:45:06.115 17068-17086/com.example.pandaguo.rxdemo D/Observable: thread:RxCachedThreadScheduler-1
05-21 10:45:06.165 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
length:16
thread:main
onComplete
可以看到其中在map操作符中执行的代码是在RxCachedThreadScheduler-1线程中执行,而其余的均是在UI线程,为什么呢?
- 本文的重点不在数据流向分析,因此前面几个函数不在仔细分析
Observable.just("hello,world!")
.map { res->
Log.d("Observable", "thread:" + Thread.currentThread().name)
res+"1234"
}
代码执行到这里,我们可以拿到经过封装的数据源ObservableMap,其实就是个Observable,那么接下来调用subscribeOn(Schedulers.io())
来进行线程切换的操作了,我们来一点点的分析,首先看下Schedulers.io()
是怎么创建一个Scheduler对象IO的
Schedulers.io()
public final class Schedulers {
...
static final Scheduler IO;
...
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
...
}
...
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
...
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
...
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
...
}
- 一部分代码需要结合着RxJavaPlugins来一起看
public final class RxJavaPlugins {
...
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
//初始化时候 onInitIoHandler = null
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
...
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
...
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
//初始化时候onIoHandler = null
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
...
}
- 上面代码首先
Schedulers.io()
会调用RxJavaPlugins.onIoScheduler(IO)
,这里传入的IO实际上是一早就初始化的RxJavaPlugins.initIoScheduler(new IOTask())
, IOTask是Schedulers的一个静态内部类,实现了Callable接口,并且在call()方法中返回了一个IoHolder.DEFAULT
,这个IoHolder
其实是一个Schedulers的静态内部类,然后默认会持有一个IoScheduler对象DEFAULT -
RxJavaPlugins.initIoScheduler(new IOTask())
会调用到callRequireNonNull()
,我们来看下这个方法回去调用s.call()
,s的类型是IOTask
- 说了那么多
Schedulers.io()
最终就是创建了一个类型为IoScheduler的对象,我们先不去看IoScheduler的实现,先来分析Observable.subscribeOn()
的实现
Observable.subscribeOn()
public abstract class Observable<T> implements ObservableSource<T> {
...
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
...
}
- 还是要结合RxJavaPlugins来一起看
public final class RxJavaPlugins {
...
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
...
}
- 这里其实
RxJavaPlugins.onAssembly()
实际上就还是返回了传入的参数,也就是创建拿到了一个ObservableSubscribeOn对象,那么线程切换的核心逻辑也就在这个类中实现,接下来我们来分析这个类
ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//这里是线程切换的关键
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//订阅实际发生的位置
source.subscribe(parent);
}
}
}
- 我们来看下
subscribeActual
的实现,s.onSubscribe(parent);
执行的时候我们并没有看到线程切换的业务,所以我们可以肯定Observ.onSubscribe()
一定是在UI线程回调的,那么为什么map操作符中的逻辑是在另一个线程呢?parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
中parent.setDisposable()
从实现上来看是没有做线程切换的逻辑,new SubscribeTask(parent)
从实现上来看仅仅是让订阅的操作发生在SubscribeTask执行的线程,等等我有一个大胆的想法 ,既然有Runnable对象了,那么scheduler.scheduleDirect()
会不会就是实际上去切换线程的操作呢?我们来追下代码
Scheduler.scheduleDirect()
public abstract class Scheduler {
...
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
...
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
...
public abstract Worker createWorker();
...
}
- Scheduler是一个抽象类!
- 上面代码可以看到,实际上执行的逻辑是在另一个重载方法scheduleDirect中,这里调用
createWorker()
创建了Worker对象w,然后调用了w.schedule(task, delay, unit);
去实现了线程切换的逻辑 - 啥?你说这样就实现了线程切换,我们不信!那我就证明给你看撒,还记得前面说得么
Schedulers.io()
最终创建了一个IoScheduler对象,我们来看下它的定义
IOScheduler
public final class IoScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
private static final long KEEP_ALIVE_TIME = 60;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";
static final CachedWorkerPool NONE;
static {
SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
@Override
public void run() {
evictExpiredWorkers();
}
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
void shutdown() {
allWorkers.dispose();
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
}
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
@Override
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
...
}
- 代码比较长,我们对着
Scheduler.scheduleDirect()
的流程来看下IOScheduler,首先我们来看createWorker()
- 可以看到这里是通过
pool.get()
获取了一个CachedWorkerPool,这个pool是IOScheduler的成员变量是在构造方法中进行初始化的,它是一个AtomicReference能够保证针对持有对象的原子操作,换句话说能够保证线程的安全
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
- 默认其持有的是一个
NONE
,是在静态代码块中初始化完成的
static {
...
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
- 紧接着的start方法中会替换为新的值,并且这个CachedWorkerPool处于非shutDown状态
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
- CachedWorkerPool是什么我们后面再去分析,我们回到
Scheduler.scheduleDirect()
接着往下看,后续会将创建的Worker对象w与传入的Runnable接口对象run封装成一个DisposeTask对象task,之后调用Worker对象w的schedule方法也就是EventLoopWorker的schedule方法 - 这里说下DisposeTask其实就是代理了传入的Runnable对象run,在其run()方法中会调用到传入的Runnable对象的run方法
static final class DisposeTask implements Disposable,
Runnable, SchedulerRunnableIntrospection {
final Runnable decoratedRun;
...
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
...
}
...
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
...
}
- EventLoopWorker.schedule这里会调用
threadWorker.scheduleActual(action, delayTime, unit, tasks)
public Disposable schedule(@NonNull Runnable action, long delayTime,
@NonNull TimeUnit unit) {
...
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
- threadWorker是在EventLoopWorker的构造方法中进行初始化的
EventLoopWorker(CachedWorkerPool pool) {
...
this.threadWorker = pool.get();
}
- 可以看到就是从CachedWorkerPool中获取的,我们来看下CachedWorkerPool的get()方法
static final class CachedWorkerPool implements Runnable {
...
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
...
}
- 这段代码其实就是先从缓存中看看能不能拿到一个已经缓存下来的ThreadWorker,如果没有就创建一个新的ThreadWorker对象并缓存起来,接下来我们看下ThreadWorker的实现
ThreadWorker
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
- 可以看到并没有实现特殊的方法,所以其大部门实现都是在NewThreadWorker中的,回到前面分析的,我们说过EventLoopWorker.schedule会调用
threadWorker.scheduleActual(action, delayTime, unit, tasks)
,在NewThreadWorker中最终会调用到scheduleActual这个方法,我们来看下具体实现
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包装一层传入的参数run
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//通过线程池去执行run
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
- 这里会将传入的Runnable再做一层包装,之后通过线程池去submit或者schedule执行对应的任务,这个Runnable对象就是前面我们分析的SubscribeTask,还记得SubscribeTask的run方法执行了什么么?
source.subscribe(parent);
- 所以map操作符中所有的流程就是执行在线程池之中
- 对于ObservableOn()原理也是一样的,只不过开源库中通过Handler将获取MainThread.Looper然后将其切回UI线程,看客大佬们可以跟下源码分析一波