RxJava2.x常用操作符总结(一)

一、创建操作符

1、create

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

    @Override

    public void subscribe(ObservableEmitter<String> e) throws Exception {

        e.onNext("Hello Observer");

        e.onComplete();

    }

});

说明:创建一个被观察者Observable

2、just

Observable.just(1, 2, 3)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "=================onSubscribe");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "=================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "=================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "=================onComplete ");

    }

});

打印:

=================onSubscribe

=================onNext 1

=================onNext 2

=================onNext 3

=================onComplete

说明:创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

3、fromArray

Integer[] array = new Integer[]{1, 2, 3, 4};

        Observable.fromArray(array)

                .subscribe(new Observer < Integer > () {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "=================onSubscribe");

                    }

                    @Override

                    public void onNext(Integer integer) {

                        Log.d(TAG, "=================onNext " + integer);

                    }

                    @Override

                    public void onError(Throwable e) {

                        Log.d(TAG, "=================onError ");

                    }

                    @Override

                    public void onComplete() {

                        Log.d(TAG, "=================onComplete ");

                    }

                });

打印:

=================onSubscribe

=================onNext 1

=================onNext 2

=================onNext 3

=================onNext 4

=================onComplete

说明:这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

4、fromCallable

Observable.fromCallable(new Callable < Integer > () {

    @Override

    public Integer call() throws Exception {

        return 1;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "================accept " + integer);

    }

});

================accept 1

说明:这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

5、fromFuture

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {

    @Override

    public String call() throws Exception {

        Log.d(TAG, "CallableDemo is Running");

        return "返回结果";

    }

});

Observable.fromFuture(futureTask)

    .doOnSubscribe(new Consumer < Disposable > () {

    @Override

    public void accept(Disposable disposable) throws Exception {

        futureTask.run();

    }

})

.subscribe(new Consumer < String > () {

    @Override

    public void accept(String s) throws Exception {

        Log.d(TAG, "================accept " + s);

    }

});

打印:

CallableDemo is Running

================accept 返回结果

说明:参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

doOnSubscribe() 的作用就是只有订阅时才会发送事件。

6、fromIterable

List<Integer> list = new ArrayList<>();

list.add(0);

list.add(1);

list.add(2);

list.add(3);

Observable.fromIterable(list)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "=================onSubscribe");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "=================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "=================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "=================onComplete ");

    }

});

打印:

=================onSubscribe

=================onNext 0

=================onNext 1

=================onNext 2

=================onNext 3

=================onComplete

说明:直接发送一个 List 集合数据给观察者

8、defer

// i 要定义为成员变量

Integer i = 100;


Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

    @Override

    public ObservableSource<? extends Integer> call() throws Exception {

        return Observable.just(i);

    }

});

i = 200;

Observer<Integer> observer = new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    }

};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);

打印:

================onNext 200

================onNext 300

说明:因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。

9、timer

