网络编程 -- RxJava

一、概念

RxJava是一个基于事件流、实现异步操作的库。

二、特点

由于RxJava是基于事件流的链式调用,因此具有逻辑简洁、实现优雅、使用简单等特点。

三、原理

RxJava是基于一种扩展的观察者模式,Rxjava的扩展观察者模式中有4个角色:
被观察者(Observable):产生事件。
观察者(Observer):接收事件,并给出响应动作。
订阅(Subscribe):连接被观察者与观察者。
事件(Event):被观察者与观察者沟通的载体。

被观察者(Observable)通过 订阅(Subscribe)按顺序发送事件 给观察者(Observer), 观察者(Observer)按顺序接收事件并作出对应的响应动作。

四、使用

添加依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
// 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在

1.基本使用

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // RxJava的流式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 创建被观察者 & 生产事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });
    }
} 

方法调用顺序:观察者.onSubscribe() > 被观察者.subscribe() > 观察者.onNext() > 观察者.onComplete()。

2.函数式接口

RxJava 2.x 提供了多个函数式接口 ,用于实现简便式的观察者模式。
以Consumer为例:

Observable.just("hello").subscribe(new Consumer<String>() {
            // 每次接收到Observable的事件都会调用Consumer.accept()
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

3.Observer#subscribe

观察者Observer的subscribe()方法具备多个重载的方法。
定义如下:

public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)

public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
// 表示观察者只对被观察者发送的Next事件及Error事件作出响应

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件及Complete事件作出响应

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件及onSubscribe事件作出响应

public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应

4.Disposable#dispose

Disposable的dispose()方法可切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件。
例子:

// 主要在观察者 Observer中 实现
Observer<Integer> observer = new Observer<Integer>() {
    // 1. 定义Disposable类变量
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
        // 2. 对Disposable类变量赋值
        mDisposable = d;
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "对Next事件"+ value +"作出响应"  );
        if (value == 2) {
            // 设置在接收到第二个事件后切断观察者和被观察者的连接
            mDisposable.dispose();
            Log.d(TAG, "已经切断了连接:" + mDisposable.isDisposed());
        }
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }
};

五、操作符

1.创建操作符

1.1 基本创建

完整的创建被观察者对象。

create

完整创建1个被观察者对象(Observable)。
例子:

// 1. 通过creat()创建被观察者对象
Observable.create(new ObservableOnSubscribe<Integer>() {

    // 2. 在复写的subscribe()里定义需要发送的事件
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);

        emitter.onComplete();
    }  // 至此,一个被观察者对象(Observable)就创建完毕
}).subscribe(new Observer<Integer>() {
    // 3. 通过通过订阅(subscribe)连接观察者和被观察者
    // 4. 创建观察者 & 定义响应事件的行为
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }
    // 默认最先调用复写的 onSubscribe()

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "接收到了事件"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }

});

1.2 快速创建 & 发送事件

快速的创建被观察者对象。

just

快速创建被观察者对象(Observable) & 发送10个以下事件。
例子:

// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3,4)   
    // 至此,一个Observable对象创建完毕
    // 2. 通过通过订阅(subscribe)连接观察者和被观察者
    // 3. 创建观察者 & 定义响应事件的行为
 .subscribe(new Observer<Integer>() {
    
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }
    // 默认最先调用复写的 onSubscribe()

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "接收到了事件"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }

});

fromArray

快速创建被观察者对象(Observable) & 发送10个以上事件(数组形式)。
例子:

/*
 * 数组遍历
 */
// 1. 设置需要传入的数组
Integer[] items = { 0, 1, 2, 3, 4 };

// 2. 创建被观察者对象(Observable)时传入数组
// 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
Observable.fromArray(items)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "数组遍历");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "数组中的元素 = "+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "遍历结束");
            }

        });

fromIterable

快速创建被观察者对象(Observable) & 发送10个以上事件(集合形式)。
例子:

/*
 * 集合遍历
 */
// 1. 设置一个集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);

// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
Observable.fromIterable(list)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "集合遍历");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "集合中的数据元素 = "+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "遍历结束");
            }
        });

其它方法

// 下列方法一般用于测试使用

<-- empty()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即观察者接收后会直接调用onCompleted()

<-- error()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()

