Rxjava2 Observable的数据变换详解及实例(一)

简要:

需求了解:

对于 Observable 发射的数据有的时候可能不满足我们的要求,或者需要转化为其他类型的数据,比如:缓存,数据类型转化,数据拦截等。此时可以使用 Rx 中的一些对于数据操作的操作进行数据的变换,方便我们的开发。

执行变换的操作方法:

  • Buffer:它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • Map:对序列的每一项都应用一个函数来变换Observable发射的数据序列
  • FlatMap,FlatMapIterable,ConcatMap:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平铺化的放进一个单独的 Observable
  • SwitchMap:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • Window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
  • GroupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按 Key
    分组,每一个Observable发射一组不同的数据
  • Scan:对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
  • Cast:在发射之前强制将Observable发射的所有数据转换为指定类型

1. Buffer

定期收集Observable的数据放进一个数据包裹(缓存),然后发射这些数据包裹,而不是一次发射一个值。

img-buffer

Buffer 操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生 的Observable发射这些数据的缓存集合。 Buffer 操作符在很多语言特定的实现中有很多种变 体,它们在如何缓存这个问题上存在区别。

Window 操作符与 Buffer 类似,但是它在发射之前把收集到的数据放进单独的Observable, 而不是放进一个数据结构。

注意: 如果原来的Observable发射了一个 onError 通知, Buffer 会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

在RxJava中的一些 Buffer 的操作如下:

1.1 buffer(count)

img-buffer(count)

以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始 Observable 的 count 项数据(最后发射的列表数据可能少于count项)。

实例代码:

    // 1. buffer(count)     
    // 以列表(List)的形式发射非重叠的缓存,
    // 每一个缓存至多包含来自原始 Observable的count项数据(最后发射的列表数据可能少于count项)
    Observable.range(1, 10)
        .buffer(3)
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(1) accept: " + t);
            }
        });

输出:

--> bufferr(1) accept: [1, 2, 3]
--> bufferr(1) accept: [4, 5, 6]
--> bufferr(1) accept: [7, 8, 9]
--> bufferr(1) accept: [10]

Javadoc: buffer(count)

1.2 buffer(boundary)

开始创建一个List 收集原始 Observable 数据,监视一个名叫 boundary 的Observable,每当这个Observable发射了一个值,它就创建一个新的 List 开始收集来自原始Observable的数据并发射原来已经收集数据的 List, 当 boundary Observable 发送了完成通知,会将此时还未发送的 List 发送。

注意: 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始 Observable 数据。

img-buffer(boundary)

实例代码:

    // 2. buffer(boundary) 监视一个名叫boundary的Observable,
    // 开始创建一个List收集原始 Observable 数据,监视一个名叫boundary的Observable,
    // 每当这个Observable发射了一个值,它就创建一个新的List开始收集来自原始Observable的数据并发射原来已经收集数据的List,
    // 当boundary发送了完成通知,会将此时还未发送的 List 发送。 
    // 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始Observable数据。
    Observable.range(1, 10000)
        .buffer(Observable.timer(1, TimeUnit.MILLISECONDS))         // 1毫秒后开始接受原始数据
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> accept(2): " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> accept(2): 2858
--> accept(2): 5471

Javadoc: buffer(boundary)

1.3 buffer(count, skip)

从原始Observable的第一项数据开始创建新的缓存,此后每当收 到 skip 项数据,用 count 项数据填充缓存:开头的一项和后续的 count-1 项,它以列表 (List)的形式发射缓存,取决于 count 和 skip 的值,这些缓存可能会有重叠部分(比如skip < count时),也可能会有间隙(比如skip > count时)。

img-buffer(count, skip)

解析: 在指定的数据序列中移动指针来获取缓存数据:指针每次移动 skip 个数据长度,每次缓存指针位置及后面count个数据,指针初始位置在原始数据的第一个(存在的情况下)。

实例代码:

    // 3. buffer(int count, int skip)
    // 在指定的数据中移动指针来获取缓存数据:指针每次移动1个数据长度,每次缓存3个数据
    Observable.range(1, 5)
        .buffer(3, 1)
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(3) accept: " + t);
            }
        });