Observable.timer(5, TimeUnit.SECONDS)

                .subscribe(new Observer<Long>() {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "===============onSubscribe " + d);

                    }

                    @Override

                    public void onNext(Long aLong) {

                        Log.d(TAG, "===============onNext " + aLong);

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

打印:

2019-08-19 13:30:09.819 26417-26417/com.xbox D/SDK: ===============onSubscribe null

2019-08-19 13:30:14.820 26417-26446/com.xbox D/SDK: ===============onNext 0

说明:timer的第一个参数是延时5秒,第二个参数是单位。当执行完onSubscribe方法后延时5秒执行onNext方法,返回的参数为0

10、interval

Observable.interval(2, TimeUnit.SECONDS)

                .subscribe(new Observer < Long > () {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "==============onSubscribe ");

                    }

                    @Override

                    public void onNext(Long aLong) {

                        Log.d(TAG, "==============onNext " + aLong);

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

打印:

==============onSubscribe

==============onNext 0

==============onNext 1

==============onNext 2

==============onNext 3

.............

.............

说明:第一个参数为每隔多少秒执行onNext方法(从0开始),当执行完onSubscribe后,每隔2秒执行onNext方法

11、intervalRange

Observable.intervalRange(4, 5, 8, 3, TimeUnit.SECONDS)

                .subscribe(new Observer < Long > () {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "==============onSubscribe ");

                    }

                    @Override

                    public void onNext(Long aLong) {

                        Log.d(TAG, "==============onNext " + aLong);

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

打印:

==============onSubscribe

==============onNext 4

==============onNext 5

==============onNext 6

==============onNext 7

==============onNext 8

说明:

第一个参数:从4开始计数

第二个参数:一共执行onNext5次(五个值)

第三个参数:当执行完onSubscribe方法之后,隔8秒开始执行第一个onNext方法

第四个参数:执行完第一个onNext之后,每隔3秒执行下一个onNext方法

依次+1递增

12、range

Observable.range(2, 5)

                .subscribe(new Observer < Integer > () {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "==============onSubscribe ");

                    }

                    @Override

                    public void onNext(Integer aLong) {

                        Log.d(TAG, "==============onNext " + aLong);

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

打印:

==============onSubscribe

==============onNext 2

==============onNext 3

==============onNext 4

==============onNext 5

==============onNext 6

说明:从2开始执行5次onNext方法,+1递增

13、rangeLong

说明:与 range() 一样,只是数据类型为 Long 这里就不上代码了

14、 empty() & never() & error()

Observable.empty()

.subscribe(new Observer < Object > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe");

    }

    @Override

    public void onNext(Object o) {

        Log.d(TAG, "==================onNext");

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError " + e);

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete");

    }

});

说明:

empty():直接发送 onComplete() 事件

never():不发送任何事件

error():发送 onError() 事件

二、转换操作符

1、map

Observable.just(1, 2, 3)

.map(new Function < Integer, String > () {

    @Override

    public String apply(Integer integer) throws Exception {

        return "I'm " + integer;

    }

})

.subscribe(new Observer < String > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.e(TAG, "===================onSubscribe");

    }

    @Override

    public void onNext(String s) {

        Log.e(TAG, "===================onNext " + s);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    }

});

打印:

===================onSubscribe

===================onNext I'm 1

===================onNext I'm 2

===================onNext I'm 3

说明:map 可以将被观察者发送的数据类型转变成其他的类型,上述代码的作用是将 Integer 类型的数据转换成 String。

2、flatMap

