Android Rxjava框架的原理和使用

原理

Rx是Reactive Extensions的缩写的简写,可以使用可观察数据流对编程接口进行异步编程,它结合了观察者模式,迭代器模式和函数式的精华。

Rxjava是一种异步数据处理库,也是一种观察者模式。最早是Netflix公司用于重构当前架构时减少REST调用的次数,参考了Microsoft公司的响应式编程,把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。

它的特点主要有以下:

  1. 支持Java 8 Lambda。
  2. 支持异步和同步。
  3. 单一依赖关系。
  4. 简洁,优雅。

RxAndroid

在开发项目的时候,开发者在使用Rxjava时会搭配RxAndroid,他是针对Rxjava在Android平台使用的一个响应式扩展组件。使用RxAndroid的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。

观察者模式的四大要素

  1. Observable 被观察者
  2. Observer
  3. 观察者 subscribe 订阅
  4. 事件
image.png

观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。

扩展的观察者模式

image.png

onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。

使用

依赖

//在Project的gradle下添加maven仓库
maven { url "https://oss.jfrog.org/libs-snapshot" }

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

Hello World

//1.创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        });
//2.创建观察者
Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe():");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println("onNext():" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        };
//3.订阅事件
observable.subscribe(observer);

注意:onError()和onComplete()只会回调一个。

操作符

Creating Observables(创建 Observable)

Create

//链式写法
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe():"+d.toString());
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext():" + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        });

Just
使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。

Observable.just("hello world").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

From

将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add("Hello" + i);
        }

        Observable.fromArray(list).subscribe(new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<String> strings) {
                System.out.println(strings);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Defer

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。 以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

value = "2020/12/13";
    Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
        @Override
        public ObservableSource<? extends String> get() throws Throwable {
            return Observable.just(value);
        }
    });
    value = "12345";
    observable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

Empty/Never/Throw

Empty是创建一个不发射任何数据但是正常终止的Observable。 Never是创建一个不发射数据也不终止的Observable。 Throw是创建一个不发射数据以一个错误终止的Observable。 这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

Observable.defer(new Supplier<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> get() throws Throwable {
                return Observable.error(new Throwable("你写了个bug"));
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

Interval

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。

//TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。

Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Repeat

创建一个Observable,该Observable的事件可以重复调用。

 Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Start

返回一个Observable,它发射一个类似于函数声明的值。

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

 Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Transforming Observables(转换 Observable)

Map

Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。

//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer s) throws Exception {
                return s.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

flatMap对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。

 Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(Integer integer) throws Exception {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

GroupBy

根据规则对数据进行分组。

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer % 2==0?"偶数":"奇数";
            }
        }).subscribe(new Observer<GroupedObservable<String, Integer>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(arg0.getKey() + "-------" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.println(integers);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Scan

将数据进行累加。

Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Window

window和buffer相似,它返回的是一个Observable对象,它根据一系列任务规则把数据聚集到一个列表。

//        window第一个参数count:每个窗口应发射前的最大大小;第二个:在启动新窗口之前需要跳过多少项
        Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(final Observable<Integer> arg0) {
                System.out.println(arg0);
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("---"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Filtering Observables(过滤 Observable)

Debounce

操作间隔一定时间内没有做任何操作,数据才会发送到观察者。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(2000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

Distinct

去掉重复数据的操作符。

Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

ElementAt

取出指定位置的数据。

Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Filter

对数据进行指定规则的过滤。

Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {

                return integer > 2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

First

取数据中的第一个数据。

//first参数:defaultItem: 当前Observable不发射任何内容时发出的默认项
        Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }
        });

IgnoreElements

忽略所有的数据,不向观察者发送数据,直接回调onError或onComplete()。

 Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable e) {

            }

        });

Last

列表数据最后指定的数位项数据。 SingleObserver只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。

     Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });
 Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });

Sample

对数据源进行样本采集,发送给观察者。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Skip

跳过指定列表项数据的指定项数据。

Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

SkipLast

跳过列表数据的最后几位数据。

Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Take

只取列表数据的前几项。

Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

TakeLast

