一点牢骚:
前段时间,接到需求,旧项目要增添许多功能;旧项目是这样的:功能以及代码量就非常庞大,加上各种代码不规范、可读性很差、代码耦合度有点小高;
听到这个消息真的让我脑袋大了一圈,
如果真的要在原有架构上做开发,肯定会导致小组成员开发冲突以及众多的冗余代码,浪费时间和精力在非必要的事情上,之前自身也知道旧项目有这个问题 但由于新项目开发呀嫌弃旧项目一直没有决心去改动,这下好了完全推不了 那就改架构吧,新的模式是 组件化+Rxjava.Retrofit+MVP模式,最近一直在忙着项目代码架构调整,相对应的代码模板编写等等,虽然说改架构是被逼的,但改着改着还是有成长以及很有成就感的一件事情; 再接再厉。
说实话,rxjava的源码太难了,一直没有去时间(懒癌)去学习; 包括现在项目比较紧张,每天下班后更是不太想去学习,那么现在我就和大家一起看一下rxjava的源码吧;
1、正常简易流程;
2、带线程切换流程;
3、map之后;
4、一些总结
1、正常简易流程
基于以下这段代码查看源码
Observable.just("11")
.subscribe(observer);
大家应该都知道或者听过,Rxjava采用的是 增强版的观察者模式,在订阅的那一瞬间开始执行整个流程,那么现在看一下订阅方法subscribe(Observer<? super T> observer)
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//..
// 实际订阅
subscribeActual(observer);
//...
}
RxJavaPlugins.class
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
try {
return f.apply(t, u);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
看到这里实际订阅是发生在 observable 的 subscribeActual
中 而 subscribeActual
是个抽象方法; 那么我们又要去找它的实现;
这边通过Observable.just
开始看
Observable.calss
public static <T> Observable<T> just(T item) {
//...
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
ObservableJust.class
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
//调用 observer的 onSubsribe方法
s.onSubscribe(sd);
//执行
sd.run();
}
ScalarDisposable.calss
public void run() {
// 判断什么的
if (get() == START && compareAndSet(START, ON_NEXT)) {
//
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
可以看到 run是直接执行的;
整体的一个简单正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();
其中正常完整流程都会执行标红部分的方法;其中其它部分先放着,只是判断有没有完成完成所有数据流的发射
2、线程切换流程
基于以下这段代码查看源码
Observable.just("11")
.subscribeOn(Schedulers.io())//指定Observable 在哪个线程上创建执行操作
.observeOn(AndroidSchedulers.mainThread()) //在指定下一事件发生的线程
.subscribe(observer);
2.1、 流向 Observable.subscribe 都经历了什么
先看下 Observable.subscribeOn
都做了些什么
Observable.class
public final Observable<T> subscribeOn(Scheduler scheduler) {
//
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn.class 本质上继承 Observable
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//保存以及初始化
super(source);
this.scheduler = scheduler;
}
可以看就就是转换变成了 ObservableSubscribeOn
再看下 Observable.observeOn(Scheduler scheduler)
做了些什么
Observable.class 这边应该是: ObservableSubscribeOn extends .... Observable
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn.class
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
本质是将Observable
转换成ObservableObserveOn
,在这个流程中是将 ObservableSubscribeOn
转换成ObservableObserveOn
;
我们的Observable
变换是这样子的,ObservableJust
->ObservableSubscribeOn
->ObservableObserveOn
一层一层被包含
2.2、流向 -> Observer.onSubscribe 都经历了什么
那么又到了我们的 订阅方法subscribe(Observer<? super T> observer)
了,只不过我们中间多了几层转换; 我们再来看一下
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//...
// 实际订阅
subscribeActual(observer);//...
}
ObservableObserveOn.class
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建一个 Scheduler.Worker
Scheduler.Worker w = scheduler.createWorker();
// new一个新的 ObserveOnObserver implements Observer 再次循环 Observable.subscribe
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver.class .... implements Observer<T>, Runnable
Observable.class
@Override
public final void subscribe(Observer<? super T> observer) {
//..
// 实际订阅
subscribeActual(observer);
//...
}
ObservableSubscribeOn.class
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//直接执行,what? Observer.onSubscribe 不能指定线程
// 记录一下 Observer.onSubscribe 的入口是
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Observer
的转变是这样的 Observer
->ObserveOnObserver
->SubscribeOnObserver
以上面为准,先看下 s.onSubscribe(parent)
所经历的事情
ObserveOnObserver.class
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
// 看这里 actual 其实是 Observer ;
actual.onSubscribe(this);
}
}
Observer.class
onSubscribe(sd){...}
这里究竟可以看到 执行到 最初observer
的onSubscribe
的一条完整的线路;
ObserveOnObserver.subscribeActual
-> ObservableSubscribeOn.subscribeActual
-> ObserveOnObserver.onSubscribe
-> Observer.onSubscribe
;
不知道有没有细心的同学发现了没有,'onSubscribe'的执行没有SubscribeOnObserver
什么事情,虽然说上面有一层转换成功了SubscribeOnObserver
画成图应该就是下面这样:
我们发现了 从订阅开始一直到执行我们的 observer.onSubscribe()
中间没有任何切换线程的影子;
所以我们得出了一个
observer的 onSubscribe 运行与订阅动作发生在同一线程,不受线程指定方法(observeOn subscribeOn)影响
2.3、流向 -> observer.next、onComplete 都经历了什么
ObservableSubscribeOn.class
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// new 出一个 SubscribeTask
// scheduler.scheduleDirect 切换线程执行 SubscribeTask
// SubscribeOnObserver.setDisposable方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到上面最后一段代码做个这样事情,一件一件去看一下:
// new 出一个 SubscribeTask
// scheduler.scheduleDirect 切换线程执行 SubscribeTask
// SubscribeOnObserver.setDisposable方法
先看一下SubscribeTask
的 run
里面是干嘛的
ObservableSubscribeOn.class
class SubscribeOnObserver
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//其中 source 是 ObservableJust
source.subscribe(parent);
}
}
由第一节的分析我们可以知道,这边最终会执行到 SubscribeOnObserver.onNext()
-> ObserveOnObserver.onNext()
->Observer.onNext()
这边一层一层调用出来;
SubscribeTask.run
最终执行我们的 最初observer.onNext() onComplete()
; 这边还没有涉及到线程切换
再看我们的 scheduler.scheduleDirect(new SubscribeTask)
我们上面用的是 Scheduler.IO 实际上是 IoScheduler;
IoScheduler extends Scheduler.class
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//指定工作线程
w.schedule(task, delay, unit);
return task;
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
EventLoopWorker extends Scheduler.Worker
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
那么这边流程就比较清晰了,拿到subscribeOn 设置的Scheduler
中创建一个Worker
设定了一个 IO 线程;
看到这里 我们就该逆向地执行我们 Observer 真正的方法了;
执行到 SubscribeOnObserver.onNext()
ObservableSubscribeOn : SubscribeOnObserver<T>
@Override
public void onNext(T t) {
// actual 为 ObserveOnObserver
actual.onNext(t);
}
// scheduler 这边指定为 AndroidSchedulers.mainThread() createWorker() 这边不深究,里面转成了 handler
Scheduler.Worker worker = scheduler.createWorker();
ObservableObserveOn : ObserveOnObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 这个最终 执行在handler
worker.schedule(this);
}
}
最后的流程应该是这样的
3、map 数据操作源码
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return null;
}
}).subscribe(integer -> out("accept:" + integer));
Observable.class
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
可以看到 它是在 执行完 function.apply在执行 onNext();
配合上一节 ,流程图就变成这样了
4、一些总结
来个总结吧: 估计源码看得很混乱。
1、对Observable指定线程、数据变换等等,都采用了一种代理包装模式; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 进行了一层包装;
2、在订阅完成的那一刻起,反向调用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())
->(ObservableMap.subscribe->ObservableMap.subscribeActual())
->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())
->(ObservableJust.subscribe->ObservableJust.subscribeActual())
3、Observer ,同理包装 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
4、 Observer 的执行顺序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
5、 中间有些操作放入到了线程当中.
其实有点坑的是:原本我就知道这个流程应该是这样的,类似于事件分发机制成 U 字型的流程...... 本篇只是在 众多代码 中验证我的思路.................、