前言
Observable.just("1", "2")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
.subscribe();
无论是just
还是map
,返回的都是Observable
对象,其实这和设计模式中构造器模式很像,另外虽然上面每一步返回的是Observable
,但是实际返回都是Observable
实现类,每执行一步操作进行包装增强,这也算是代理模式的一种应用吧。
Map操作符
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
new ObservableMap<T, R>(this, mapper))
中传入上一步Observable.just
作为第一个参数,mapper
是一个Function
,这里返回的是Observable<R>
,这里可以验证之前的说法,每执行一步操作都是包装增强。
/**
* A functional interface that takes a value and returns another value, possibly with a
* different type and allows throwing a checked exception.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
Function
是转换的回调,用于确定如何转换,泛型T
是转换前的类型,R
是转换后的类型。
订阅
从上一章的分析可以知道调用subscribe()
方法最终回调到ObservableMap
的subscribeActual
方法中。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
......
}
ObservableMap
中的source
就是Observable.just
,在subscribeActual
就可以看到,它执行Observable.just
的subscribe
,这将会回调Observable.just
的subscribeActual
方法。
从这里可以猜到,Map
在我们真正的下游前,执行了一个包装增强(这里是MapObserver
),从而在下游获得到我们想要的数据形式。
接下来回顾ObservableFromArray
一下subscribeActual
的方法
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
......
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}
在上一章中,下游直接就是我们的observer
,而经过map
操作后source.subscribe(new MapObserver<T, U>(t, function));
是经过MapObserver
才到我们的observer
,ObservableFromArray
就操作的下游就是MapObserver
,ObservableFromArray
调用downstream.onNext(value);
会回调到MapObserver
的onNext
。到这里,可以猜到map
操作就是在MapObserver
的onNext
进行变换。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
}
看到MapObserver
的onNext
,对于map
操作已经了然于胸了。其中最关键就是下面这一段。
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
在这里downstream
就是我们真正的Observer
了。你可以在GitHub找到我仿照RxJava
流程写的伪代码。