取列表数据项的最后几项数据。 Consumer是简易版的Observer,他有多重重载,可以自定义你需要处理的信息,他只提供一个回调接口accept,由于没有onError和onCompete,无法再 接受到onError或者onCompete之后,实现函数回调。

Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Combining Observables(组合 Observable)

Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。当其中一个Observable发送数据结束或异常,另外一个也停止发送。

 Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Merge

合并多个Observables的发射物。

Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

StartWith

在数据序列的开头插入一条指定的项。

Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

CombineLatest

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

Observable<Integer> observable = Observable.just(1, 3, 5);
        Observable<Integer> observable1 = Observable.just(2, 4, 6);
        Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

String[] args1 = new String[]{"张欣1", "张欣2", "张欣3", "张欣4", "张欣5"};
    String[] args2 = new String[]{"春晓1", "春晓2", "春晓3", "春晓4"};
    Observable<String> o1 = Observable.fromArray(args1);
    Observable<String> o2 = Observable.fromArray(args2);
    //相同的数组可以进行合并
    o2.join(o1, new Function<String, Observable<Long>>() {
        @Override
        public Observable<Long> apply(String s) throws Exception {
            return Observable.timer(2, TimeUnit.SECONDS);
        }
    }, new Function<String, Observable<Long>>() {
        @Override
        public Observable<Long> apply(String s) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }
    }, new BiFunction<String, String, String>() {
        @Override
        public String apply(String s, String s2) throws Exception {
            return s + "-&--" + s2;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull String s) {
            System.out.println(s);
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

SwitchOnNext

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

        final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
        final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
        Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
                Thread.sleep(1000);
                // 此时发射一个新的observable2,将会取消订阅observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 创建发射含有Error通知的Observable序列的Observable
        Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
//                emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
//                emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
                Thread.sleep(1000);
                // 此时发射一个新的observable2,将会取消订阅observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
        // 可选参数 bufferSize: 缓存数据项大小
        // 接受一个发射Observable序列的Observable类型的sources,
        // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
        Disposable subscribe = Observable.switchOnNext(sources)
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long integer) throws Exception {
                        System.out.println("--> accept(1): " + integer);
                    }
                });
        System.out.println("--------------------------------------------------------------------");
        // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
        // 可选参数 prefetch: 与读取数据项大小
        // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
        // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
        Observable.switchOnNextDelayError(sourcesError)
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(2)");
                    }

                    @Override
                    public void onNext(Long t) {
                        System.out.println("--> onNext(2): " + t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                        if (e instanceof CompositeException) {
                            CompositeException compositeException = (CompositeException) e;
                            List<Throwable> exceptions = compositeException.getExceptions();
                            System.out.println("--> onError(2): " + exceptions);
                        } else {
                            System.out.println("--> onError(2): " + e);
                        }
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete(2)");
                    }
                });

Error Handling Operators(处理错误)

Catch

Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。 还有一个叫onErrorResumeNext的操作符,它的行为与Catch相似。 RxJava将Catch实现为三个不同的操作符:

  • onErrorReturn 让Observable遇到错误时发射一个特殊的项并且正常终止。
  • onErrorResumeNext 让Observable在遇到错误时开始发射第二个Observable的数据序。
  • onExceptionResumeNext 让Observable在遇到错误时继续发射后面的数据项。
Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Throwable {
                return  null;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

Retry

如果原始Observable遇到错误,重新订阅它期望它能正常终止。 retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。 retryWhen默认在trampoline调度器上执行,你可以通过参数指定其它的调度器。

场景:网络请求失败重试操作。

final AtomicInteger atomicInteger = new AtomicInteger(3);
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext(String.valueOf(System.currentTimeMillis()));
                emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
                return throwableObservable;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

更多相关文章

Android如何进阶:http://docs.qq.com/doc/DWHFqVHBMVEJPWUx
Android面试题汇总:http://docs.qq.com/doc/DWGZIRFh5VEtYWE1D
Android音视频需要学习哪些:http://docs.qq.com/doc/DWFFWZHNPTHZVdHFX
Android常有的开源框架有哪些框:docs.qq.com/doc/DWHlGYUdseVhsSUda
Android车载应需要学习哪些:docs.qq.com/doc/DWEl0blBabXVvU2Nw Android
Framework怎么学:docs.qq.com/doc/DWFdlc2JocEtNbEJ1

Schedulers(调度器)

它是RxJava以一种及其简单的方式解决多线程问题的机制。

种类

io() 用于I/O操作。 computation() 计算,计算工作默认的调度器,与I/O操作无关。 immediate() 立即执行,允许立即在当前线程执行你指定的工作。 newThread() 新线程,为指定任务创建新线程。 trampoline() 顺序处理,按需处理队列,并运行队列的每一个任务。

AndroidSchedulers

RxAndroid提供在Android平台的调度器(指定观察者在主线程)。

  • SubscribeOn 方法用于每个Observable对象
  • ObserveOn 方法用于每个Subscriber(Observer)对象
 observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);