<-- never()  -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用

1.3 延迟创建

定时操作:经过x秒后,需要自动执行y操作;
周期性操作:每隔x秒后,需要自动执行y操作。

defer

动态创建被观察者对象(Observable) & 获取最新的Observable对象数据。
例子:

<-- 1. 第1次对i赋值 ->>
Integer i = 10;

// 2. 通过defer 定义被观察者对象
// 注:此时被观察者对象还没创建
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

<-- 2. 第2次对i赋值 ->>
i = 15;

<-- 3. 观察者开始订阅 ->>
// 注:此时,才会调用defer()创建被观察者对象(Observable)
observable.subscribe(new Observer<Integer>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "接收到的整数是"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }
});

timer

延迟指定事件,发送一个0,一般用于检测。
例子:

// 该例子 = 延迟2s后,发送一个long类型数值
Observable.timer(2, TimeUnit.SECONDS) 
          .subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }

    @Override
    public void onNext(Long value) {
        Log.d(TAG, "接收到了事件"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }

});

// 注:timer操作符默认运行在一个新线程上
// 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)

interval

每隔指定时间就发送事件,发送的事件序列是从0开始、无限递增1的整数序列。
例子:

// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3,1,TimeUnit.SECONDS)
        // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Long value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });

// 注:interval默认在computation调度器上执行
// 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)

intervalRange

每隔指定时间就发送事件,发送的事件序列是从0开始、无限递增1的整数序列,可指定发送的数据的数量。
例子:

// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
        // 该例子发送的事件序列特点:
        // 1. 从3开始,一共发送10个事件;
        // 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Long value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });

range

连续发送1个事件序列,可指定范围。
例子:

// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.range(3,10)
        // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });

rangeLong

类似于range方法,区别在于该方法支持Long数据类型。

2.变换操作符

对事件序列中的 事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的 事件 / 整个事件序列。

Map

对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件,即将被观察者发送的事件转换为任意的类型事件。
例子:

// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {

    // 1. 被观察者发送事件 = 参数为整型 = 1、2、3
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);

    }
    // 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
    }
}).subscribe(new Consumer<String>() {

    // 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

FlatMap

将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
例子:

// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }

    // 采用flatMap()变换操作符
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("我是事件 " + integer + "拆分后的子事件" + i);
            // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
            // 最终合并,再发送给被观察者
        }
        return Observable.fromIterable(list);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

ConcatMap

类似FlatMap操作符,区别在于新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。
例子:

// 采用RxJava基于事件流的链式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }

    // 采用concatMap()变换操作符
}).concatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("我是事件 " + integer + "拆分后的子事件" + i);
            // 通过concatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
            // 最终合并,再发送给被观察者
        }
        return Observable.fromIterable(list);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

Buffer

定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送。
例子:

// 被观察者 需要发送5个数字
Observable.just(1, 2, 3, 4, 5)
        .buffer(3, 1) // 设置缓存区大小 & 步长
                      // 缓存区大小 = 每次从被观察者中获取的事件数量
                      // 步长 = 每次获取新事件的数量
        .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(List<Integer> stringList) {
                //
                Log.d(TAG, " 缓存区里的事件数量 = " +  stringList.size());
                for (Integer value : stringList) {
                    Log.d(TAG, " 事件 = " + value);
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应" );
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

3.组合 / 合并操作符

3.1 组合多个被观察者

concat / concatArray

组合多个被观察者一起发送数据,合并后按发送顺序串行执行,二者区别:组合被观察者的数量,即concat组合被观察者数量≤4个,而concatArray组合被观察者数量>4个。

// concat():组合多个被观察者(≤4个)一起发送数据
// 注:串行执行
Observable.concat(Observable.just(1, 2, 3),
                   Observable.just(4, 5, 6),
                   Observable.just(7, 8, 9),
                   Observable.just(10, 11, 12))
          .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

// concatArray():组合多个被观察者一起发送数据(可>4个)
// 注:串行执行
Observable.concatArray(Observable.just(1, 2, 3),
                   Observable.just(4, 5, 6),
                   Observable.just(7, 8, 9),
                   Observable.just(10, 11, 12),
                   Observable.just(13, 14, 15))
          .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

merge / mergeArray

组合多个被观察者一起发送数据,合并后按时间线并行执行,二者区别:组合被观察者的数量,即merge组合被观察者数量≤4个,而mergeArray组合被观察者数量>4个。与concat操作符区别:同样是组合多个被观察者一起发送数据,但concat操作符合并后是按发送顺序串行执行。
例子:

// merge():组合多个被观察者(<4个)一起发送数据
// 注:合并后按照时间线并行执行
Observable.merge(
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
          .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

// mergeArray() = 组合4个以上的被观察者一起发送数据,类似concatArray()

concatDelayError / mergeDelayError

使用contact或merge操作符时,若其中一个被观察者发出onError事件,则会马上终止其它被观察者继续发送事件,需要使用concatDelayError或mergeDelayError操作符。
无使用concatDelayError的情况:

Observable.concat(
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                emitter.onComplete();
            }
        }),
        Observable.just(4, 5, 6))
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

