RxJava Map操作原理
最近看了一些RxJava的文章,被他好多操作符的原理包括线程切换之类的搞得云里雾里。现在整理了一份最基础的Map操作符原理,加强一下理解!!!
本文的原理分析基于RxJava 1.0.14。对Map操作进行了仔细的分析,阅读本文前要求知道RxJava观察者模式的基本原理,即只有观察者注册的时候被观察者的事件序列才会被发送。
先来看一个例子
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("123456");
subscriber.onComplete();
}
}).map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"Integer=" + integer);
}
});
这个例子先创建了一个Observable
对象,它持有一个OnSubscribe<String>
的对象,如果没有map
方法进行转换,那么这个Observable
对应的Subscriber
应该是Subscriber<String>
,并且Subscriber
的回调方法onNext
中的参数类型应该是String
。通过map
方法的映射,最终我们注册的是Subscriber<Integer>
。
接下来,我们先介绍一下map方法里做了哪些事。然后我们从发生注册事件开始说起,因为RxJava的所有事件都是观察者在注册的时候才开始发送(或者说激活)的。
那么map
方法究竟是怎么做这个转换的呢?
可以看到,map
方法的入参是一个Func1
对象。Func1
类的声明如下,先不管它泛型的参数是T
,还是R
。只要记住它只有一个call
方法,该方法以左边的参数为入参,右边的参数为返回值。即T
为入参,R
为返回值。
public interface Func1<T, R> extends Function {
R call(T t);
}
map
方法的实现如下,我们看到它将我们传入的Func1
对象封装成了OperatorMap
对象。最终调用了lift
方法。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
再来看看,OperatorMap
是个什么玩意儿?
public final class OperatorMap<T, R> implements Operator<R, T>;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Operator
其实就是Fun1
,它继承了Fun1
,也没做什么扩展。OperatorMap
的构造方法里把我们new
出来的Fun1
对象赋值给了transformer
。
接着来看lift
方法里做了什么。
lift
方法的实现,代码位于Observable中。
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//注意!!!这个onSubscribe是原来的Observable对象持有的onSubscriber
onSubscribe.call(newSubscriber);
}
});
}
可以看到lift
方法返回了一个Observable
对象,我们把它叫做Observable$2
,它持有的OnSubscriber
对象叫做OnSubscriber$2
,注册这个。我们本来有的那个Observable
对象,叫做Observable$1
,同样它持有的OnSubscriber对象叫做OnSubscriber$1
。
这个lift
方法做了三件事:
1、利用传入的OperationMap
创建了一个新的Subscriber
。
2、调用了新的Subscriber
的onStart()
方法。这只是一个可选的准备方法,可以暂时忽略
3、调用了OnSubscriber$1
的call
方法。
我们来看这个Subscriber$2
是怎么被创建出来的,看我们传入的OperationMap
的call
方法。为了不被T
和R
混淆,我们这里可以理解成T
就是String
,R
就是Integer
。因为我们调用map
方法是创建的Fun1
对象是Fun1<String, Integer>
。
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
到这里我们基本清楚map
方法里面做了哪些事,一是保存了我们传入的Fun1
对象,二是创建了一个Observable$2
对象,并返回。我们拿到了这个Observable$2
并为之注册了一个Subscriber
。
现在开始,我们要从注册事件发生开始说起,看看上面讲的各个方法是什么时候被触发调用。
要明确一件事情,我们在外部调用map
之后只创建了一个Subscriber
,并且它的参数是Integer
。那么当我们调用Observable$2.subscribe(Subscriber)
时,发生了什么?(这里之所以是Observable$2
,是因为流式调用,注册的应该是lift方法返回的Observable
)
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
注意!这个onSubscribe
是我们上文所说的onSubscribe$2
,这个subscriber
应该是Subscriber<Integer>
。onSubscribe.call(subscriber)
就会调用到我们讲过的lift
方法里做的三件事情。
这里再重复一遍,并转换一种说法。
1、利用Subscriber<Integer>
创建Subscriber<String>
,这个部分的实现就在OperationMap
里面。
2、调用Subscriber<String>的onStart()
。
3、调用OnSubscriber.call(Subscriber<String>)
,这个OnSubscriber
已经说过了是OnSubscriber$1
,所以是我们在外部创建的。这个call
方法里面,我们调用了subscriber.onNext("123456")
; subscriber.onComplete()
;
最后的关键点在于我们通过Subscriber<Integer>
创建Subscriber<String>
之后,调用到了subscriber<String>.onNext("123456")
和subscriber<String>.onComplete()
(注意这个subscriber<String>
的写法是不对的,我这边是为了能看的清逻辑,所以这样标注)。那么这个Subscriber<String>
的onNext
与onComplete
里面做了什么呢? 还记得我们是怎么创建的吧?
在OperationMap
里面,这里再贴一次代码。
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
关键的地方就在于o.onNext(transformer.call(t));
这个o应该是我们外部创建的Subscriber<Integer>
,这个transformer
是我们也是我们外部传入的new Fun1<String,Integer>
。看明白了吧!!!Subscriber<String>
只是个代理,最终是通过我们定义的Fun1
对象的规则把String
转换成Integer
,再调用我们创建的Subscriber<Integer>
的onNext
方法,其他方法同理。
最后的最后,盗用一个抛物线大侠的图再加上我的一些文字说明再来回忆一下整个过程。
可能语言比较啰嗦,但也确实是自己学习和思考的过程。如有错误,请大家指正~~~
参考文章