一.引言
RxJava中 使用observeOn(Schedulers s)和subscribeOn(Schedulers s)是任务调度的操作符。subscribeOn(Schedulers s) 指示Observable将任务(数据的发射以及数据的处理)放在指定的调度器上的执行 ,observeOn(Schedulers s)指示一个Observer在一个指定的调度器上调用onNext, onError和onCompleted等方法。即subscribeOn决定任务的发射的线程,observeOn决定任务的接收线程。
二.Schedulers任务调度
先看看下面的例子,先猜猜任务调度执行的线程:
//-------------------------- 1 默认主线程-----------------
tx_console.setText("");
printThread("1 默认主线程 ");
Observable.create(observableOnSubscribe)
.flatMap(function)
.subscribe(consumer);
//---------------------------2 指定 Observable 的调度器---------------
printThread("2 指定 Observable 的调度器");
Observable.create(observableOnSubscribe)
.flatMap(function)
.subscribeOn(Schedulers.newThread())
.subscribe(consumer);
//---------------------------3 默认新线程---------------
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
printThread(" 默认新线程");
Observable.create(observableOnSubscribe).flatMap(function).subscribe(consumer);
}
}).start();
//--------------------------- 新线程 指定 Observable ,Observer的调度器---------------
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
printThread(" 新线程 指定 Observable ,Observer的调度器");
Observable.create(observableOnSubscribe).flatMap(function)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);
}
}).start();
Observable.timer(4,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
printThread(" timer 调度器");
}
});
结果输出:
从上面的输出结果中,我们大概知道了下面几点:
- RxJava中不同的调度器可以指定在不同的线程中执行 。
- Create创建的Observable默认在当前线程中执行任务流,并在当前线程观察
- 没有调用observeOn指定观察者调度器,观察者默认在Observable发射线程里执行
- timer创建的Observable会在一个叫Computation的线程中执行任务流
- 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换
三. subscribeOn()原理
subscribeOn()用来指定Observable在哪个线程中执行事件流, 通过源码分析subscribeOn可以知道是Observable怎样实现线程的切换的。
1.subscribeOn方法,创建一个 ObservableSubscribeOn对象
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
2.ObservableSubscribeOn实现subscribeActual()方法, 保证当前Observer的Disposable 只设置一次,后来的设置无效,并且在指定的调度器重新订阅
@Override
public void subscribeActual(final Observer<? super T> s) {
// parent表示目标观察者
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
//静态对象里的常量,保证初始化一次
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//保证当前Observer的Disposable 只设置一次,后来的设置无效
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
//保证当前Observer的Disposable 只设置一次,后来的设置无效
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//指定的调度器重新订阅
source.subscribe(parent);
}
}
3. 调度器的scheduleDirect()指定调度器在自己的线程池(Worker)执行任务
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
4.IoScheduler ---IO调度器的createWorker()
class IoScheduler extends Scheduler {
final AtomicReference<CachedWorkerPool> pool;
static {
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
四. observeOn()原理
observeOn()指示一个观察者 observer 在指定的调度器上调用onNext, onError和onCompleted等方法。先来看看源码的实现过程:
1.observeOn()方法,创建 ObservableObserveOn对象
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
2.ObservableObserveOn实现方法subscribeActual,如果是TrampolineScheduler, TrampolineScheduler调度器表示在当前事件流的线程执行任务 ,否则在指定调度器的线程执行任务
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
// TrampolineScheduler调度器表示在当前事件流的线程执行任务
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
3.ObserveOnObserver在调度器工作线城池中执行onNext(),onError(),onComple()
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//目标观察者
final Observer<? super T> actual;
//调度器工作线城池
final Scheduler.Worker worker;
//数据缓冲队列
SimpleQueue<T> queue;
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
......
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);//数据缓存入队列
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);//调用当前Run()方法
}
}
//
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll(); //冲缓冲队列取数据
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);//数据接收
}
........
}
}
void drainFused() {
.......
actual.onNext(null);
......
}
//
@Override
public void run() {
//是否需要丢弃
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
}
五.实例演示
根据源码的分析,我们来看看下面的例子,就不难理解了。
//---------------------------1 同时指定多个 Observable 的调度器---------------
Observable.create(observableOnSubscribe)
.flatMap(function)
.subscribeOn(Schedulers.newThread())//Observable new 线程发射数据
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe(consumer);//最终observer在new 线程接收数据
//---------------------------2 同时指定多个 Observable 的调度器---------------
Observable.timer(2,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
printThread("2 同时指定多个 Observable ,observer 的调度器");
Observable.create(observableOnSubscribe).flatMap(function)
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())//最终 Observable在 new 线程发射数据
.observeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);//最终observer 在 main 线程接收数据
}
});
//--------------------------- 3 切换 Observable ,Observer的调度器---------------
create(observableOnSubscribe)
.flatMap(function)//new 1 线程接收数据
.subscribeOn(Schedulers.newThread())//new 1线程发送数据
.flatMap(function)//new 1 线程接收数据
.subscribeOn(Schedulers.io())//设置无效
.observeOn(Schedulers.newThread())//切换new 2线程接收数据
.flatMap(function)//new 2 线程接收数据
.subscribeOn(Schedulers.computation())//设置无效
.flatMap(function)//new 2 线程接收数据
.observeOn(Schedulers.io())//切换io线程接收数据
.subscribe(consumer);//目标Observer 在io线程中接受数据
//---------------------------4 调用Observable 的操作符
create(observableOnSubscribe)//io线程发送数据
.flatMap(function)//io线程接收数据
.subscribeOn(Schedulers.io())//指定io线程发送数据
.flatMap(function)//io线程接收数据
.observeOn(AndroidSchedulers.mainThread())//切换mian线程接收数据
.flatMap(function)//mian线程接收数据
.delay(2, TimeUnit.SECONDS)//delay操作符在Computation线程中接受数据
.subscribe(consumer);//目标Observer 在Computation线程中接受数据
六.调度器的种类
RxJava中可用的调度器有下面几种:
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
---|---|
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.single() | 该调度器的线程池只能同时执行一个线程。 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline() | 当其它排队的任务完成后,在当前线程排队开始执行。 |
AndroidSchedulers.mainThread( ) | 主线程,UI线程,可以用于更新界面 |
//---------------------------5 对比 切换 Observable trampoline调度器---------------
printThread(" 切换 Observable io调度器");
Observable.just("1","2","3","4","5")
.subscribeOn(Schedulers.newThread())
.flatMap(function)
.observeOn(Schedulers.io())
.flatMap(function)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
输出结果:
Observable.timer(3,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
printThread(" 切换 Observable trampoline调度器");
Observable.just("1","2","3","4","5")
.subscribeOn(Schedulers.newThread())
.flatMap(function)
.observeOn(Schedulers.trampoline())
.flatMap(function)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
});
输出结果:
六.各种操作符的默认调度器
总结了一些操作符默认的调度器:
buffer(timespan) | computation |
---|---|
buffer(timespan, count) | computation |
buffer(timespan,timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retrytrampolinesample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | trampoline |
skipLast(long time, TimeUnit unit, boolean delayError) | trampoline |
sample(long period, TimeUnit unit) | computation |
sample(long period, TimeUnit unit, boolean emitLast) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | trampoline |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | computation |
timeout(timeoutSelector) | computation |
timeout(firstTimeoutSelector, timeoutSelector) | computation |
timeout(timeoutSelector, other) | computation |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | computation |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | computation |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
最后,小伙伴们,有么有觉得是干货,是的话,就为我点个赞吧!