lift()
方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化。弄清楚它的原理,也方便我们理解其他操作符。首先先看几个相关接口。
Func1 接口
public interface Func1<T, R> extends Function {
R call(T t);
}
Func1
接口会按照泛型参数的顺序传入T
,并返回R
。
Operator 接口
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
按照Func1
接口的定义,Operator
接口会传入一个Subscriber<? super R>
参数,并返回一个Subscriber<? super T>
。
关于Operator和lift()中泛型顺序的问题
也许有人(is me)第一眼看到Observable<T>
,Operator<R, T>
,Func1<T, R>
这几个类的泛型参数,头都大了,关键是Operator
的泛型参数顺序为什么是R, T
,而不是T, R
?
其实这里不需要关心顺序是什么,只需要记住Operator<R, T>
是按照泛型参数的顺序,传入一个Subscriber<R>
参数,并返回一个Subscriber<T>
,写成Operator<A, B>
或者Operator<M, N>
是没有任何区别的。
lift()调用流程
首先需要记住lift()
方法是在一个已有Observable
上调用的。
lift()
方法核心代码:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = operator.call(o);
st.onStart();
// 这里的onSubscribe是调用lift方法的Observable中的onSubscribe
onSubscribe.call(st);
}
});
}
根据代码的调用流程来分析:
1、假设已有一个Observable<T>
,调用lift()
方法,生成一个Observable<R>
,此时就有了两个Observable
和两个OnSubscribe
对象。
2、然后调用Observable<R>
的subscribe()
方法,传入一个Subscriber<R>
对象,此时触发Observable<R>.onSubscribe.call()
方法,也就是上面lift()
方法中的call()
方法。
3、在该方法中会调用onSubscribe.call()
方法,注意这个onSubscribe
是Observable<T>
中的那个OnSubscribe<T>
对象,它需要传入一个Subscriber<T>
对象,这个对象是通过operator.call()
方法生成的。正是这个Operator
对象将两个Subscriber
对象关联起来,OnSubscribe<T>
在执行Subscriber<T>.onNext(T t)
方法的时候也会执行Subscriber<R>.onNext(R r)
,而这里从T
变成R
,正好用到了传到Operator
中的参数Func1<T, R>
。
4、如果具体化一点,上面的Observable<T>
就是事件源,对它进行lift()
变换得到新的Observable<R>
,这个新的Observable
的回调已经固定,相当于是一个模板(也就是上面lift()
方法中的call()
方法)。这时调用subscribe()
,传入的Subscriber<R>
是用户定义的事件监听者,但它监听的是新的Observable<R>
,这个Observable
的回调是固定的,它并不能产生新事件,所以得靠事件源Observable<T>
。这个时候Operator
生成一个中间的Subscriber<T>
对象,该对象的作用就是接收事件源的事件,并将事件转给用户定义的Subscriber
。这个Subscriber<T>
并没有消耗事件,而是起着一个代理的作用。所以Operator
可以看做是一个生成代理的工具类。在这个转发过程中有一个数据类型的变化过程,也是通过Operator
的转换器Func1实现的,想怎样转换数据,也是用户定义后传到Operator
中的。
小结
1、我们需要把Observable
的调用看做一条流。
2、对于Observable<T> -> Observable<R>
这个变化,订阅者为Subscriber<R>
,在subscriber()
方法调用后,流的顺序为倒序的,即从Observable<R> -> Observable<T>
,因为我们始终需要调用最开始的事件源。为了满足这个需求,会通过Operator<R, T>
这个代理工具生成一个代理Subscriber<T>
,这也解释了为什么在声明Operator
时泛型参数的顺序写为R, T
,正好可以和这一变化对应起来,用相同的泛型参数更便于理解。这样准备工作就都做好了。
3、Observable<T>
开始向Subscriber<T>
发送事件,发送的参数类型为T
,这时候通过转换器Func1
将T
变成R
,这样就能顺利的通过代理Subscriber<T>
将事件发送给Subscriber<R>
了。
4、所以流的路线为Observable<R> -> Observable<T> -> Subscriber<T> -> Subscriber<R>
。一条线分成两部分,前半部分为准备工作,后半部分为执行操作。
下图是lift()
的过程,其中虚线箭头代表生成,实线箭头代表调用。也可以参考 扔物线 - 给 Android 开发者的 RxJava 详解 中的配图。
map()方法
map()
方法是RxJava中使用lift()
最简单的方法,如果上面lift()
方法过于抽象,可以通过该方法来加深理解。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
看到OperatorMap.call()
方法,它直接生成一个新的Subscriber
,通过上面的分析可以知道,这是一个代理Subscriber,所以它的onNext()
等方法都只是直接调用了外部传进来的Subscriber
。
举个例子:
Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
.map(new Func1<Float, Integer>() {
@Override
public Integer call(Float aFloat) {
return Math.round(aFloat);
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return Integer.toBinaryString(integer);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("2 map onNext->" + s);
}
});
// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001
该例子是一个Float->Integer->String
的转换。我们按上面的流程来分析。
1、生成一个Observable<Float>
。
2、调用map()
生成一个Observable<Integer>
。
3、再调用map()
生成一个Observable<String>
。
4、subscribe()
一个Subscriber<String>
。至此流的前半部分完成。
5、执行开始,Observable<String>
发送事件,先生成一个Subscriber<Integer>
传给Observable<Integer>
(Observable<Integer>.onSubscribe.call()
)。
6、Observable<Integer>
开始发送事件,同样的生成一个Subscriber<Float>
传给Observable<Float>
(Observable<Float>.onSubscribe.call()
)。
7、真正发送事件开始,Observable<Float>
调用Subscriber<Float>.onNext(Float)
等方法,同时Subscriber<Integer>.onNext(Integer)
被调用,同时Subscriber<String>.onNext(String)
被调用,事件发送完成。
8、虽然是流的模型,但其实是一堆内部类和外部类的嵌套关系。