Rxjava 源于ReactiveX(Reactive Extensions),Rx是一个变成模型,目标是提供一致的编程接口,帮助开发者方便的处理异步数据流,许多流行的变成语言都有Rx库。Rx扩展了观察者模式,用于支持数据和事件序列,添加了一些操作符,他人你可以声明式的组合这些序列,而无需关注底层的实现(线程、同步、线程安全、并发)。
Rxjava优势就是使得代码更简洁,而且随着程序逻辑变得越来越复杂,它依然能够保持简洁。
扩展的观察者
Observable(被观察者)、Observer(观察者)、subscribe(订阅),Observable和Observer通过subscribe方法实现订阅关系。Observable在需要的时候发出事件来通知Observer。
与传统的观察者模式不同,除了定义了普通的回调事件onNext(),还定义的两个特殊的事件onCompleted()和onError()。
- onCompleted:事件队列完结,Rxjava把所有事件看做一个队列,并规定,当不会再有新的onNext事件触发时,需要触发onCompleted
- onError:事件队列异常,在时间处理过程中出现异常时触发onError,同时队列终止,不允许再有事件发出
- onCompleted和onError有且只有一个,并且是事件序列的最后一个
基本实现
创建Observer,定义事件触发时的具体行为
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。
与Observer 的主要区别:
- onStart:这是Subscriber 新增的方法,会在subscribe里,事件还未发送前调用,它总是在subscribe所发生的线程被调用
2、unsubscribe:这是Subscriber 新增的方法,用于取消订阅,这个方法被调用后,Subscriber将不再接收事件。Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
.throttleFirst(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "clicks->doOnUnsubscribe");
}
})
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
methd6();
}
});
//维护相关的资源引用
subscriptions.add(clickSubscribe);
@Override
protected void onDestroy() {
for (Subscription s : subscriptions) {
if (!s.isUnsubscribed()) {
s.unsubscribe();
Log.e(TAG, "onDestroy: 取消订阅!");
}
}
super.onDestroy();
}
创建Observable,定义被观察者,决定什么时候出发事件以及触发怎么样的事件
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
创建Observable的操作符
- create(OnSubscribe)
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
- just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- empty 创建一个什么都不做直接通知完成的Observable
- error 创建一个什么都不做直接通知错误的Observable
- never 创建一个什么都不做的Observable
Observable observable1=Observable.empty();//直接调用onCompleted。
Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
Observable observable3=Observable.never();//啥都不做
- timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>
Observable.timer(1000,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("JG",aLong.toString()); // 0
}
});
- interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//每隔1秒发送数据项,从0开始计数
//0,1,2,3....
}
});
- range 创建一个发射指定范围的整数序列的Observable<Integer>
Observable.range(2,5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",integer.toString());// 2,3,4,5,6 从2开始发射5个数据
}
});
- defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("hello");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("JG",s);
}
});
合并操作符
- concat: 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)。
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(4,5,6);
Observable.concat(observable1,observable2)
.subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
- startWith: 在数据序列的开头增加一项数据。startWith的内部也是调用了concat
Observable.just(1,2,3,4,5)
.startWith(6,7,8)
.subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
- merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。
- zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(4,5,6);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer item1, Integer item2) {
return item1+"and"+item2;
}
})
.subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
- combineLatest 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。
过滤操作
- filter 过滤数据。内部通过OnSubscribeFilter过滤数据。
Observable.just(3,4,5,6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
})
.subscribe(item->Log.d("JG",item.toString())); //5,6
还有ofType、take、takeLast、throttleFirst、timeout等
条件/布尔操作
- all 判断所有的数据项是否满足某个条件,内部通过OperatorAll实现。
Observable.just(2,3,4,5)
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
})
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d("JG",aBoolean.toString()); //false
}
})
;
还有exists、contains、isEmpty等
聚合操作
- reduce: 对序列使用reduce()函数并发射最终的结果,内部使用OnSubscribeReduce实现。
Observable.just(2,3,4,5)
.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum+item;
}
})
.subscribe(integer -> Log.d("JG",integer.toString()));//14
还有collect、count/countLong
转换操作
- toList 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
Observable.just(2,3,4,5)
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
});
还有toSortedList、toMap、toMultiMap
变换操作
- map 对Observable发射的每一项数据都应用一个函数来变换。
Observable.just(6,2,3,4,5)
.map(integer -> "item:"+integer)
.subscribe(s -> Log.d("JG",s));//item:6,item:2....
还有cast、flatMap、flatMapIterable、concatMap、switchMap等
错误处理/重试机制
- onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable。
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
;
- onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。
- onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据。
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 4;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",integer.toString());1,4
}
});
- retry: 当原始Observable在遇到错误时进行重试。
Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
;//1,1,1,1,onError
- retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry。
Observable.just(1,"2",3)
.cast(Integer.class)
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
@Override
public Observable<Long> call(Observable<? extends Throwable> observable) {
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
//1,1
还有一些操作符具体还是看RxJava操作符大全这个文章吧
线程控制
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制
- subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
- observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
doOnSubscribe()它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);