RxJava 1 map operator analysis
<font color = "red">Details may differ depending on the version of the source code.</font>
Code execution flow analysis
create:
Creates a new Observable.
map:
Also creates a new Observable.
rx.Observable#create(rx.Observable.OnSubscribe<T>)
OnSubscribe is the argument passed to create.
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
rx.Observable#map
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// `this` is the Observable we created with the create method
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {}
rx.Observable#unsafeCreate
Here the OnSubscribe argument is the OnSubscribeMap object, because OnSubscribeMap implements the OnSubscribe interface.
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
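// unsafeCreate is static, so there is no `this` to modify here: it returns a new Observable
// whose onSubscribe is f (the OnSubscribeMap), and the rest of the chain runs on that new Observable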
return new Observable<T>(RxJavaHooks.onCreate(f));
}
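The assembly steps above can be modeled without RxJava. What follows is a minimal sketch, not the library's code: AssemblySketch and its nested Observable, OnSubscribe, Subscriber and OnSubscribeMap types are hypothetical stand-ins that only borrow the names, RxJavaHooks is left out, and java.util.function.Function is assumed in place of Func1. It illustrates that create and map each return a new Observable, and that the one returned by map holds an OnSubscribeMap whose source is the earlier Observable.

```java
import java.util.function.Function;

// Hypothetical stand-ins that only borrow RxJava's names; this is not the library's code.
final class AssemblySketch {

    interface Subscriber<T> { void onNext(T value); }

    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        // Stands in for create/unsafeCreate: wrap the OnSubscribe in a brand-new Observable.
        static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(f); }

        // Stands in for map: the new Observable's OnSubscribe remembers `this` as its source.
        <R> Observable<R> map(Function<? super T, ? extends R> func) {
            return create(new OnSubscribeMap<T, R>(this, func));
        }
    }

    static final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
        final Observable<T> source;
        final Function<? super T, ? extends R> transformer;

        OnSubscribeMap(Observable<T> source, Function<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(Subscriber<? super R> subscriber) {
            // Subscription-time behaviour is only stubbed here; this sketch is about assembly.
            source.onSubscribe.call(value -> subscriber.onNext(transformer.apply(value)));
        }
    }

    public static void main(String[] args) {
        Observable<String> created = Observable.create(s -> s.onNext("10"));
        Observable<Integer> mapped = created.map(Integer::valueOf);

        // map returned a different Observable whose OnSubscribe is an OnSubscribeMap ...
        System.out.println(mapped.onSubscribe instanceof OnSubscribeMap);
        // ... and that OnSubscribeMap keeps a reference to the Observable built by create.
        OnSubscribeMap<?, ?> op = (OnSubscribeMap<?, ?>) mapped.onSubscribe;
        System.out.println((Object) op.source == (Object) created);
    }
}
```

Both lines print true in this model: nothing is emitted during assembly, only a chain of Observable objects is built.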
Code call flow analysis
rx.Observable#subscribe(rx.Observer<? super T>)
public final Subscription subscribe(final Observer<? super T> observer) {
// Wrap the Observer in a Subscriber (ObserverSubscriber)
return subscribe(new ObserverSubscriber<T>(observer));
}
rx.Observable#subscribe(rx.Subscriber<? super T>)
`this` is the Observable from above, i.e. the Observable created by the last call in the chain; here it is the Observable whose onSubscribe is the OnSubscribeMap.
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// This directly calls call() on the onSubscribe of the Observable passed in above, i.e. OnSubscribeMap.call
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
rx.internal.operators.OnSubscribeMap#call
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
// source is the old (upstream) Observable, i.e. the one created by create
source.unsafeSubscribe(parent);
}
rx.Observable#unsafeSubscribe
Here subscriber is an OnSubscribeMap$MapSubscriber.
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
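// This calls onSubscribe.call, which is the call method of the OnSubscribe passed to create.
// The OnSubscribeMap$MapSubscriber reference is passed in as the argument of call,
// so subscriber.onNext and onCompleted inside that OnSubscribe end up in the MapSubscriber created just now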
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
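The subscription chain traced above can be reproduced in a few lines. This is a sketch under stated assumptions rather than RxJava itself: SubscribeSketch, its nested types, the TRACE list and the run method are hypothetical, hooks and error handling are omitted, and subscribe and unsafeSubscribe are collapsed into one method because both reduce to onSubscribe.call(subscriber) in the excerpts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that only borrow RxJava's names; this is not the library's code.
final class SubscribeSketch {

    // Records the order in which the pieces are invoked.
    static final List<String> TRACE = new ArrayList<>();

    interface Subscriber<T> { void onNext(T value); }

    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        <R> Observable<R> map(Function<? super T, ? extends R> func) {
            return new Observable<R>(new OnSubscribeMap<T, R>(this, func));
        }

        // With hooks stripped, subscribing is just a call into the stored OnSubscribe.
        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }
    }

    static final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
        final Observable<T> source;
        final Function<? super T, ? extends R> transformer;

        OnSubscribeMap(Observable<T> source, Function<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(Subscriber<? super R> o) {
            TRACE.add("OnSubscribeMap.call");
            // Wrap the downstream subscriber, then subscribe the wrapper to the old Observable.
            source.subscribe(new MapSubscriber<T, R>(o, transformer));
        }
    }

    static final class MapSubscriber<T, R> implements Subscriber<T> {
        final Subscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;

        MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) { actual.onNext(mapper.apply(t)); }
    }

    static List<String> run() {
        TRACE.clear();
        Observable<String> created = new Observable<String>(subscriber -> {
            // The subscriber seen by the original OnSubscribe is the MapSubscriber wrapper.
            TRACE.add("source OnSubscribe.call with " + subscriber.getClass().getSimpleName());
            subscriber.onNext("10");
        });
        created.map(s -> Integer.valueOf(s))
               .subscribe(value -> TRACE.add("Observer.onNext " + value));
        return new ArrayList<>(TRACE);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the trace reads OnSubscribeMap.call, then source OnSubscribe.call with MapSubscriber, then Observer.onNext 10: subscription travels from the last Observable back to the first, and values then travel forward through the wrapper.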
Sample code
Observable.
create((Observable.OnSubscribe<String>) subscriber -> {
LogUtils.loge("OnSubscribe call ... ");
if (!subscriber.isUnsubscribed()) {
LogUtils.loge("subscriber.onNext ... ");
subscriber.onNext("10");
subscriber.onCompleted();
}
}).
map(s -> {
return Integer.valueOf(s);
}).
subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
LogUtils.loge("Observer onCompleted ... ");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
LogUtils.loge("Observer onNext value = " + integer + " , cls type = " + integer.getClass());
}
});
rx.internal.operators.OnSubscribeMap.MapSubscriber#onNext
@Override
public void onNext(T t) {
R result;
// The Func1 of the map operator: converts the incoming value
result = mapper.call(t);
// actual is the ObserverSubscriber, so this ends up calling the Observer's onNext method
actual.onNext(result);
}
rx.internal.util.ObserverSubscriber#onNext
@Override
public void onNext(T t) {
// Call the observer's onNext method
observer.onNext(t);
}
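The two onNext hops above (MapSubscriber, then ObserverSubscriber) can be exercised in isolation. The following is a simplified sketch, assuming hypothetical stand-in types inside OnNextSketch and a hypothetical emit helper; it mirrors only the lines shown in the excerpts, so any error handling or unsubscription logic the real classes may contain is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that only borrow RxJava's names; this is not the library's code.
final class OnNextSketch {

    interface Observer<T> { void onNext(T value); }

    // Stand-in for rx.Subscriber, reduced to the one method this sketch needs.
    abstract static class Subscriber<T> implements Observer<T> { }

    // Adapts a plain Observer to a Subscriber by forwarding onNext unchanged.
    static final class ObserverSubscriber<T> extends Subscriber<T> {
        final Observer<? super T> observer;

        ObserverSubscriber(Observer<? super T> observer) { this.observer = observer; }

        @Override
        public void onNext(T t) { observer.onNext(t); }
    }

    // Converts each upstream value with mapper, then hands the result downstream.
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;

        MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result = mapper.apply(t);
            actual.onNext(result);
        }
    }

    // Pushes one raw String through MapSubscriber -> ObserverSubscriber -> Observer.
    static List<Object> emit(String raw) {
        List<Object> received = new ArrayList<>();
        Observer<Integer> observer = value -> received.add(value);
        MapSubscriber<String, Integer> parent = new MapSubscriber<String, Integer>(
                new ObserverSubscriber<Integer>(observer), s -> Integer.valueOf(s));
        parent.onNext(raw);
        return received;
    }

    public static void main(String[] args) {
        Object value = emit("10").get(0);
        System.out.println(value + " " + value.getClass());
    }
}
```

Calling emit("10") hands the Observer an Integer rather than the original String, which matches the type logged by the sample code's onNext.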