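To learn how RxJava works, I wrote a simplified RxJava by hand, using the RxJava source as a reference. The code is in SimpleRx.
This article follows the previous one, "Implementing RxJava Yourself to Understand Its Call Chain".
The previous article implemented the operators create, map, lift, subscribeOnIO, and observeOn. Since then I have added flatMap, just, from, and merge to the code. Given the first article, these later operators are easy to understand, so they are not covered here.
This article shows how to implement zip, following the RxJava source.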
Demo
Observable<Integer> o1 = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(8);
subscriber.onNext(9);
}
});
Observable<String> o2 = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("A");
subscriber.onNext("B");
subscriber.onNext("C");
}
});
Observable.zip(o1, o2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
});
Clearly, the output we expect is 8A and 9B, while C is ignored because o1 has no third value to pair it with.
Observable
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[]{o1, o2}).lift(new OperatorZip<R>(zipFunction));
}
public static <T> Observable<T> just(final T value) {
return create(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(value);
}
});
}
Let's look at the same setup written without chaining:
final Observable[] observables = new Observable[]{o1, o2};
OnSubscribe onSubscribe = new OnSubscribe() {
@Override
public void call(Subscriber subscriber) {
subscriber.onNext(observables);
}
};
Observable o3 = new Observable(onSubscribe);
Func2 func = new Func2()... // integer + s
OperatorZip operatorZip = new OperatorZip(func);
// o3.lift(operatorZip) is equivalent to:
OnSubscribeLift onSubscribeLift = new OnSubscribeLift(onSubscribe, operatorZip);
Observable zipObservable = new Observable(onSubscribeLift);
Initialization is now complete. Next, let's trace what happens when subscribe(subscriber) is called:
- zipObservable.subscribe(subscriber) -->
- onSubscribeLift.call(subscriber) -->
- Subscriber subscriber2 = operatorZip.call(subscriber) --> // this step is the key one
- onSubscribe.call(subscriber2) -->
- subscriber2.onNext(observables)
We first look at operatorZip.call(subscriber), and then at subscriber2.onNext(observables).
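The order of calls in this chain can be checked with a small stand-alone sketch. The nested types below (Subscriber, OnSubscribe, Operator, Observable) are hypothetical miniatures that only mirror the shape of the SimpleRx classes, and the "doubler" operator is made up for illustration. The point is only that the operator wraps the downstream subscriber before the upstream OnSubscribe runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature types; not the actual SimpleRx or RxJava classes.
final class LiftChainSketch {
    interface Subscriber<T> { void onNext(T value); }
    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> child); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

        <R> Observable<R> lift(Operator<R, T> operator) {
            // Plays the role of OnSubscribeLift: wrap the child first, then subscribe upstream.
            return new Observable<R>(child -> onSubscribe.call(operator.call(child)));
        }
    }

    // Records the order in which the pieces of the chain run.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Observable<Integer> source = new Observable<Integer>(s -> {
            trace.add("onSubscribe.call");
            s.onNext(21);
        });
        Operator<String, Integer> doubler = child -> {
            trace.add("operator.call");
            return value -> child.onNext("got " + (value * 2));
        };
        source.lift(doubler).subscribe(v -> trace.add(v));
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [operator.call, onSubscribe.call, got 42]
    }
}
```

In the zip case the operator is OperatorZip and the value pushed upstream is the observables array rather than an Integer, but the ordering is the same.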
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
final FuncN<? extends R> zipFunction;
public OperatorZip(Func2 f) {
zipFunction = FuncN.fromFunc(f); // not important here; see the end of the article
}
@Override
public Subscriber<? super Observable<?>[]> call(Subscriber<? super R> child) {
final Zip<R> zipper = new Zip<R>(child, zipFunction);
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper);
return subscriber;
}
private final class ZipSubscriber extends Subscriber<Observable[]> {
final Subscriber<? super R> child;
final Zip<R> zipper;
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper) {
this.child = child;
this.zipper = zipper;
}
@Override
public void onNext(Observable[] os) {
zipper.start(os);
}
}
}
So far, the last step we have reached is zipper.start(observables). Let's see what zipper actually is:
static final class Zip<R> {
final Subscriber<? super R> child;
private final FuncN<? extends R> zipFunction;
private Subscriber[] subscribers;
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
this.child = child;
this.zipFunction = zipFunction;
}
public void start(Observable[] observables) {
final int length = observables.length;
subscribers = new Subscriber[length];
for (int i = 0; i < observables.length; i++) {
InnerSubscriber subscriber = new InnerSubscriber();
subscribers[i] = subscriber;
}
for (int i = 0; i < observables.length; i++) {
observables[i].subscribe(subscribers[i]);
}
}
private void tick() {
final int length = subscribers.length;
final Object[] objs = new Object[length];
for (int i = 0; i < length; i++) {
InnerSubscriber subscriber = (InnerSubscriber) subscribers[i];
objs[i] = subscriber.queue.peek();
if (objs[i] == null) {
return;
}
}
for (int i = 0; i < length; i++) {
InnerSubscriber subscriber = (InnerSubscriber) subscribers[i];
subscriber.queue.poll();
}
child.onNext(zipFunction.call(objs));
}
final class InnerSubscriber extends Subscriber {
Queue queue = new LinkedList();
@Override
public void onNext(Object o) {
queue.offer(o);
tick();
}
}
}
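The idea is to wrap the multiple observables into a single event, the observables array, and emit it once.
- ZipSubscriber.onNext(observables) calls zipper.start(observables).
- start() iterates over observables and calls observable.subscribe(innerSubscriber) on each one, with one InnerSubscriber per observable.
- Each InnerSubscriber maintains its own queue.
- InnerSubscriber.onNext(Object) puts the object into its queue and then triggers tick().
- tick() checks whether every InnerSubscriber.queue has a value at its head.
- If so, tick() removes the head element from every InnerSubscriber.queue and calls onNext(R value) on the downstream subscriber (child). The value is first computed by the Func2 we supplied, that is, value = zipFunction.call(Object[]).
- If not, tick() simply returns.
For zip(o1, o2, o3, o4, o5), five InnerSubscribers are created. For any of them, observable.call(InnerSubscriber) --> InnerSubscriber.onNext(obj) first adds obj to that InnerSubscriber's own queue and then triggers tick().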
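The queue-and-tick steps above can be tried in isolation with the following sketch. It is a stand-alone illustration, not the SimpleRx code: the class ZipTickSketch, its onNext(sourceIndex, value) method, and the emitted list (which stands in for child.onNext) are all hypothetical names, and the sources are driven by hand instead of by subscribing to observables.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical stand-alone sketch of the queue-and-tick idea; not SimpleRx or RxJava code.
final class ZipTickSketch<R> {
    private final List<Queue<Object>> queues = new ArrayList<>(); // one queue per source
    private final Function<Object[], R> zipFunction;
    private final List<R> emitted = new ArrayList<>();            // stands in for child.onNext

    ZipTickSketch(int sourceCount, Function<Object[], R> zipFunction) {
        this.zipFunction = zipFunction;
        for (int i = 0; i < sourceCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // Plays the role of InnerSubscriber.onNext: buffer the value, then tick.
    void onNext(int sourceIndex, Object value) {
        queues.get(sourceIndex).offer(value);
        tick();
    }

    private void tick() {
        Object[] row = new Object[queues.size()];
        for (int i = 0; i < row.length; i++) {
            row[i] = queues.get(i).peek();
            if (row[i] == null) {
                return; // at least one source has nothing buffered yet
            }
        }
        for (Queue<Object> queue : queues) {
            queue.poll(); // consume one value from every source
        }
        emitted.add(zipFunction.apply(row));
    }

    List<R> emitted() { return emitted; }

    // Replays the article's demo: o1 emits 8, 9 and o2 emits A, B, C.
    static List<String> demo() {
        ZipTickSketch<String> zip =
                new ZipTickSketch<String>(2, row -> String.valueOf(row[0]) + row[1]);
        zip.onNext(0, 8);
        zip.onNext(0, 9);
        zip.onNext(1, "A");
        zip.onNext(1, "B");
        zip.onNext(1, "C");
        return zip.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [8A, 9B]
    }
}
```

Because each onNext adds exactly one value, at most one complete row can form per call, so a single pass in tick() is enough. "C" stays buffered in the second queue and is never emitted.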
public interface FuncN<R> {
R call(Object... args);
static <T0, T1, R> FuncN<R> fromFunc(final Func2<? super T0, ? super T1, ? extends R> f) {
return new FuncN<R>() {
@SuppressWarnings("unchecked")
@Override
public R call(Object... args) {
if (args.length != 2) {
throw new RuntimeException("Func2 expecting 2 arguments.");
}
return f.call((T0) args[0], (T1) args[1]);
}
};
}
}
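As a usage illustration, the sketch below shows how such an adapter lets Zip call a two-argument function through a single varargs signature. The nested Func2 and FuncN interfaces are redeclared here only so the example compiles on its own; they are assumptions that mirror the shape of the interfaces above, not the SimpleRx sources.

```java
// Hypothetical stand-alone sketch; the nested interfaces only mirror Func2 and FuncN.
final class FuncNSketch {
    interface Func2<T0, T1, R> { R call(T0 t0, T1 t1); }
    interface FuncN<R> { R call(Object... args); }

    // Adapts a typed two-argument function to the untyped varargs form.
    @SuppressWarnings("unchecked")
    static <T0, T1, R> FuncN<R> fromFunc(Func2<? super T0, ? super T1, ? extends R> f) {
        return args -> {
            if (args.length != 2) {
                throw new RuntimeException("Func2 expecting 2 arguments.");
            }
            return f.call((T0) args[0], (T1) args[1]);
        };
    }

    static final Func2<Integer, String, String> CONCAT = (i, s) -> i + s;

    static String demo() {
        FuncN<String> adapted = fromFunc(CONCAT);
        return adapted.call(8, "A"); // the Object[] elements are cast back to Integer and String
    }

    static boolean rejectsWrongArity() {
        FuncN<String> adapted = fromFunc(CONCAT);
        try {
            adapted.call(8); // only one argument
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());              // 8A
        System.out.println(rejectsWrongArity()); // true
    }
}
```

The trade-off of this design is that type safety moves to run time: Zip only ever sees Object[], so an argument of the wrong type would be detected only when the function is actually called.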