输出:

--> bufferr(3) accept: [1, 2, 3]
--> bufferr(3) accept: [2, 3, 4]
--> bufferr(3) accept: [3, 4, 5]
--> bufferr(3) accept: [4, 5]
--> bufferr(3) accept: [5]

Javadoc: buffer(count, skip)

1.4 buffer(timespan, TimeUnit)

定期以 List 的形式发射新的数据,在每个时间段,收集来自原始 Observable 的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的 Observale 之后开始)。还有另一个版本的 buffer 接受一个 Scheduler 参数。

img-buffer(timespan,TimeUnit)

解析: 每隔 timespan 时间段以 List 的形式收集原始Observable的数据

实例代码:

    // 4. buffer(long timespan, TimeUnit unit)
    // 每隔timespan时间段以list的形式收集数据
    Observable.range(1, 50000)
        .buffer(1, TimeUnit.MILLISECONDS)                                   // 每隔1毫秒收集一次原始序列数据
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(4) accept: " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> bufferr(4) accept: 2571
--> bufferr(4) accept: 5457
--> bufferr(4) accept: 13248
--> bufferr(4) accept: 12755
--> bufferr(4) accept: 9543
--> bufferr(4) accept: 6426

注意: buffer(timespan,TimeUnit) 默认情况下会使用 computation 调度器
Javadoc: buffer(timespan,TimeUnit)
Javadoc: buffer(timespan,TimeUnit,Scheduler)

1.5 buffer(timespan, TimeUnit, count)

每当收到来自原始 Observablecount 项数据,或者每过了一段指定 timespan 时间后, 就以 List 的形式发射这期间的数据,即使数据项少于 count 项。还有另一个版本的 buffer 接受一个 Scheduler 参数。

img-buffer(timespan, TimeUnit, count)

实例代码:

    // 5. buffer(long timespan, TimeUnit unit, int count)
    // 每隔1毫秒缓存50个数据
    Observable.range(1, 1000)
        .buffer(1, TimeUnit.MILLISECONDS, 50)                               // 每隔1毫秒收集50个数据序列
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(5) accept: " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 20
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 4
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 26

注意: buffer(timespan, TimeUnit, count) 默认情况下会使用 computation 调度器
Javadoc: buffer(timespan, TimeUnit, count)
Javadoc: buffer(timespan, TimeUnit, scheduler, count)

1.6 buffer(timespan, timeskip, TimeUnit)

在每一个 timeskip 时期内都创建一个新的 List ,然后用原始 Observable 发射的每一项数据填充这个列表(在把这个 List 当做自己的数据发射前,从创建时开始,直到过了 timespan 这么长的时间)。如果 timespan 长于 timeskip ,它发射的数据包将会重叠,因此可能包含重复的数据项。

img-buffer(imespan, timeskip, TimeUnit)

解析: 在每隔 timeskip 时间段都创建一个新的 List ,每个 List 都独立收集 timespan 时间段原始Observable发射的数据。 因此在 timespan 长于 timeskip 时,它发射的数据包将会重叠,因此不同 List 中可能包含重复的数据项。 还有另一个版本的 buffer 接受一个 Scheduler 参数。

实例代码:

    // 6. buffer(long timespan, long timeskip, TimeUnit unit)
    // 在每一个timeskip时期内都创建一个新的 List,
    // 每个List都独立收集timespan时间段原始Observable发射的数据,
    // 如果 timespan 长于 timeskip,它发射的数据包将会重叠,因此不同List中可能包含重复的数据项
    Observable.range(1, 50000)
            .buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread())
            .subscribe(new Consumer<List<Integer>>() {

                @Override
                public void accept(List<Integer> t) throws Exception {
                    System.out.println("--> accept(6): " + t.size());   // 每次收集的数据序列个数
                }
            });

输出:

--> accept(6): 1412
--> accept(6): 733
--> accept(6): 10431
--> accept(6): 694
--> accept(6): 18944
--> accept(6): 10710
--> accept(6): 944
--> accept(6): 6132