List<Person> personList = new ArrayList<>();

        List<Plan> plansList = new ArrayList<>();

        List<String> actionList = new ArrayList<>();

        actionList.add("玩游戏");

        actionList.add("写作业");

        actionList.add("看书");

        Plan plan = new Plan("1", "小明的计划");

        plan.setActionList(actionList);

        plansList.add(plan);

        Person person = new Person("小明", plansList);

        personList.add(person);

        List<String> actionList2 = new ArrayList<>();

        List<Plan> plansList2 = new ArrayList<>();

        actionList2.add("开电脑");

        actionList2.add("打王者");

        actionList2.add("吃鸡");

        Plan plan2 = new Plan("2", "小红的计划");

        plan2.setActionList(actionList2);

        plansList2.add(plan2);

        Person person2 = new Person("小红", plansList2);

        personList.add(person2);

        Observable.fromIterable(personList)

                .flatMap(new Function<Person, ObservableSource<Plan>>() {

                    @Override

                    public ObservableSource<Plan> apply(Person person) {

                        return Observable.fromIterable(person.getPlanList());

                    }

                })

                .flatMap(new Function<Plan, ObservableSource<String>>() {

                    @Override

                    public ObservableSource<String> apply(Plan plan) throws Exception {

                        return Observable.fromIterable(plan.getActionList());

                    }

                })

                .subscribe(new Observer<String>() {

                    @Override

                    public void onSubscribe(Disposable d) {

                    }

                    @Override

                    public void onNext(String s) {

                        Log.d(TAG, "==================action: " + s);

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

public class Person {

        private String name;

        private List<Plan> planList;

        public Person(String name, List<Plan> planList) {

            this.name = name;

            this.planList = planList;

        }

        public String getName() {

            return name;

        }

        public void setName(String name) {

            this.name = name;

        }

        public List<Plan> getPlanList() {

            return planList;

        }

        public void setPlanList(List<Plan> planList) {

            this.planList = planList;

        }

    }

    public class Plan {

        private String time;

        private String content;

        private List<String> actionList = new ArrayList<>();

        public Plan(String time, String content) {

            this.time = time;

            this.content = content;

        }

        public String getTime() {

            return time;

        }

        public void setTime(String time) {

            this.time = time;

        }

        public String getContent() {

            return content;

        }

        public void setContent(String content) {

            this.content = content;

        }

        public List<String> getActionList() {

            return actionList;

        }

        public void setActionList(List<String> actionList) {

            this.actionList = actionList;

        }

    }

打印:

==================action: 玩游戏

==================action: 写作业

==================action: 看书

==================action: 开电脑

==================action: 打王者

==================action: 吃鸡

说明:通过flatMap打印出所有的action

3、concatMap

说明:与flatMap用法基本一样,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的

4、buffer

Observable.just(1, 2, 3, 4, 5,6,7)

                .buffer(3, 2)

                .subscribe(new Observer < List < Integer >> () {

                    @Override

                    public void onSubscribe(Disposable d) {

                    }

                    @Override

                    public void onNext(List < Integer > integers) {

                        Log.d(TAG, "================缓冲区大小: " + integers.size());

                        for (Integer i: integers) {

                            Log.d(TAG, "================元素: " + i);

                        }

                    }

                    @Override

                    public void onError(Throwable e) {

                    }

                    @Override

                    public void onComplete() {

                    }

                });

打印:

================缓冲区大小: 3

================元素: 1

================元素: 2

================元素: 3

================缓冲区大小: 3

================元素: 3

================元素: 4

================元素: 5

================缓冲区大小: 3

================元素: 5

================元素: 6

================元素: 7

================缓冲区大小: 1

================元素: 7

说明:buffer从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

第一个参数代表缓冲区元素的数量,第二个参数表示下一次事件序列的时候要跳过多少元素,比如例子中第二个参数是2,那么在次遍历就从3开始,跳过了1和2。

5、groupBy

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .groupBy(new Function<Integer, Integer>() {

                    @Override

                    public Integer apply(Integer integer) throws Exception {

                        return integer % 3;

                    }

                })

                .subscribe(new Observer<GroupedObservable<Integer, Integer>>() {

                    @Override

                    public void onSubscribe(Disposable d) {

                        Log.d(TAG, "====================onSubscribe ");

                    }

                    @Override

                    public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

                        Log.d(TAG, "====================onNext ");

                        integerIntegerGroupedObservable.subscribe(new Observer<Integer>() {

                            @Override

                            public void onSubscribe(Disposable d) {

                                Log.d(TAG, "====================GroupedObservable onSubscribe ");

                            }

                            @Override

                            public void onNext(Integer integer) {

                                Log.d(TAG, "====================GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);

                            }

                            @Override

                            public void onError(Throwable e) {

                                Log.d(TAG, "====================GroupedObservable onError ");

                            }

                            @Override

                            public void onComplete() {

                                Log.d(TAG, "====================GroupedObservable onComplete ");

                            }

                        });

                    }

                    @Override

                    public void onError(Throwable e) {

                        Log.d(TAG, "====================onError ");

                    }

                    @Override

                    public void onComplete() {

                        Log.d(TAG, "====================onComplete ");

                    }

                });

打印:

====================onSubscribe

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext  groupName: 1 value: 1

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext  groupName: 2 value: 2

====================onNext

====================GroupedObservable onSubscribe

====================GroupedObservable onNext  groupName: 0 value: 3

====================GroupedObservable onNext  groupName: 1 value: 4

====================GroupedObservable onNext  groupName: 2 value: 5

====================GroupedObservable onNext  groupName: 0 value: 6

====================GroupedObservable onNext  groupName: 1 value: 7

====================GroupedObservable onNext  groupName: 2 value: 8

====================GroupedObservable onNext  groupName: 0 value: 9

====================GroupedObservable onNext  groupName: 1 value: 10

====================GroupedObservable onComplete

====================GroupedObservable onComplete

====================onComplete

说明:在 groupBy() 方法返回的参数是分组的名字(integerIntegerGroupedObservable.getKey()),每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组。

6、scan

Observable.just(1, 2, 3)

.scan(new BiFunction < Integer, Integer, Integer > () {

    @Override

    public Integer apply(Integer integer, Integer integer2) throws Exception {

        Log.d(TAG, "====================apply ");

        Log.d(TAG, "====================integer " + integer);

        Log.d(TAG, "====================integer2 " + integer2);

        return integer + integer2;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "====================accept " + integer);

    }

});

打印:

====================accept 1

====================apply

====================integer 1

====================integer2 2

====================accept 3

====================apply

====================integer 3

====================integer2 3

====================accept 6

说明:scan将数据以一定的逻辑聚合起来,相当于从第一个元素开始(1)开始  元素1和元素2相加和与元素3相加

7、window

Observable.just(1, 2, 3, 4, 5)

.window(2)

.subscribe(new Observer < Observable < Integer >> () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "=====================onSubscribe ");

    }

    @Override

    public void onNext(Observable < Integer > integerObservable) {

        integerObservable.subscribe(new Observer < Integer > () {

            @Override

            public void onSubscribe(Disposable d) {

                Log.d(TAG, "=====================integerObservable onSubscribe ");

            }

            @Override

            public void onNext(Integer integer) {

                Log.d(TAG, "=====================integerObservable onNext " + integer);

            }

            @Override

            public void onError(Throwable e) {

                Log.d(TAG, "=====================integerObservable onError ");

            }

            @Override

            public void onComplete() {

                Log.d(TAG, "=====================integerObservable onComplete ");

            }

        });

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "=====================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "=====================onComplete ");

    }

});