使用concatDelayError的情况:

<-- 使用了concatDelayError()的情况 -->
Observable.concatArrayDelayError(
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                emitter.onComplete();
            }
        }),
        Observable.just(4, 5, 6))
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

mergeDelayError操作符使用方法同理。

3.2 合并多个事件

zip

合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。事件组合方式为严格按照原先事件序列进行对位合并,最终合并的事件数量等于多个被观察者(Observable)中数量最少的数量。
例子:

<-- 创建第1个被观察者 -->
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "被观察者1发送了事件1");
        emitter.onNext(1);
        // 为了方便展示效果,所以在发送事件后加入2s的延迟
        Thread.sleep(1000);

        Log.d(TAG, "被观察者1发送了事件2");
        emitter.onNext(2);
        Thread.sleep(1000);

        Log.d(TAG, "被观察者1发送了事件3");
        emitter.onNext(3);
        Thread.sleep(1000);

        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作

<-- 创建第2个被观察者 -->
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "被观察者2发送了事件A");
        emitter.onNext("A");
        Thread.sleep(1000);

        Log.d(TAG, "被观察者2发送了事件B");
        emitter.onNext("B");
        Thread.sleep(1000);

        Log.d(TAG, "被观察者2发送了事件C");
        emitter.onNext("C");
        Thread.sleep(1000);

        Log.d(TAG, "被观察者2发送了事件D");
        emitter.onNext("D");
        Thread.sleep(1000);

        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
// 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送

<-- 使用zip变换操作符进行事件合并 -->
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer integer, String string) throws Exception {
        return  integer + string;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String value) {
        Log.d(TAG, "最终接收到的事件 =  " + value);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
});

尽管被观察者2的事件D没有事件与其合并,但还是会继续发送,若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,则被观察者2的事件D也不会发送。

combineLatest

当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。与zip的区别:zip按个数合并,即1对1合并;而CombineLatest按时间合并,即在同一个时间点上合并。
例子:

Observable.combineLatest(
        Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        new BiFunction<Long, Long, Long>() {
            @Override
            public Long apply(Long o1, Long o2) throws Exception {
                // o1 = 第1个Observable发送的最新(最后)1个数据
                // o2 = 第2个Observable发送的每1个数据
                Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
                return o1 + o2;
                // 合并的逻辑 = 相加
                // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
            }
        }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e(TAG, "合并的结果是: "+s);
            }
        });

combineLatestDelayError

类似于concatDelayError / mergeDelayError,即错误处理。

reduce

把被观察者需要发送的事件聚合成1个事件 & 发送,聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推。
例子:

Observable.just(1,2,3,4)
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            // 在该复写方法中复写聚合的逻辑
            @Override
            public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                return s1 * s2;
                // 本次聚合的逻辑是:全部数据相乘起来
                // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: "+s);

            }
        }); 

collect

将被观察者Observable发送的数据事件收集到一个数据结构里。
例子:

Observable.just(1, 2, 3 ,4, 5, 6)
        .collect(
            // 1. 创建数据结构(容器),用于收集被观察者发送的数据
            new Callable<ArrayList<Integer>>() {
                @Override
                public ArrayList<Integer> call() throws Exception {
                    return new ArrayList<>();
                }
                // 2. 对发送的数据进行收集
            }, new BiConsumer<ArrayList<Integer>, Integer>() {
                @Override
                public void accept(ArrayList<Integer> list, Integer integer)
                        throws Exception {
                    // 参数说明:list = 容器,integer = 后者数据
                    list.add(integer);
                    // 对发送的数据进行收集
                }
            }).subscribe(new Consumer<ArrayList<Integer>>() {
                @Override
                public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                    Log.e(TAG, "本次发送的数据是: "+s);

                }
            });

