前言:上篇文章我们讲解了RxJava最基本的基础知识原理,这篇呢我打算讲解下怎么来创建一个observable被观察者,各个api的使用情况。
一.创建操作符
1.基本创建:create()
RxJava中最基本的创建被观察者(Observable)的操作符
-
具体使用
/** * 通过Observable.create()来创建一个被观察者 */ Observable observable=Observable.create(new ObservableOnSubscribe<String>() { /** * 在复写的subscribe方法中定义要发送的事件 * @param e * @throws Exception */ @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //在这个方法里面开始向外发射事件 e.onNext("hello"); e.onNext("RxJava"); e.onComplete(); } }); 这样一个被观察者(ObServable)就被创建出来了,接下来我们用完整的链式将这个Observable订阅起来使用 /** * 通过Observable.create()来创建一个被观察者 */ Observable.create(new ObservableOnSubscribe<String>() { /** * 在复写的subscribe方法中定义要发送的事件 * @param e * @throws Exception */ @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //在这个方法里面开始向外发射事件 e.onNext("hello"); e.onNext("RxJava"); e.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe连接"); } @Override public void onNext(String s) { Log.d(TAG,"接收结果:"+s); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
打印结果如下: 04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 开始采用subscribe连接 04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 接收结果:hello 04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 接收结果:RxJava 04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 对Complete事件作出响应
2.快速创建&发送事件
2.1 just():
快速创建一个被观察者对象(observable)
发送事件的特点:直接发送 传入的事件
最多只能发送10个参数
-
具体使用:
// 1. 创建时传入整型1、2、3、4 // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4) Observable.just(1, 2, 3,4) // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略 // 2. 通过通过订阅(subscribe)连接观察者和被观察者 // 3. 创建观察者 & 定义响应事件的行为 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe连接"); } // 默认最先调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } }); }
打印结果: 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 开始采用subscribe连接 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件1 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件2 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件3 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件4 04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 对Complete事件作出响应
2.2 fromArray():
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的数组数据 (会将数组中的数据转换为Observable对象)
没有限制个数
-
具体demo使用如下:
// 1. 创建时传入整型1、2、3、4 // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4) Observable.fromArray(1, 2, 3, 4,5,6,7,8,9,10,11,12,13) // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略 // 2. 通过通过订阅(subscribe)连接观察者和被观察者 // 3. 创建观察者 & 定义响应事件的行为 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe连接"); } // 默认最先调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
打印结果如下: 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 开始采用subscribe连接 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件1 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件2 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件3 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件4 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件5 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件6 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件7 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件8 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件9 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件10 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件11 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件12 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件13 04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 对Complete事件作出响应
2.3 formIterable():
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的集合List数据
没有限制个数
-
具体demo使用如下:
// 1. 设置一个集合 List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4) Observable.fromIterable(list) // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略 // 2. 通过通过订阅(subscribe)连接观察者和被观察者 // 3. 创建观察者 & 定义响应事件的行为 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe连接"); } // 默认最先调用复写的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" + value); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
打印结果: 04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 开始采用subscribe连接 04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件1 04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件2 04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件3 04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 对Complete事件作出响应
3.延迟创建
3.1 defer():
直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
通过 Observable工厂方法创建被观察者对象(Observable)
每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
-
具体demo使用:
<-- 1. 第1次对i赋值 ->> Integer i = 10; // 2. 通过defer 定义被观察者对象 // 注:此时被观察者对象还没创建 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); } }); <-- 2. 第2次对i赋值 ->> i = 15; <-- 3. 观察者开始订阅 ->> // 注:此时,才会调用defer()创建被观察者对象(Observable) observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "开始采用subscribe连接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到的整数是"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
打印结果:最后打印的值15:因为每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的,所以i值会取第2次的赋值 04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 开始采用subscribe连接 04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 接收到的整数是15 04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 对Complete事件作出响应
3.2 timer():
快速创建1个被观察者对象(Observable)
发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
本质 = 延迟指定时间后,调用一次 onNext(0) 延迟指定事件,发送一个0,一般用于检测
-
具体demo使用:
Observable.timer(10, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe: "); } @Override public void onNext(Long aLong) { Log.d(TAG,"onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG,"onError: "); } @Override public void onComplete() { Log.d(TAG,"onComplete: "); } });
打印结果:.645 19748-19763/lingan.test.com.myapplication D/MainActivity: onNext: 0
3.3 interval():
快速创建1个被观察者对象(Observable)
发送事件的特点:每隔指定时间 就发送 事件
发送的事件序列 = 从0开始、无限递增1的的整数序列
-
具体demo使用测试:
Observable.interval(10,5, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe: "); } @Override public void onNext(Long aLong) { Log.d(TAG,"onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG,"onError: "); } @Override public void onComplete() { Log.d(TAG,"onComplete: "); } });
打印结果:第一次间隔10秒开始发送一次事件,第一次事件值为0,然后每次间隔5秒发送一次事件,每次事件值增加1
3.4 intervalRange():
快速创建1个被观察者对象(Observable)
发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
发送的事件序列 = 从指定一个数开始、无限递增1的的整数序列,到指定的第二个参数大小停止
作用类似于interval(),但可指定发送的数据的数量
-
具体demo使用
Observable.intervalRange(1,10,10,5, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe: "); } @Override public void onNext(Long aLong) { Log.d(TAG,"onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG,"onError: "); } @Override public void onComplete() { Log.d(TAG,"onComplete: "); } });
打印结果:发送事件值1开始叠加,最大发送10次,第一次间隔10秒开始发送,第二次是每次间隔5秒开始 04-14 02:48:30.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 1 04-14 02:48:35.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 2 04-14 02:48:40.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 3 04-14 02:48:45.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 4 04-14 02:48:50.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 5 04-14 02:48:55.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 6 04-14 02:49:00.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 7 04-14 02:49:05.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 8 04-14 02:49:10.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 9 04-14 02:49:15.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 10 04-14 02:49:15.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onComplete:
3.5 range():
快速创建1个被观察者对象(Observable)
发送事件的特点:连续发送 1个事件序列,可指定范围
发送的事件序列 = 从指定一个数开始、无限递增1的的整数序列,到指定的第二个参数大小停止
作用类似于intervalRange(),但区别在于:无延迟发送事件
-
具体demo使用
Observable.range(1,10).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe: "); } @Override public void onNext(Integer aLong) { Log.d(TAG,"onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG,"onError: "); } @Override public void onComplete() { Log.d(TAG,"onComplete: "); } });
打印结果:没有延迟,快速打印出1到10的值。 04-14 02:52:32.563 20305-20305/? D/MainActivity: onSubscribe: 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 1 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 2 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 3 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 4 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 5 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 6 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 7 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 8 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 9 04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 10 04-14 02:52:32.564 20305-20305/? D/MainActivity: onComplete:
3.6 rangeLong():
- 作用:类似于range(),区别在于该方法支持数据类型 = Long
- 与range()类似,此处不作过多描述至此.