Rxjava2 Observable的结合操作详解及实例

简要:

需求了解:

在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等。这个时候我们就需要使用 RxJava 的结合操作符来完成这一需求,Rx中提供了丰富的结合操作处理的操作方法。

可用于组合多个Observables的操作方法:

  • CombineLatest:当Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据,然后发射这个函数的结果。
  • Join:只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
  • Merge:合并多个Observables的发射物,可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
  • Zip:通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体严格按照数量以及顺序发射单个数据项。
  • StartWith:在数据序列的开头插入一条指定的数据项或者数据序列。
  • SwitchOnNext:将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最新发射的Observable的数据项。

1. CombineLatest

当 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest 操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据。 CombineLatest 则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一 个函数结合它们最近发射的数据,然后发射这个函数的返回值。

img-CombineLatest

解析: combineLatest 操作符可以结合多个Observable,可以接收 2-9 个Observable对象, 在其中原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。此外combineLatest 操作符还有一些接收 Iterable , 数组方式的变体,以及其他指定参数combiner、bufferSize、和combineLatestDelayError方法等变体,在此就不在详细展开了,有兴趣的可以查看官方的相关API文档了解。

实例代码:

    // Observables 创建
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
    Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
    Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
    
    // 1. combineLatest(ObservableSource, ObservableSource [支持2-9个参数]...,  BiFunction)
    // 结合多个Observable, 当他们其中任意一个发射了数据时,使用函数结合他们最近发射的一项数据
    Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
    
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
            if (t1 + t2 == 10) {
                return "Success";   // 满足一定条件,返回指定的字符串
            }
            return t1 + t2 + "";    // 计算所有数据的和并转换为字符串
        }
    }).subscribe(new Consumer<String>() {
    
        @Override
        public void accept(String t) throws Exception {
            System.out.println("----> accept combineLatest(1): " + t);
        }
    });
    
    System.out.println("--------------------------------------------------------");
    // 2. combineLatest(T1, T2, T3, Function)
    // Observables的结合
    Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2, Long t3) throws Exception {
            System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
            return t1 + t2 + t3 + "";   // 计算所有数据的和并转换为字符串
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(2): " + t);
        }
    });

输出:

--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114

Javadoc: combineLatest(T1, T2, T3... , T9, combiner)

2. Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

img-join

Join 操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。

解析: join(other, leftEnd, rightEnd, resultSelector) 相关参数的解析

  • other: 源Observable与其组合的目标Observable。
  • leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期。
  • rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期。
  • resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象。

注意: 这是源Observable和目标Observable发射数据在任意一个基于时间窗口的有效期内才会接收到组合数据,这就意味着可能有数据丢失的情况,在其中一个已经发射完所有数据,并且没有处于时间窗口的数据情况,另一个Observable的数据发射将不会收到组合数据。

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 1. join(other, leftEnd, rightEnd, resultSelector)
    // other: 目标组合的Observable
    // leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期
    // rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期
    // resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象
    sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            return "t1 = " + t1 + ", t2 = " + t2;                   // 对数据进行组合后返回和观察者
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(1): " + t);
        }
    });

    System.in.read();

输出:

-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14   // 此时源t1中已经没有数据还处于时间窗口有效期内

Javadoc: join(other, leftEnd, rightEnd, resultSelector)

groupJoin

groupJoin 操作符与 join 相同,只是参数传递有所区别。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector 可以将原始数据转换为 Observable 类型的数据发送给观察者。

img-groupJoin

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
    // groupJoin操作符与join相同,只是参数传递有所区别。
    // resultSelector可以将原始数据转换为Observable类型的数据发送给观察者。
    sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Observable<Long>, Observable<String>>() {
        @Override
        public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
            System.out.println("--> apply(2) combine: " + t1);            // 结合操作
            return t2.map(new Function<Long, String>() {
                @Override
                public String apply(Long t) throws Exception {
                    System.out.println("-----> apply(2) operation: " + t);
                    return "t1 = " + t1 + ", t2 = " + t;
                }
            });
        }
    }).subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
        }
    });

输出:

-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14

Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)

3. Merge

合并多个Observables的发射物。

img-Merge

使用 Merge 操作符你可以将多个Observables的输出合并,就好像它们是一个单个的 Observable 一样。

3.1 merge

Merge 可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat 不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物),任何一个原始Observable的 onError 通知会被立即传递给观察者,而且会终止合并后的Observable。

img-merge

除了传递多个Observable给 merge ,你还可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable, merge 将合并它们的输出作为单个Observable的输出。

img-merge-observables

如果你传递一个发射Observables序列的Observable,你可以指定 merge 应该同时订阅的 Observable 的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了 onCompleted 通知。

