根据自己的理解,公司用的还是Rxjava 1.x,不过很久以前使用过,没有过多注意这一块,不过还是想弄一下,周末的时候,主要看了 简单使用,如何切换线程,并且去理解这个过程
操作符什么的后面理解的时候,再谈论,先进行就简单的create。重点切换线程
一、准备
查看源码的版本:
implementation 'io.reactivex:rxjava:1.2.1'
这边先不纠结于 Rxjava 2.x,这个可能后续去看Rxjava2.x的时候,再去讨论了。
二、进入正题
2.1 简单使用
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println("Observable.OnSubscribe : " + Thread.currentThread());
subscriber.onNext("sss");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s + " " + Thread.currentThread());
}
});
简单就是这么的使用,打印如下:
Observable.OnSubscribe: Thread[RxNewThreadScheduler-1,5,main]
sss Thread[RxIoScheduler-2,5,main]
2.2 查看subscribeOn的源码
//rx.Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//1
//rx.Observable 里面的成员create方法
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
-
这里直接创建了一个
OperatorSubscribeOn
, 然后传递了一个this
,然后把scheduler
线程切换这里注意,这个
this
我感觉比较关键,把当前的Observable
对象传递了进去;当前OperatorSubscribeOn
还是通过Observable
的create()
方法创建Observable
对象,也就是说,OperatorSubscribeOn
是一个Observable.OnSubscribe
对象,并且里面含有上一个Observable
对象
进入到 OperatorSubscribeOn
类中
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
//成员变量
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
//1.
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
//...省略代码
};
//2.
source.unsafeSubscribe(s);
}
});
}
}
-
其实就是从
scheduler
中创建一个任务,然后通过schedule()
方法进行执行。我是这么理解的,相当于一个线程池执行
//inner.schedule()类比下面 ThreadPool.execute(new Runnable() { @Override public void run() { //执行任务 } };
具体是
scheduler
先不再这个讨论,我这边现在也还没再看(因为我是个菜鸡) source.unsafeSubscribe(s)
这段语句相当于source.subscribe(s)
,因为源码里面调用subscribe(s)
的时候也把我们的Subscriber
对象转成了SafeSubscriber
的对象。
到这里发现,subscribeOn
相当于下面代码
private Observable<T> source;
ThreadPool.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
@Override
void onError(Throwable t) {
sub.onError(t);
}
@Override
void onNext(T t) {
sub.onNext(t);
}
@Override
void onCompleted() {
sub.onCompleted();
}
};
source.subscribe(subscriber);
}
};
2.3 分析为什么subscribeOn多次调用只有第一次有用
其实 2.2 查看subscribeOn的源码已经得出了结论,是在线程里面通过代理了上一个的Observable<T>
对象,也就是说,上游的被当前线程池里面的线程接管了。
XXX.subscribeOn(Schedulers.AAA).subscribeOn(Schedulers.BBB).subscribe()
就拿这两层来说,因为本身subscribeOn
就是代理上一个的Observable<T>
对象,从后面往前面看,就是说,BBB
这个线程池,要给前一个AAA
的Observable<T>
对象,如下伪代码:
private Observable<T> sourceAAA;
ThreadPoolBBB.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//这个是BBB线程的执行
sub.onNext(t);
}
//...省略
};
sourceAAA.subscribe(subscriber);
}
};
而AAA
的前面是XXX
,那伪代码应该如下:
private Observable<T> sourceXXX;
ThreadPoolAAA.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//这个是AAA线程的执行
sub.onNext(t);
}
//...省略
};
sourceXXX.subscribe(subscriber);
}
};
好了,上面好像还是看不太出来,然后我们组合一下伪代码来看看
private Observable<T> sourceAAA;
ThreadPoolBBB.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//这个是BBB线程的执行
sub.onNext {
//伪代码迁移,内部执行
private Observable<T> sourceXXX;
ThreadPoolAAA.execute(new Runnable() {
@Override
public void run() {
Subscriber<T> subscriber = new Subscriber<T>() {
//...省略
@Override
void onNext(T t) {
//这个是AAA线程的执行
sub.onNext(t);
}
//...省略
};
sourceXXX.subscribe(subscriber);
}
};
}
}
//...省略
};
sourceAAA.subscribe(subscriber);
}
};
简化一下代码,用线程代替就是,相当于如下:
new Thread("BBB") {
@Override
public void run() {
new Thread("AAA") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
}
}.start();
}
}.start();
打印可想而知,是里面 AAA线程;
Thread[AAA,5,main]
2.4 查看observerOn的源码
//rx.Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//...
//2.
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1.
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
好家伙,看了一下,连续调用了4层,最总还是通过create()
创建了一个Observable<T>
对象,
-
OnSubscribeLift
和前面看subscribeOn()
一样,第一个参数onSubscribe
是上一个Observable<T>
对象的onSubscribe
,也就是说,subscribeOn()
和observerOn()
的比较大的区别是,subscribeOn()
“代理的是上一个对象的Observable<T>
”,而observerOn()
是“代理的是上一个的OnSubscribe<T>
”,所以observerOn()
就是改变了下游的线程切换。第二个参数是上一步生成的OperatorObserveOn
进行了传递到里面。 -
scheduler
的对象传给了OperatorObserveOn
,这个类干了切换换线程的操作。
可以看出observerOn()
通过两个类(OperatorObserveOn
和OnSubscribeLift
)来管理,也不知道是为什么,是不是有可能方便单元测试,还是为了单一职责呢,就不纠结于此了。如果我写可能就一个类做了。
2.4.1 先看OnSubscribeLift
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
//...
//1.
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
//...
}
//...
}
}
-
Operator#call
(OperatorObserveOn
)然后通过Subscriber
参数来生成了一个新的Subscriber
然后通过上一个的OnSubscribe<T>
对象传递。
2.4.2 OperatorObserveOn
这个类比较长,分开来了,
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
//...略
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
//...略
// 1.
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
//...略
}
//...略
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
//...略
}
- 接着 2.4.1 的
Operator#call
,里面就调用了OperatorObserveOn
的内部类ObserveOnSubscriber
进行把上一个的Subscriber
代理了一下,然后具有了有了切换线程的能力
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final Queue<Object> queue;
//...略
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
}
//...略
void init() {
Subscriber<? super T> localChild = child;
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
//1.
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
//1.
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
//1.
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
@Override
public void call() {
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
//...略
for (;;) {
long requestAmount = requested.get();
//...略
while (requestAmount != currentEmission) {
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
//1.
localChild.onNext(NotificationLite.<T>getValue(v));
}
//...略
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
//...略
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (e != null) {
//1.
a.onError(e);
return true;
} else {
//1.
a.onCompleted();
return true;
}
return false;
}
}
上面已经省去了很多代码
-
就是通过传入进来的
child
(Subscriber
)对象,然后在外面调用Subscriber#onNext()
方法的时候,实际就进行了 queue 入队操作,然后通过recursiveScheduler.schedule(this)
切换到线程中执行if (!queue.offer(NotificationLite.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } //recursiveScheduler.schedule(this); 最终执行到这里 public void call() { }
recursiveScheduler.schedule(this);
最终执行了call()
方法,从而达到了且换线程的目的。我根据根据这个思路写了伪代码
static final class ObserverOnSubscribe<T> extends Subscriber<T> implements Runnable { private Scheduler scheduler; private Subscriber<T> child; private Queue<T> queue; public ObserverOnSubscribe(Scheduler scheduler, Subscriber<T> child) { this.scheduler = scheduler; this.child = child; queue = new LinkedList<>(); } @Override public void run() { if (queue.isEmpty()) { child.onCompleted(); return; } T poll = queue.poll(); child.onNext(poll); } //... @Override void onNext(T t) { queue.offer(t); schedule(); } @Override void onCompleted() { schedule(); } public void schedule() { scheduler.execute(this); } }
2.5 observerOn每次调用,后面都会进行线程切换,但不会影响前面的线程
observerOn
每次调用,后面都会进行线程切换,但不会影响前面的,因为observerOn
“代理了”OnSubscribe
,就相当于影响下游的切换。好比如下代码
new Thread("BBB") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
new Thread("AAA") {
@Override
public void run() {
System.out.println("" + Thread.currentThread());
}
}.start();
}
}.start();
总结
subscribeOn :
整条线路上第一次切换的有效,但有效的范围分为
a. 有ObserverOn的话,那就是ObserverOn的下下一个,因为ObserverOn只影响下一个
b. 如果只用了subscribeOn,那就是subscribeOn第一次切换的那个线程,然后知道ObserverOn来进行切换
<img src="http://reactivex.io/documentation/operators/images/schedulers.png" alt="">
这个代理的是 Observrable 对象
//thread-1
thread {
Observeable#subscribe("创建了一新的");
}
为什么subscribeOn重复设置没有使用,只有第一次呢?
就像这样
//thread-2
thread {
//thread-1
thread {
Observeable#subscribe("创建了一新的");
}
}
这样读取出来其实还是thread-1,thread-2就没什么意义了。
observeOn:影响下一次订阅
observeOn代理的是Observable.OnSubscribe这个接口,就是onNext的这个接口,往下面传的这个接口,所以会影响下面的线程。
//thread-2
thread {
//thread-1
thread {
Observeable#subscribe(() -> {
Run run = this.run;
thread {
run.run();
}
});
}
}
以上,如果有错误,非常感谢您的指正,分享就是为了学习,写错了,虽然丢脸,但是能学到东西也很舒服。
区别:subscribeOn()
和observerOn()
的比较大的区别是,subscribeOn()
“代理的是上一个对象的Observable<T>
”,而observerOn()
是“代理的是上一个的OnSubscribe<T>
”,所以observerOn()
就是改变了下游的线程切换。