打印:

=====================onSubscribe

=====================integerObservable onSubscribe

=====================integerObservable onNext 1

=====================integerObservable onNext 2

=====================integerObservable onComplete

=====================integerObservable onSubscribe

=====================integerObservable onNext 3

=====================integerObservable onNext 4

=====================integerObservable onComplete

=====================integerObservable onSubscribe

=====================integerObservable onNext 5

=====================integerObservable onComplete

=====================onComplete

说明:window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。例中,window() 将 1~5 的事件分成了3组。

三、组合操作符

1、concat

Observable.concat(Observable.just(1, 2),

Observable.just(3, 4),

Observable.just(5, 6),

Observable.just(7, 8))

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    }

});

打印:

================onNext 1

================onNext 2

================onNext 3

================onNext 4

================onNext 5

================onNext 6

================onNext 7

================onNext 8

说明:可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。

2、concatArray

Observable.concatArray(Observable.just(1, 2),

Observable.just(3, 4),

Observable.just(5, 6),

Observable.just(7, 8),

Observable.just(9, 10))

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    }

});

打印:

================onNext 1

================onNext 2

================onNext 3

================onNext 4

================onNext 5

================onNext 6

================onNext 7

================onNext 8

================onNext 9

================onNext 10

说明:与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。

3、merge

Observable.merge(

Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {

    @Override

    public String apply(Long aLong) throws Exception {

        return "A" + aLong;

    }

}),

Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {

    @Override

    public String apply(Long aLong) throws Exception {

        return "B" + aLong;

    }

}))

    .subscribe(new Observer < String > () {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(String s) {

        Log.d(TAG, "=====================onNext " + s);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    }

});

打印:

=====================onNext A0

=====================onNext B0

=====================onNext A1

=====================onNext B1

=====================onNext B2

=====================onNext A2

=====================onNext B3

=====================onNext A3

=====================onNext A4

=====================onNext B4

=====================onNext A5

=====================onNext B5

..............

说明:这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。

把merge换成concat之后打印如下:

=====================onNext A0

=====================onNext A1

=====================onNext A2

=====================onNext A3

=====================onNext A4

..............

说明:只有等到第一个被观察者发送完事件之后,第二个被观察者才会发送事件。mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者,这里就不再赘述了。

4、concatArrayDelayError() & mergeArrayDelayError()

Observable.concatArray(

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onError(new NumberFormatException());

    }

}), Observable.just(2, 3, 4))

    .subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "===================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

    }

});

打印:

===================onNext 1

===================onError

说明:在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用  concatArrayDelayError() 和 mergeArrayDelayError();

从结果可以知道,确实中断了,现在换用 concatArrayDelayError(),代码如下:

Observable.concatArrayDelayError(

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onError(new NumberFormatException());

    }

}), Observable.just(2, 3, 4))

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "===================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

    }

});

打印:

===================onNext 1

===================onNext 2

===================onNext 3

===================onNext 4

===================onError

说明:出现错误延迟回调

5、zip

Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)

    .map(new Function<Long, String>() {

        @Override

        public String apply(Long aLong) throws Exception {

            String s1 = "A" + aLong;

            Log.d(TAG, "===================A 发送的事件 " + s1);

            return s1;

        }}),

        Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)

            .map(new Function<Long, String>() {

            @Override

            public String apply(Long aLong) throws Exception {

                String s2 = "B" + aLong;

                Log.d(TAG, "===================B 发送的事件 " + s2);

                return s2;

            }

        }),

        new BiFunction<String, String, String>() {

            @Override

            public String apply(String s, String s2) throws Exception {

                String res = s + s2;

                return res;

            }

        })

.subscribe(new Observer<String>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "===================onSubscribe ");

    }

    @Override

    public void onNext(String s) {

        Log.d(TAG, "===================onNext " + s);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "===================onComplete ");

    }

});