3.3 发送事件前追加发送事件

startWith / startWithArray

在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者。
例子:

<-- 在一个被观察者发送事件前,追加发送一些数据 -->
// 注:追加数据顺序 = 后调用先追加
Observable.just(4, 5, 6)
          .startWith(0)  // 追加单个数据 = startWith()
          .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
          .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });


<-- 在一个被观察者发送事件前,追加发送被观察者 & 发送数据 -->
// 注:追加数据顺序 = 后调用先追加
Observable.just(4, 5, 6)
        .startWith(Observable.just(1, 2, 3))
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

3.4 统计发送事件数量

count

统计被观察者发送事件的数量。

// 注:返回结果 = Long类型
Observable.just(1, 2, 3, 4)
          .count()
          .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "发送的事件数量 =  "+aLong);

            }
        });

4.功能性操作符

4.1 连接被观察者 & 观察者

subscribe

使得被观察者 & 观察者形成订阅关系。
例子:

observable.subscribe(observer);
// 前者 = 被观察者(observable);后者 = 观察者(observer 或 subscriber)

<-- 1. 分步骤的完整调用 -->
//  步骤1: 创建被观察者 Observable 对象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});

// 步骤2:创建观察者 Observer 并 定义响应事件行为
Observer<Integer> observer = new Observer<Integer>() {
    // 通过复写对应方法来 响应 被观察者
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }
    // 默认最先调用复写的 onSubscribe()

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "对Next事件"+ value +"作出响应"  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }
};

// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);


<-- 2. 基于事件流的链式调用 -->
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创建被观察者 & 生产事件
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    // 2. 通过通过订阅(subscribe)连接观察者和被观察者
    // 3. 创建观察者 & 定义响应事件的行为
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "开始采用subscribe连接");
    }
    // 默认最先调用复写的 onSubscribe()

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "对Next事件"+ value +"作出响应"  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "对Error事件作出响应");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "对Complete事件作出响应");
    }

});

内部实现:

<-- Observable.subscribe(Subscriber) 的内部实现 -->

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    // 在观察者 subscriber抽象类复写的方法 onSubscribe.call(subscriber),用于初始化工作
    // 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
    // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
    // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
}

4.2 线程调度

方便快速指定及控制被观察者 & 观察者的工作线程。对于一般的需求场景,需要在子线程中实现耗时的操作,然后回到主线程实现 UI操作,即被观察者(Observable)在子线程中生产事件(如实现耗时操作等),观察者(Observer)在主线程接收 & 响应事件(即实现UI操作)。

RxJava内置了多种用于调度的线程类型,内部使用线程池来维护这些线程,所以线程的调度效率非常高。
线程类型如下:


线程类型

subscribeOn / observeOn

指定被观察者(Observable) / 观察者(Observer)的工作线程类型。
例子:

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 步骤1:创建被观察者 Observable & 发送事件
        // 在主线程创建被观察者 Observable 对象
        // 所以生产事件的线程是:主线程
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, " 被观察者 Observable的工作线程是: " + Thread.currentThread().getName());
                // 打印验证
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

        // 步骤2:创建观察者 Observer 并 定义响应事件行为
        // 在主线程创建观察者 Observer 对象
        // 所以接收 & 响应事件的线程是:主线程
        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
                Log.d(TAG, " 观察者 Observer的工作线程是: " + Thread.currentThread().getName());
                // 打印验证

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };

        
        <-- 使用说明 -->
        // Observable.subscribeOn(Schedulers.Thread):指定被观察者 发送事件的线程(传入RxJava内置的线程类型)
        // Observable.observeOn(Schedulers.Thread):指定观察者 接收 & 响应事件的线程(传入RxJava内置的线程类型)

        <-- 实例使用 -->
        // 步骤3:通过订阅(subscribe)连接观察者和被观察者
        observable.subscribeOn(Schedulers.newThread()) // 1. 指定被观察者 生产事件的线程
                  .observeOn(AndroidSchedulers.mainThread())  // 2. 指定观察者 接收 & 响应事件的线程
                  .subscribe(observer); // 3. 最后再通过订阅(subscribe)连接观察者和被观察者
    }
}

