RxJava map操作符和flatMap操作符的实现

RxJava2源码学习之一
RxJava2源码学习之二

RxJava 版本:2.2.5

map操作符的实现

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();

            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return String.valueOf(integer);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String string) {
                Log.e(TAG, "onNext: " + string);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        });

先上个图


map.png

Observable的create()方法简化版

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
}

create()方法返回的Observable是一个ObservableCreate对象。

Observable的map()方法简化版

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return new ObservableMap<T, R>(this, mapper);
    }

Observable的subscribe()方法简化版

public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
}

我们进入到ObservableMap类去看

public void subscribeActual(Observer<? super U> t) {
    //1.
    source.subscribe(new MapObserver<T, U>(t, function));
}

这里的source就是create()方法返回的ObservableCreate对象。然后使用传入的下游的观察者和function构建了一个MapObserver对象。

然后source调用subscribe()方法。内部也是调用subscribeActual()方法

protected void subscribeActual(Observer<? super T> observer) {
         //创建CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //注释1处
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

注释1处,这里的source,就是创建ObservableCreate对象的时候传入的ObservableOnSubscribe对象。

Observable.create(new ObservableOnSubscribe<Integer>() {
    //发射数据
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
})

这个时候CreateEmitter开始发射数据,调用3次onNext方法,然后调用
onComplete方法。在CreateEmitter对象内部,会真正调用传入的observer(MapObserver类型)对应的onNext(T t)方法和onComplete()方法。

MapObserver类简化版

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //调用super方法为downstream赋值
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {

           U v;
          //应用mapper以后,返回结果
           v = mapper.apply(t)

           downstream.onNext(v);
        }
    }

MapObserver,在onNext() 方法中,获取上游发射的数据t,应用传入的mapper的apply方法,转换成期望类型的对象v,调用下游观察者(在这个例子中就是我们手写的Observer)的onNext()方法。

总结:map操作符就是将上游发射的每个数据,应用一个mapper,转化成期望的数据类型,然后再发射出去。

flatMap操作符的实现

Observable.create(new ObservableOnSubscribe<List<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
            List<Integer> list1 = new ArrayList<>();
            list1.add(1);
            list1.add(2);
            list1.add(3);
            List<Integer> list2 = new ArrayList<>();
            list2.add(4);
            list2.add(5);
            list2.add(6);

            emitter.onNext(list1);
            emitter.onNext(list2);

            emitter.onComplete();
        }
    }).flatMap(new Function<List<Integer>, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(List<Integer> integers) throws Exception {
           //注释1处,返回的是一个ObservableFromIterable对象
            return Observable.fromIterable(integers);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: ");
        }

       @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    });

上张图


flatMap.png

create()方法返回的Observable是一个ObservableCreate对象。

然后我们在调用flatMap方法的时候,传入了一个mapper对象。该对象的apply方法返回的是一个ObservableFromIterable对象。

Observable的fromIterable方法精简版

public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {       
    return new ObservableFromIterable<T>(source);
}

Observable的flatMap()方法

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    //注意,传入的maxConcurrency参数是Integer.MAX_VALUE
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

方法重载也是狠,最终是调用了这个方法,返回一个ObservableFlatMap对象。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    //...
    return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}

当调用subscribe方法把观察者和被观察者关联起来的时候会调用ObservableFlatMap的subscribeActual()方法。

subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: ");
        }

       @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    });

ObservableFlatMap的subscribeActual()方法简化版

public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

首先创建了一个MergeObserver对象,MergeObserverObservableFlatMap类的静态内部类。我们看一下MergeObserver的构造函数。

MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
    this.downstream = actual;//我们手写的observer
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;//我们传入的是Integer.MAX_VALUE
    this.bufferSize = bufferSize;
    if (maxConcurrency != Integer.MAX_VALUE) {
        sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
    }
    //初始化observers
    this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}

然后source(ObservableCreate对象)调用subscribe()方法。最终会进入到ObservableCreate的subscribeActual()方法。

ObservableCreate的subscribeActual()方法简化版

