Rxjava 过程分析二(操作符流程)
说明
- 主要分析操作符的设计流程, 只要关注主要环节, 大致流程。
- 由于每个操作符背后的设计基本一致, 所以随便选出一个分析即可。 那我们肯定选最简单的 map 啦哈。
基本使用
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// emitter.onNext("");
// emitter.onError();
// emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
上面一堆和下面一堆我们在上一篇已经见过, 我们现在主要关注与中间那一堆, 也就是 map 的实现。
引发的思考
- 操作符是怎么承上启下连接上游和下游的呢?
- Flowable 经过操作符转换后再进行点操作, 还是同一个 Flowable 吗?
源码分析
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
return new FlowableMap<T, R>(this, mapper);
}
很简答嘛, 新建了一个 FlowableMap 而已, 把当前的 Flowable 和 怎么转换的对象(也就是接口, 提供除去让用户怎么实现怎么转换)传递进去, 并对成员变量赋值。
我们在 map 后去订阅了下游, 那么有我们上一篇分析的结果来开, 在订阅的时候是调用了 FlowableMap 的 subscribeActual。 所以我们看看里面做了什么。
protected void subscribeActual(Subscriber<? super U> s) {
source.subscribe(new MapSubscriber<T, U>(s, mapper));
}
等等这个 source 是什么呢, 好像在这里又发生了一次订阅了对吧? 对的。 这个 source 就是我们在实例化 FlowableMap 传进来的 this, 也就是上一个 Flowable 实例, 在这里是 FlowableCreate。 什么鬼! 怎么又发生了一次订阅呢。 那么好, 我们试着去分析下, 我们发生订阅后结合上片文章的分析, 发生订阅后, 会调用到 FlowableMap 中的 subscribeActual 最后不管是回调还是返回, 肯定是调用的相应的方法如 onNext 等都会调用到 这里的 MapSubscriber 里相应的方法。 你发现没, 这里 MapSubscriber 就变声了一个临时中间节点的一个下游。 那我们这个中间临时下游的 onNext 做了什么事情。
public void onNext(T t) {
U v;
try {
v = mapper.apply(t);
} catch (Throwable ex) {
return;
}
downstream.onNext(v);
}
mapper 就是我们传入 map 中的接口, 在这里调用, 从上层 Flowable 中拿到的结果, 通过 map 里的这个 function 接口去回调给用户去实现, 然后把转换后的值拿到直接塞给了最后的下游的 onNext。 你明白了没, 原理就是这么简单。 到这里前前后后都连接通了吧!
前面的疑惑问题
- 操作符是怎么承上启下的呢?
没一个操作符返回的都是自己实现的一个 Flowable, 然后保留了上一层的 Flowable, 在发生订阅时会一级一级的发生订阅, 顶层的 FlowableCreate 调用的方法, 会间接的调用到了操作符相应的方法, 然后操作的相应方法除了实现自己的业务, 然后把结果传给了底层的 Flowable 或者下游。
- Flowable 经过操作符后还是同一个 Flowable 吗?
很明显不是