情况1:若Observable.subscribeOn()多次指定被观察者生产事件的线程,则只有第一次指定有效,其余的指定线程无效。

// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribeOn(Schedulers.newThread()) // 第一次指定被观察者线程 = 新线程
          .subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定被观察者线程 = 主线程
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(observer);

情况2:若Observable.observeOn()多次指定观察者接收 & 响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换。

// 步骤3:通过订阅(subscribe)连接观察者和被观察者
observable.subscribeOn(Schedulers.newThread())
          .observeOn(AndroidSchedulers.mainThread()) // 第一次指定观察者线程 = 主线程
          .doOnNext(new Consumer<Integer>() { // 生产事件
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "第一次观察者Observer的工作线程是: " + Thread.currentThread().getName());
            }
        })
        .observeOn(Schedulers.newThread()) // 第二次指定观察者线程 = 新的工作线程
        .subscribe(observer); // 生产事件


// 注:
// 1. 整体方法调用顺序:观察者.onSubscribe()> 被观察者.subscribe()> 观察者.doOnNext()>观察者.onNext()>观察者.onComplete() 
// 2. 观察者.onSubscribe()固定在主线程进行

4.3 延迟操作

delay

使得被观察者延迟一段时间再发送事件。
方法介绍:

// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)

// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延迟时间  & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟

例子:

Observable.just(1, 2, 3)
        .delay(3, TimeUnit.SECONDS) // 延迟3s再发送
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

4.4 在事件的生命周期中操作

在事件发送 & 接收的整个生命周期过程中进行操作,如发送事件前的初始化、发送事件后的回调请求等。

do

在某个事件的生命周期中调用。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Throwable("发生错误了"));
         }
       })
        // 1. 当Observable每发送1次数据事件就会调用1次
        .doOnEach(new Consumer<Notification<Integer>>() {
            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {
                Log.d(TAG, "doOnEach: " + integerNotification.getValue());
            }
        })
        // 2. 执行Next事件前调用
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "doOnNext: " + integer);
            }
        })
        // 3. 执行Next事件后调用
        .doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "doAfterNext: " + integer);
            }
        })
        // 4. Observable正常发送事件完毕后调用
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doOnComplete: ");
            }
        })
        // 5. Observable发送错误事件时调用
        .doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "doOnError: " + throwable.getMessage());
            }
        })
        // 6. 观察者订阅时调用
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(@NonNull Disposable disposable) throws Exception {
                Log.e(TAG, "doOnSubscribe: ");
            }
        })
        // 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doAfterTerminate: ");
            }
        })
        // 8. 最后执行
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                Log.e(TAG, "doFinally: ");
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

4.5 错误处理

发送事件过程中,遇到错误时的处理机制。

onErrorReturn

遇到错误时,发送1个特殊事件 & 正常终止,可捕获在它之前发生的异常。
例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Throwable("发生错误了"));
         }
       })
        .onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(@NonNull Throwable throwable) throws Exception {
                // 捕捉错误异常
                Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );

                return 666;
                // 发生错误事件后,发送一个"666"事件,最终正常结束
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

onErrorResumeNext

遇到错误时,发送1个新的Observable。
onErrorResumeNext()拦截的错误为Throwable,若需拦截Exception请用onExceptionResumeNext();若onErrorResumeNext()拦截的错误为Exception,则会将错误传递给观察者的onError方法。
例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Throwable("发生错误了"));
         }
       })
        .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {

                // 1. 捕捉错误异常
                Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );

                // 2. 发生错误事件后,发送一个新的被观察者 & 发送事件序列
                return Observable.just(11,22);
                
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

onExceptionResumeNext