注意:buffer(imespan, timeskip, TimeUnit) 默认情况下会使用 computation 调度器。
Javadoc: buffer(imespan, timeskip, TimeUnit)
Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)

1.7 buffer(bufferClosingSelector)

当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector 生成第二个Observable,当第二个Observable 发射一个TClosing 时,buffer 发射当前的 List,然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。

注意: 它会一直这样做直到原来的Observable执行完成,可以收集完整的原始 Observable 的数据

img-buffer(bufferClosingSelector)

实例代码:

    // 7. buffer(Callable<ObservableSource<T>> boundarySupplier)
    // 当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector 生成第二个Observable,
    // 当第二个Observable 发射一个 TClosing 时,buffer 发射当前的 List ,
    // 然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。
    // 它会一直这样做直到原来的Observable执行完成。会收集完整的原始 Observable 的数据
    Observable.range(1, 50000)
        .buffer(new Callable<Observable<Long>>() {

            @Override
            public Observable<Long> call() throws Exception {
                return Observable.timer(1, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> accept(7): " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> accept(7): 14650
--> accept(7): 9708
--> accept(7): 25642

Javadoc: buffer(bufferClosingSelector)

2. Map

对Observable发射的每一项数据应用一个函数,执行变换操作。

实例代码:

    // map(Function<T,R))
    // 接受原始Observable的数据,发送处理后的数据
    Observable.range(1, 5)
        .map(new Function<Integer, Integer>() {

            @Override
            public Integer apply(Integer t) throws Exception {
                System.out.println("--> apply: " + t);
                return t*t; // 计算原始数据的平方
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept Map: " + t);
            }
        });

输出:

--> apply: 1
--> accept Map: 1
--> apply: 2
--> accept Map: 4
--> apply: 3
--> accept Map: 9
--> apply: 4
--> accept Map: 16
--> apply: 5
--> accept Map: 25

Javadoc: map(mapper)

3. FlatMap

主要对原始数据进行转换操作后发送至订阅者。

RxJava2 中的一些 FlatMap 操作方法如下:

3.1 flatMap(mapper)

FlatMap 将一个发射数据的 Observable 变换为 多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable。

img-flatMap(mapper)

FlatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后 FlatMap 合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的 Observable发射这些次级Observable发射的数据的完整集合。

注意: FlatMap 对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
在许多语言特定的实现中,还有一个操作符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操作符通常被叫作ConcatMap或者类似的名字。

实例代码:

    //  1. flatMap(Function)
    //  对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,
    //  然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射
    Observable.range(1, 5)
        .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(1): " + t);                           // 原始数据
                return Observable.range(1, t).subscribeOn(Schedulers.newThread());  // 处理后数据
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMap(1): " + t);                  // 接受的所有数据
            }
        });

输出:

--> apply(1): 1
--> apply(1): 2
--> apply(1): 3
--> apply(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> apply(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3

Javadoc: flatMap(mapper)

3.2 flatMap(mapper, maxConcurrency)

maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止然后再订阅另一个。

img-flatMap(mapper, maxConcurrency)

实例代码:

    // 2. flatMap(Function, maxConcurrency)
    // maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。
    // 当达到这个限制时,它会等待其中一个终止然后再订阅另一个
    Observable.range(1, 5)
    .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(2): " + t);
            return Observable.range(1, t).subscribeOn(Schedulers.newThread());
        }
    // 指定最大订阅数为1,此时等待上一个订阅的Observable结束,在进行下一个Observable订阅
    }, 1).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept flatMap(2): "+ t);
        }
    });

输出:

--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> apply(2): 4
--> apply(2): 5
--> accept flatMap(2): 1
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 5

Javadoc: flatMap(mapper, maxConcurrency)

3.3 flatMap(mapper, delayErrors)

delayError 这个参数指定是否延迟发生 Error 的Observable通知。还有一个可以指定最大订阅数参数 maxConcurrency 的变体。

img-flatMap(mapper, delayErrors)