protected void subscribeActual(Observer<? super T> observer) {
    //1. 创建CreateEmitter对象,传入的observer参数是MergeObserver对象
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //2. 调用观察者的onSubscribe()方法
    observer.onSubscribe(parent);
    //3. 发射数据
    source.subscribe(parent);
   
}

在注释1处,首先创建了一个CreateEmitter对象。CreateEmitter类是ObservableCreate的静态内部类。

在注释2处,调用了观察者MergeObserver的onSubscribe方法

MergeObserver的onSubscribe()方法

public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        //为upstream赋值,并调用下游的onSubscribe()方法
        this.upstream = d;
        //调用下游,就是我们手写的observer的onSubscribe方法
        downstream.onSubscribe(this);
    }
}

在注释3处,发射数据,source就Observable.create()方法传入的匿名的ObservableOnSubscribe的对象。

Observable.create(new ObservableOnSubscribe<List<Integer>>() {
    @Override
    public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        list1.add(2);
        list1.add(3);
        List<Integer> list2 = new ArrayList<>();
        list2.add(4);
        list2.add(5);
        list2.add(6);
        //CreateEmitter调用onNext
        emitter.onNext(list1);
        //CreateEmitter调用onNext
        emitter.onNext(list2);
        //CreateEmitter调用onNext
        emitter.onComplete();
    }
})

在发射数据过程中,CreateEmitter会调用两次会onNext()和一次onComplete()。

CreateEmitter的onNext方法和onComplete方法

@Override
public void onNext(T t) {
            
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

CreateEmitter的onNext方法和onComplete方法内部会调用observer(MergeObserver类型对象)对应的onNext方法和onComplete方法。

现在我们要进入MergeObserver进行分析了,进入正题。

MergeObserver的onNext方法简化版

MergeObserver第1次调用onNext方法。注意注意,这是MergeObserver第1次调用onNext方法。

public void onNext(T t) {
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    //应用传入的mapper,在这个例子中,mapper.apply返回的是一个ObservableFromIterable对象
    p = mapper.apply(t)

    subscribeInner(p);
}

MergeObserver的subscribeInner方法简化版

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        //注释1处,创建一个InnerObserver对象
        InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
        //注释2处,添加inner
        if (addInner(inner)) {
            //注释3处,如果添加成功,p就订阅inner对象。
            p.subscribe(inner);
        }
        //跳出循环
        break;
     }
}

在注释1处,创建一个InnerObserver对象,InnerObserver类是ObservableFlatMap的一个静态内部类,注意我们构建的第一个InnerObserver对象传入的uniqueId是0。

InnerObserver的构造函数

InnerObserver(MergeObserver<T, U> parent, long id) {
    this.id = id;
    this.parent = parent;
}

在注释2处,添加inner,内部操作就是把创建好的InnerObserver对象添加到observers中去。

final AtomicReference<InnerObserver<?, ?>[]> observers;

boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}

在注释3处,如果添加成功,p就订阅inner对象。正常情况下添加是成功的,这里的p就是我们应用flatMap的mapper后返回的对象。在上面的例子中就是Observable.fromIterable()返回的对象。
Observable的fromIterable()方法简化版

public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
    return new ObservableFromIterable<T>(source);
}

返回一个ObservableFromIterable对象。

ObservableFromIterable的subscribe方法内部会调用subscribeActual方法。

subscribeActual方法精简版

public void subscribeActual(Observer<? super T> observer) {
    Iterator<? extends T> it;
    //注释1处,这里的source就是List<Integer> 对象
    it = source.iterator();
      
    boolean hasNext= it.hasNext();
     
    if (!hasNext) {
        EmptyDisposable.complete(observer);
        return;
    }
    
    //注释2处,构建一个FromIterableDisposable对象
    FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
    //注释3处,调用observer的onSubscribe()方法
    observer.onSubscribe(d);
    //注释4处,这个地方fusionMode会是true
    if (!d.fusionMode) {
        d.run();
    }
}

在注释1处,获取了source的迭代器在这个例子中就是List<Integer>的迭代器。

在注释2处构建一个FromIterableDisposable对象,并传入了InnerObserver和List<Integer>的迭代器。FromIterableDisposable是ObservableFromIterable的静态内部类。

FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
    this.downstream = actual;
    this.it = it;
}

然后在注释3处,调用observer的onSubscribe()方法,并传入构建的FromIterableDisposable对象。

InnerObserver的onSubscribe()方法简化版

 volatile SimpleQueue<U> queue;

 public void onSubscribe(Disposable d) {
        //调用onSubscribe多次不起作用,只有第一次起作用
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                QueueDisposable<U> qd = (QueueDisposable<U>) d;
                //1. m的取值是SYNC
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    //2. 为queue赋值
                    queue = qd;
                    done = true;
                    //3 . 调用parent.drain()
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }

注释1处,m的取值是SYNCFromIterableDisposable的requestFusion()方法

public int requestFusion(int mode) {
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}

返回结果是SYNC

注释2处,为queue赋值为传入的FromIterableDisposable对象。

注释3处,调用parent.drain()方法,然后return。这里的parent就是MergeObserver,我们回到MergeObserver类。

MergeObserver的drain方法

void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}

方法内部判断getAndIncrement为0的时候调用drainLoop方法,内部逻辑是怎么执行的,我是一步一步debug的。

MergeObserver的drainLoop方法

   void drainLoop() {
        //我们手写的observer
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (; ; ) {
            if (checkTerminate()) {
                return;
            }
            SimplePlainQueue<U> svq = queue;
            //此条件不满足
            if (svq != null) {
                for (; ; ) {
                    if (checkTerminate()) {
                        return;
                    }
                      
                    U o = svq.poll();

                    if (o == null) {
                        break;
                    }

                    child.onNext(o);
                }
            }

            boolean d = done;
            svq = queue;
            ObservableFlatMap.InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length;

            int nSources = 0;
            //此条件不满足
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    nSources = sources.size();
                }
            }
           //此条件不满足
            if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                Throwable ex = errors.terminate();
                if (ex != ExceptionHelper.TERMINATED) {
                    if (ex == null) {
                        child.onComplete();
                    } else {
                        child.onError(ex);
                    }
                }
                return;
            }

            int innerCompleted = 0;
            if (n != 0) {
                long startId = lastId;
                int index = lastIndex;
                //此条件不满足
                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }

                int j = index;
                //continue标签
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }

                    ObservableFlatMap.InnerObserver<T, U> is = (ObservableFlatMap.InnerObserver<T, U>) inner[j];
                    //注释1处,取出InnerObserver的quque对象,就是我们传入的FromIterableDisposable对象
                    SimpleQueue<U> q = is.queue;
                    if (q != null) {
                        for (; ; ) {
                            U o;
                            try {
                                //从迭代器中取出数据,没有数据返回null,并将InnerObserver的done置为true
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted++;
                                j++;
                                if (j == n) {
                                    j = 0;
                                }
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }
                            //调用我们手写的observer的onNext方法
                            child.onNext(o);

                            if (checkTerminate()) {
                                return;
                            }
                        }
                    }

                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    //发射完数据就移除掉InnerObserver
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        removeInner(is);
                        if (checkTerminate()) {
                            return;
                        }
                        innerCompleted++;
                    }

                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }

            if (innerCompleted != 0) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    while (innerCompleted-- != 0) {
                        ObservableSource<? extends U> p;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                continue;
                            }
                        }
                        subscribeInner(p);
                    }
                }
                continue;
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

我们第一次调用的时候会发射出第一个List中的3个数据1,2,3。

MergeObserver第2次调用onNext方法。注意注意,这是MergeObserver第2次调用onNext方法。最终会发射出第2个List中的3个数据 4,5,6。

然后MergeObserver最终会调用onComplete方法。

@Override
public void onComplete() {
    if (done) {
         return;
    }
    done = true;
    drain();
}

MergeObserver的onComplete方法内部调用链 drain->drainLoop->手写observer的onComplete方法。结束战斗。

MergeObserver的drainLoop方法内部逻辑不太好叙述,建议自己debug。

存在的疑问:都说flatMap内部无法保证发射出的数据的顺序,但是我怎么感觉顺序是保证的呢?

参考

  1. RxJava2源码学习之一
  2. RxJava2源码学习之二
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容