遇到错误时,发送1个新的Observable。
onExceptionResumeNext()拦截的错误为Exception;若需拦截Throwable请用onErrorResumeNext();若onExceptionResumeNext()拦截的错误为Throwable,则会将错误传递给观察者的onError方法。
例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
         }
       })
        .onExceptionResumeNext(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                observer.onNext(11);
                observer.onNext(22);
                observer.onComplete();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

retry

当出现错误时,让被观察者(Observable)重新发射数据。
接收到onError()时,重新订阅 & 发送事件,Throwable和Exception都可拦截。
方法介绍:

<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制)
// 参数 = 重试次数
 
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制)
// 参数 = 设置重试次数 & 判断逻辑

例子:

<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
        e.onNext(3);
         }
       })
        .retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });


<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制)
// 参数 = 重试次数
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
        e.onNext(3);
         }
       })
        .retry(3) // 设置重试次数 = 3次
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
        e.onNext(3);
         }
       })
        // 拦截错误后,判断是否需要重新发送请求
        .retry(new Predicate<Throwable>() {
            @Override
            public boolean test(@NonNull Throwable throwable) throws Exception {
                // 捕获异常
                Log.e(TAG, "retry错误: "+throwable.toString());

                //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                return true;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试)
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
        e.onNext(3);
         }
       })

        // 拦截错误后,判断是否需要重新发送请求
        .retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                // 捕获异常
                Log.e(TAG, "异常错误 =  "+throwable.toString());

                // 获取当前重试次数
                Log.e(TAG, "当前重试次数 =  "+integer);

                //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                return true;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });


<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制)
// 参数 = 设置重试次数 & 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new Exception("发生错误了"));
        e.onNext(3);
         }
       })
        // 拦截错误后,判断是否需要重新发送请求
        .retry(3, new Predicate<Throwable>() {
            @Override
            public boolean test(@NonNull Throwable throwable) throws Exception {
                // 捕获异常
                Log.e(TAG, "retry错误: "+throwable.toString());

                //返回false = 不重新重新发送数据 & 调用观察者的onError()结束
                //返回true = 重新发送请求(最多重新发送3次)
                return true;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

retryUntil

出现错误后,判断是否需要重新发送数据,若需要重新发送 & 持续遇到错误,则持续重试,具体使用类似于retry(Predicate predicate),唯一区别:返回true则不重新发送数据事件。

retryWhen

遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件。
例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onError(new Exception("发生错误了"));
            e.onNext(3);
        }
    })
        // 遇到error事件才会回调
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
                // 返回Observable<?> = 新的被观察者 Observable(任意类型)
                // 此处有两种情况:
                    // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么原始Observable则不重新发送事件:
                    // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                        // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
                        // 该异常错误信息可在观察者中的onError()中获得
                         return Observable.error(new Throwable("retryWhen终止啦"));
                        
                        // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
                         // return Observable.just(1);
                    }
                });

            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应" + e.toString());
                // 获取异常错误信息
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

4.6 重复发送

重复不断地发送被观察者事件。

repeat

无条件地、重复发送被观察者事件,具备重载方法,可设置重复创建次数。
例子:

// 不传入参数 = 重复发送次数 = 无限次
repeat();
// 传入参数 = 重复发送次数有限
repeatWhen(Integer int );

// 注:
// 1. 接收到.onCompleted()事件后,触发重新订阅 & 发送
// 2. 默认运行在一个新的线程上

// 具体使用
Observable.just(1, 2, 3, 4)
        .repeat(3) // 重复创建次数 =- 3次
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });

repeatWhen

将原始Observable停止发送事件的标识(Complete() / Error())转换成1个Object类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的Observable。若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable;若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable。
例子:

Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
        public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
            // 将原始 Observable 停止发送事件的标识(Complete() /  Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
            // 以此决定是否重新订阅 & 发送原来的 Observable
            // 此处有2种情况:
            // 1. 若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable
            // 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
            return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                    // 情况1:若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable
                    return Observable.empty();
                    // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()

                    // return Observable.error(new Throwable("不再重新订阅事件"));
                    // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。

                    // 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                    // return Observable.just(1);
                   // 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件
                }
            });

        }
    })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应:" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });

六、源码解析

RxJava源码解析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,011评论 1 9
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,455评论 7 62
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 902评论 0 2
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 美,怦然心动的感觉。似曾相识的感觉,命中注定的幻觉,是野鹤闲云的孤傲,也是梨花带雨的娇弱,是一种淡淡的温暖,浅浅的...
    周浅浅的微博里没有什么阅读 407评论 0 1