解析: 当值为 true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送 Error 这个Observable的通知,当值为 false 时则中断所有订阅的操作,并发送 Error 的通知。

实例代码:

    // 3. flatMap(Function, delayErrors)
    // delayErrors 这个参数指定是否延迟发生Error的Observable通知
    // 当true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,
    // 继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送Error这个Observable的通知
    Observable.range(1, 5)
        .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(3): " + t);
                
                return Observable.create(new ObservableOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        if( t == 3) {
                            throw new NullPointerException("delayErrors test!");    // 测试 Error
                        }
                        for (int i = 1; i <= t; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                });
            }
        // 设置延迟 Error 通知到最后
        }, true).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMap(3): "+ t);
            }
        },new Consumer<Throwable>() {

            @Override
            public void accept(Throwable t) throws Exception {
                System.out.println("--> acceot Error(3): " + t);
            }
        });

输出:

--> apply(3): 1
--> accept flatMap(3): 1
--> apply(3): 2
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> apply(3): 3
--> apply(3): 4
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> apply(3): 5
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> accept flatMap(3): 5
--> acceot Error(3): java.lang.NullPointerException: delayErrors test!

Javadoc: flatMap(Function, delayErrors)
Javadoc: flatMap(Function, delayErrors, maxConcurrency)

3.4 flatMapIterable(mapper)

flatMapIterable 这个变体成对的打包数据,然后生成 Iterable 而不是原始数据和生成的 Observables,但是处理方式是相同的。

img-flatMapIterable(Func)

解析: 对数据进行处理转换成 Iterable 来发射数据。

实例代码:

    //  4. flatMapIterable(Function(T,R))
    //  对数据进行处理转换成Iterable来发射数据
    Observable.range(1, 5)
        .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

            @Override
            public Iterable<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply: " + t);
                ArrayList<Integer> list = new ArrayList<Integer>();
                list.add(888);
                list.add(999);
                return list; // 将原始数据转换为两个数字发送
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMapIterable(4): " + t);
            }
        });

输出:

--> apply: 1
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 2
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 3
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 4
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 5
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999

Javadoc: flatMapIterable(mapper)

3.5 flatMapIterable(mapper, resultSelector)

参数 mapper 接收原始数据,resultSelector 同时接收原始数据和 mapper 处理的数据,进行二次数据转换。

img-flatMapIterable(mapper, resultSelector)

实例代码:

    //  5. flatMapIterable(Function(T,R),Function(T,T,R))
    //  第一个func接受原始数据,转换数据,第二个func同时接受原始和处理的数据,进行二次转换处理
    Observable.range(1, 3)
            .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

                @Override
                public Iterable<? extends Integer> apply(Integer t) throws Exception {
                    ArrayList<Integer> list = new ArrayList<Integer>();
                    list.add(888);
                    list.add(999);
                    return list; // 将原始数据转换为两个数字发送
                }
            }, new BiFunction<Integer, Integer, Integer>() {

                @Override
                public Integer apply(Integer t1, Integer t2) throws Exception {
                    System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2);
                    return t1 + t2; // 将原始数据和处理过的数据组合进行二次处理发送
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept flatMapIterable(5): " + t);
                }
            });

输出:

--> apply(5): t1 = 1, t2 = 888
--> accept flatMapIterable(5): 889
--> apply(5): t1 = 1, t2 = 999
--> accept flatMapIterable(5): 1000
--> apply(5): t1 = 2, t2 = 888
--> accept flatMapIterable(5): 890
--> apply(5): t1 = 2, t2 = 999
--> accept flatMapIterable(5): 1001
--> apply(5): t1 = 3, t2 = 888
--> accept flatMapIterable(5): 891
--> apply(5): t1 = 3, t2 = 999
--> accept flatMapIterable(5): 1002

Javadoc: flatMapIterable(mapper, resultSelector)

4. ConcatMap

concatMap 操作符的功能和 flatMap 是非常相似的,只是有一点,concatMap 最终输出的数据序列和原数据序列是一致,它是按顺序链接Observables,而不是合并(flatMap用的是合并)。