打印:

===================onSubscribe

===================A 发送的事件 A1

===================B 发送的事件 B1

===================onNext A1B1

===================A 发送的事件 A2

===================B 发送的事件 B2

===================onNext A2B2

===================A 发送的事件 A3

===================B 发送的事件 B3

===================onNext A3B3

===================A 发送的事件 A4

===================B 发送的事件 B4

===================onNext A4B4

===================A 发送的事件 A5

===================B 发送的事件 B5

===================onNext A5B5

===================onComplete

说明:会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样,上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。

6、combineLatest() & combineLatestDelayError()

Observable.combineLatest(

Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)

    .map(new Function < Long, String > () {@Override

    public String apply(Long aLong) throws Exception {

        String s1 = "A" + aLong;

        Log.d(TAG, "===================A 发送的事件 " + s1);

        return s1;

    }

}),

Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)

    .map(new Function < Long, String > () {@Override

    public String apply(Long aLong) throws Exception {

        String s2 = "B" + aLong;

        Log.d(TAG, "===================B 发送的事件 " + s2);

        return s2;

    }

}),

new BiFunction < String, String, String > () {@Override

    public String apply(String s, String s2) throws Exception {

        String res = s + s2;

        return res;

    }

})

.subscribe(new Observer < String > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "===================onSubscribe ");

    }

    @Override

    public void onNext(String s) {

        Log.d(TAG, "===================最终接收到的事件 " + s);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "===================onComplete ");

    }

});

打印:

===================onSubscribe

===================A 发送的事件 A1

===================A 发送的事件 A2

===================B 发送的事件 B1

===================最终接收到的事件 A2B1

===================A 发送的事件 A3

===================最终接收到的事件 A3B1

===================A 发送的事件 A4

===================B 发送的事件 B2

===================最终接收到的事件 A4B1

===================最终接收到的事件 A4B2

===================B 发送的事件 B3

===================最终接收到的事件 A4B3

===================B 发送的事件 B4

===================最终接收到的事件 A4B4

===================B 发送的事件 B5

===================最终接收到的事件 A4B5

===================onComplete

说明:combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔2秒发送一次事件。当发送 A1 事件之后,因为 B 并没有发送任何事件,所以根本不会发生结合。当 B 发送了 B1 事件之后,就会与 A 最近发送的事件 A2 结合成 A2B1,这样只有后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。

因为 combineLatestDelayError() 就是多了延迟发送 onError() 功能,这里就不再赘述了。

7、reduce

Observable.just(3, 7, 10, 2)

.reduce(new BiFunction < Integer, Integer, Integer > () {

    @Override

    public Integer apply(Integer integer, Integer integer2) throws Exception {

        int res = integer + integer2;

        Log.d(TAG, "====================integer " + integer);

        Log.d(TAG, "====================integer2 " + integer2);

        Log.d(TAG, "====================res " + res);

        return res;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "==================accept " + integer);

    }

});

打印:

====================integer 3

====================integer2 7

====================res 10

====================integer 10

====================integer2 10

====================res 20

====================integer 20

====================integer2 2

====================res 22

==================accept 22

说明:与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。从结果可以看到,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止,把最终的结果通过观察者一次输出。

8、collect

Observable.just(1, 2, 3, 4)

.collect(new Callable < ArrayList < Integer >> () {

    @Override

    public ArrayList < Integer > call() throws Exception {

        return new ArrayList < > ();

    }

},

new BiConsumer < ArrayList < Integer > , Integer > () {

    @Override

    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {

        integers.add(integer);

    }

})

.subscribe(new Consumer < ArrayList < Integer >> () {

    @Override

    public void accept(ArrayList < Integer > integers) throws Exception {

        Log.d(TAG, "===============accept " + integers);

    }

});

打印:

===============accept [1, 2, 3, 4]

说明:

将数据收集到数据结构当中。

9、startWith() & startWithArray()

Observable.just(5, 6, 7)

.startWithArray(2, 3, 4)

.startWith(1)

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "================accept " + integer);

    }

});

打印:

================accept 1

================accept 2

================accept 3

================accept 4

================accept 5

================accept 6

================accept 7

说明:在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。

10、count

Observable.just(1, 2, 3)

.count()

.subscribe(new Consumer < Long > () {

    @Override

    public void accept(Long aLong) throws Exception {

        Log.d(TAG, "=======================aLong " + aLong);

    }

});

打印:

=======================aLong 3

说明:返回被观察者发送事件的数量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容