使用场景

与Retrofit结合使用

retrofitBuilder = new Retrofit.Builder();
        retrofitBuilder.client(okHttpClient)
                .addConverterFactory(ScalarsConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create());
                
public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {

        if (observable == null || httpCallBack == null) {
            throw new IllegalArgumentException("observable或HttpCallBack为空");
        }

        //观察者_网络请求状态
        BaseObserver<T> observer = new BaseObserver<T>() {
            @Override
            public void onNext(T t) {
                try {
                    if (t != null) {
                        httpCallBack.onSuccess(t);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    httpCallBack.onFailure(e);
                }
            }

            @Override
            public void onError(Throwable e) {
                httpCallBack.onFailure(e);
            }

        };

        if (owner == null) {
            //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);
        } else {
            //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);
        }
    }

与RxPermission结合使用

RxPermission是基于RxJava的Android动态权限申请框架。

使用(简单封装)

  public void initPermissions(String[] permissions, PermissionResult permissionResult) {
        if (rxPermissions == null) {
            rxPermissions = new RxPermissions(this);
        }
        rxPermissions.requestEachCombined(permissions)
                .subscribe(permission -> {
                    if (permission.granted) {
                        permissionResult.onSuccess();
                    } else if (permission.shouldShowRequestPermissionRationale) {
                        permissionResult.onFailure();
                    } else {
                        permissionResult.onFailureWithNeverAsk();
                    }
                });
    }

代替EventBus

EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。更多相关请参考Android事件总线之EventBus。 RxJava也可以实现事件总线,因为它们都依据于观察者模式。我们使用RxJava替换EventBus,可以减少App的体积。

使用

private static volatile RxBus instance;
    private PublishSubject<Object> mRxtBus;
    
    public static RxBus getDefault() {
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = new RxBus();
            }
        }
        return instance;
    }

    private RxBus() {
        mRxtBus = PublishSubject.create();
    }

    public void post(String tag, Object event) {
        Message msg = new Message(tag, event);
        mRxtBus.onNext(msg);
    }

    public <T> Observable<T> toEvent(Class<T> eventType) {
        return mRxtBus.ofType(eventType);
    }
 }

发送

RxBus.getDefault().post("payValue",code);

接收

subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
            @Override
            public void accept(RxBus.Message message) throws Throwable {
                if ("payValue".equals(message.getTag())) {
                    Log.e("yhj", "accept: " + message.getEvent().toString());
                }
            }
        });

解绑

if (subscribe != null && !subscribe.isDisposed()) {
            subscribe.dispose();
        }

Rxjava内存泄漏的处理

Rxjava的使用不当会导致内存泄漏,使用AutoDispose可以解决这个问题,它是一个随Android生命周期事件自动解绑Rxjava订阅的方便工具。

使用

结合JetPack的LifeCycle(生命周期感知型组件),根据生命周期取消订阅。

//被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);

总结

本文主要是对RxJava使用及Android常见使用场景进行总结,掌握这些还远远不够,RxJava还有许多强大的功能,诸如从磁盘/内存中获取缓存数据,背压策略,联想搜索优化等等。后面在项目开发中遇到相关场景再进行总结,更新。本文若有不当之处,请批评指正。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容