通过 mapper 处理原数据后,转换成 Observables ,按照顺序进行连接 Observables 发送数据。

img-concatMap(mapper)

解析: concatMapflatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。区别:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。

实例代码:

    // 1. concatMap(Function(T,R))
    // 按照顺序依次处理原始数据和处理的数据
    Observable.range(1, 3)
        .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(1): " + t);
                return Observable.range(1, t).doOnSubscribe(new Consumer<Disposable>() {

                    @Override
                    public void accept(Disposable t) throws Exception {
                        System.out.println("--> accept(1): Observable on Subscribe");   // 当前的Observable被订阅
                    }
                });
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept concatMap(1): " + t);
            }
        });
    
    System.out.println("--------------------------------------------");
    // 2. concatMap(mapper, prefetch)
    // prefetch 参数是在处理后的Observables发射的数据流中预读数据个数,不影响原数据的发射和接收顺序
    Observable.range(1, 3)
        .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {
    
            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(2): " + t);
                return Observable.range(1, 3).doOnSubscribe(new Consumer<Disposable>() {
    
                    @Override
                    public void accept(Disposable t) throws Exception {
                        System.out.println("--> accept(2): Observable on Subscribe");   // 当前的Observable被订阅
                    }
                });
            }
        }, 2).subscribe(new Consumer<Integer>() {
    
            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept concatMap(2): " + t);
            }
        });

输出:

--> apply(1): 1
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> apply(1): 2
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> apply(1): 3
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> accept concatMap(1): 3
--------------------------------------------
--> apply(2): 1
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 2
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 3
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3

Javadoc: concatMap(mapper)
Javadoc: concatMap(mapper, refetch)

5. SwitchMap

有选择的订阅 Observable,当原始 Observable 发射一个数据,通过 witchMap 返回一个 Observable,
当原始Observable发射一个新的数据时,它将取消订阅并停止监视产生执之前的Observable,开始监视当前新的Observable。

img-SwitchMap

解析: 如果上一个任务尚未完成时,就开始下一个任务的话,上一个任务就会被取消掉。如果所有任务都是在同一个线程里执行的话,此时这个操作符与 ContactMap 一致,都是依次顺序执行。只有在不同的线程里执行的时候,即线程方案为newThread的时候,才会出现这种情况,常用于网络请求中。

实例代码:

    // 1. witchMap(Function(T,R))
    // 同一个线程执行
    Observable.range(1, 3)
    .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(1): " + t);
            return Observable.range(1, 3);  // 每个任务指定在同一个线程执行
        }
    }).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept switchMap(1): " + t);
        }
    });
    
    System.out.println("---------------------------------------");
    // 2. witchMap(Function(T,R))
    // 不同线程执行
    Observable.range(1, 3)
        .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(2): " + t);
                return Observable.range(1, 3)
                                 .subscribeOn(Schedulers.newThread());  // 每个任务指定在子线程执行
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept switchMap(2): " + t);
            }
        });
    

    System.out.println("---------------------------------------");
    // 3. switchMap(mapper, bufferSize)
    // bufferSize 参数是从当前活动的Observable中预读数据的大小
    Observable.range(1, 3)
    .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(3): " + t);
            return Observable.range(1, 5).subscribeOn(Schedulers.newThread());
        }
    }, 3).subscribe(new Consumer<Integer>() {   // 指定缓存大小为3

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept switchMap(3): " + t);
        }
    });

输出:

--> apply(1): 1
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 2
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 3
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
---------------------------------------
--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> accept switchMap(2): 1
--> accept switchMap(2): 2
--> accept switchMap(2): 3
---------------------------------------
--> apply(3): 1
--> apply(3): 2
--> apply(3): 3
--> accept switchMap(3): 1
--> accept switchMap(3): 2
--> accept switchMap(3): 3
--> accept switchMap(3): 4
--> accept switchMap(3): 5

Javadoc: switchMap(mapper)
Javadoc: switchMap(mapper, bufferSize)

接续:

后续的Rx相关数据变换部分请参考: Rxjava2 Observable的数据变换详解及实例(二)

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容