示例代码:

    // 创建Observable对象
    Observable<Integer> odd = Observable.just(1, 3, 5);
    Observable<Integer> even = Observable.just(2, 4, 6);
    Observable<Integer> big = Observable.just(188888, 688888, 888888);

    // 创建list对象
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(odd);
    list.add(even);
    list.add(big);

    // 创建Array对象
    Observable<Integer>[] observables = new Observable[3];
    observables[0] = odd;
    observables[1] = even;
    observables[2] = big;

    // 创建发射Observable序列的Observable
    Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.just(1, 2));
            emitter.onNext(Observable.just(1, 2, 3));
            emitter.onNext(Observable.just(1, 2, 3, 4));
            emitter.onNext(Observable.just(1, 2, 3, 4, 5));
            emitter.onComplete();
        }
    });

    // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
    // 可接受 2-4 个Observable对象进行merge
    Observable.merge(odd, even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的列表List
    Observable.merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的数组Array
    Observable.mergeArray(observables)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
    // 可选参数, maxConcurrency: 最大的并发处理数
    // 接受一个发射Observable序列的Observable
    Observable.merge(sources)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 5. mergeWith(other)
    // merge 是静态方法, mergeWith 是对象方法: Observable.merge(odd,even) 等价于 odd.mergeWith(even)
    odd.mergeWith(even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(5): " + integer);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6

Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSource<ObservableSource> sources, int maxConcurrency)

3.2 mergeDelayError

如果传递给 merge 的任何一个的Observable发射了 onError通知终止了, merge 操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用 mergeDelayError

img-mergeDelayError

MergeDelayError 操作符,mergeDelayError 在合并与交错输出的使用上与 merge 相同,区别在于它会保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

由于MergeDelayError使用上和merge相同 ,所以这里就不做详细分析了,这里就简单描述其中的一种的使用实例。

实例代码:

    // 创建有Error的Observable序列的Observable
    Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {

        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.error(new Exception("Error Test1"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(2, 3));
            emitter.onNext(Observable.error(new Exception("Error Test2"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(4, 5, 6));
            emitter.onComplete();
        }
    });

    // 6. mergeDelayError
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.mergeDelayError(DelayErrorObservable)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(6)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(6): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(6): " + exceptions);
                    } else {
                        System.out.println("--> onError(6): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(6)");
                }
            });

输出:

--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]

Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)

4. Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个 结合体 发射单个数据项。

img-Zip

Zip 操作符与 Merge 类似,都是合并多个Observables的数据,返回一个Obversable,主要不同的是它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个Observable一样多的数据。

img-Zip-Sources

解析:

  1. Zip 操作符与 Merge 的使用上基本一致,主要不同的是 zip 发射的数据取决于发射数据项最少的那个Observable并且按照严格的顺序去结合数据。
  2. 同样具备静态方法 zip 与对象方法 zipWith,可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable。

使用上在此就不做详细的展开了,可参照上面的 Merge 使用方法,下面就针对 zip 的特性实现一个简单的实例。

实例代码:

    // 创建Observable
    Observable<Integer> observable1 = Observable.just(1, 2, 3);
    Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
    
    // zip(sources)
    // 可接受2-9个参数的Observable,对其进行顺序合并操作,最终合并的数据项取决于最少的数据项的Observable
    Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer t1, Integer t2) throws Exception {
            System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
            return t1 + t2 + "";
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept: " + s);  // 最终接受observable1全部数据项与observable2相同数量顺序部分数据
        }
    });

输出:

--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6

Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )

5. StartWith

在数据序列的开头插入一条指定的数据项或者数据序列。

img-StartWith

如果你想要一个Observable在发射数据之前先发射一个指定的数据或者数据序列(可以是单个数据、数组、列表,Observable中的数据),可以使 用 StartWith 操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用 Concat 操作符。)

img-StartWith-Items

实例代码:

    // 创建列表List
    List<Integer> lists = new ArrayList<>();
    lists.add(999);
    lists.add(9999);
    lists.add(99999);

    // 创建数组Array
    Integer[] arrays = new Integer[3];
    arrays[0] = 999;
    arrays[1] = 9999;
    arrays[2] = 9999;

    // 1. startWith(item)
    // 在Observable数据发射前发射item数据项
    Observable.just(1, 2, 3)
            .startWith(999)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 2. startWith(Iterable items)
    // 在Observable数据发射前发射items列表中的数据序列
    Observable.just(1, 2, 3)
            .startWith(lists)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 3. startWithArray(items)
    // 在Observable数据发射前发射items数组中的数据序列
    Observable.just(1, 2, 3)
            .startWithArray(arrays)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 4. startWith(ObservableSource other)
    // 在Observable数据发射前发射other中的数据序列
    Observable.just(1, 2, 3)
            .startWith(Observable.just(999, 9999, 99999))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

输出:

--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3

Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)

6. SwitchOnNext

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些 Observables最近发射的数据项。

6.1 switchOnNext

switchOnNext 订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个, switchOnNext 发射的这个新Observable并取消订阅前一个发射数据的旧Observable,开始发射最新的Observable发射的数据。

img-switchOnNext

注意: 当原始Observables发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在 后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射 的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

6.2 switchOnNextDelayError

Observables发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,如果Observables中的Observable有 Error 异常,将保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把 onError 传递给观察者。

img-switchOnNextDelayError

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

实例代码:

    // 创建Observable
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);

    // 创建发射Observable序列的Observable
    Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 创建发射含有Error通知的Observable序列的Observable
    Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
            emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
    // 可选参数 bufferSize: 缓存数据项大小
    // 接受一个发射Observable序列的Observable类型的sources,
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
    Observable.switchOnNext(sources)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("--------------------------------------------------------------------");
    // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
    // 可选参数 prefetch: 与读取数据项大小
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.switchOnNextDelayError(sourcesError)
            .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("--> onNext(2): " + t);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(2): " + exceptions);
                    } else {
                        System.out.println("--> onError(2): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]

Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)

小结

Rxjava 的合并操作符能够同时处理多个被观察者,并发送相应的事件通知以及数据。常常应用于多业务合并处理场景,比如表单的联动验证,网络交互性数据的校验等,rxjava的合并操作符能够很好的去实现和处理。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容