rxJava日常学习
简述
rxjava基本概念:Observable(被观察者),Observer(观察者),Subscribe(订阅)
observable和observer通过subscribe()方法实现订阅关系,从而observable可以在需要的时候发出事件来通知observer.
与传统的回调方法不同的是除了onNext(相当于onClick())之外,还定义了两个特殊事件:onCompleted() 和 onError().onCompleted():事件队列完结。rxjava不仅把每个事件单独处理,还把它们看作是一个队列。rxjava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为完结的标示。
onError():事件队列异常。在事件的处理过程中,出现异常时,onError会被触发,同时队列自动终止,不会再有事件发出。
在一个运行的事件序列中,onComplted()和onError()有且只有一个会被调用,而且是事件序列的最后一个。
观察者的创建
rxjava中观察者observer接口的实现方式为:
Observer<String> observer = new Observer<String>() {
@Override public void onCompleted() {
//end flag
}
@Override public void onError(Throwable e) {
//report error
}
@Override public void onNext(String s) {
//do something
}};
除了这个接口外,rxjava还内置了一个实现Observer的抽象类:Subscriber。Subscriber对Observer接口进行了一些扩展,但是基本的使用方法一致。
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override public void onCompleted() {
//end flag
}
@Override public void onError(Throwable e) {
//report error
}
@Override public void onNext(String o) {
//do something
}};
不仅在使用方式上一样,实质上在Rxjava的subscribe过程中,Observer也总是会先被转为Subscriber再使用。
源码:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override public void onCompleted() {
observer.onCompleted();
}
@Override public void onError(Throwable e) {
observer.onError(e);
}
@Override public void onNext(T t) {
observer.onNext(t);
} });
}
它两的区别主要有两点:
1.onStart():这是Subscriber增加的方法,主要是在subscribe()刚开始,而事件还未发送前调用,做一些准备工作,例如数据的清零或者重置,一个可选的方法,默认实现为空,如果对线程又要求,onStart()就不太适用,因为它不能指定线程,总是在subscribe发生的线程调用
2.unsubscribe():这是Subscriber所实现的另一个接口Subscription的方法,用于取消订阅,一般在调用前,可以用isUnsubscribed()先判断一下状态,在合适的地方调用unSubscribe(),来接触引用关系,以避免内存泄露。
- 创建Observable
被观察者,它决定什么时候触发事件和触发怎样的事件,rxjava使用create()创建一个observable.
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello rxjava");
subscriber.onCompleted();
}});
这里传入了一个OnSubscribe对象作为参数,OnSubscribe会被存储在返回的Observable对象中,它的作用相当于一个计划表,当observable被订阅的时候,onSubscribe的call()方法会自动被调用,事件序列就会依照设定依次出发,这样由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
create()方法是Rxjava最基本的创造事件的方法,基于这个方法Rxjava还提供了两个快捷创造事件队列的方法。
- just(T...) :将传入的参数依次发出
- from(T [] )/from(Iterable<? extends T>) : 将传入的数组或Iterable拆成具体对象后,依次发出。
subscribe()还支持不完整定义的回调,Rxjava会自动根据定义创建出Subscriber.形式如下:
Action0 action0 = new Action0() {
@Override
public void call() {
//onComplted
}};
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
//OnNext
}} ;
Action1<Throwable> action11 = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//onError
}};
//自动创建subscriber,并使用action1来定义onNext
myObservable.subscribe(action1);
//自动创建subscriber,并使用action11来定义onError
myObservable.subscribe(action1,action11);
//自动创建subscriber,并使用action0来定义onComplted
myObservable.subscribe(action1,action11,action0);
在rxjava的默认规则中,事件的发出和消费都是在同一个线程,观察者模式本身的目的就是:后台处理,前台回调的异步机制,因此异步对Rxjava非常重要,而要实现异步,而要实现异步则需要Rxjava另一个重要概念:Scheduler.
- 线程控制 Scheduler
- Scheduler的API
1.Schedulers.immediate():默认当前线程
2.Schedulers.newThread():总是启动新线程,并在新线程执行操作
3.Schedulers.io(): I/O操作(读写文件,读写数据库,网络信息交互等),行为模式和Schedulers.newThread()差不多,区别在于内置来一个无数量上限等线程池,可以重用空闲线程,因此大多数情况下会使用这个,比newThread()更有效率
4.AndroidSchedulers.mainThread():它指定操作在Android主线程运行
有了这几个Scheduler,就可以使用subscribeon()和observeon()两个方法对线程进行控制,subscribeon():指定subscribe()所发生的线程,或者事件产生的线程,Observeon()指定Subscriber所运行的线程,或者叫做事件的消费现场。
Observable.just(1,2,3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("number", "number" + integer);
}}
);
这种在subscribe()前加上 subscribeOn() 和 observerOn() 的使用方法,它多适用于 ** 后台线程取数据,主线程显示 ** 的程序策略。
2016-11-07 立冬
- 变换
Rxjava提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列。
example:
Observable.just("images.logo.png")
.map(new Func1<String, Bitmap>() { //输入类型 String
@Override
public Bitmap call(String s) { //参数类型 String
return getBitmapfromPath(s); //返回类型 bitmap
}}
).subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { //参数类型bitmap
showBitmap(bitmap); }}
);
这里出现的Func1的类,和Action1非常相似,也是Rxjava的一个接口,用于包装含有一个参数的方法。Func1和Action的区别在于,Func1包装的是有返回值的方法,另外,和Actionx一样,Funcx也有多个,用于不同参数个数的方法。
** map()是一对一的转换,flatMap()是一对多的转换**
map example:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() { @Override
public void onNext(String name) {
Log.d(tag, name); } ...
};
Observable.from(students) .map(new Func1<Student, String>() {
@Override public String call(Student student) {
return student.getName(); } }
) .subscribe(subscriber);
flatMap example:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
} ...};
Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses()); } })
.subscribe(subscriber);
flapMap()和map()有一个相同点:把传入点参数转化之后返回另一个对象。不同的是,flapMap()中返回的是个observable对象,并且这个observable对象并不是被直接发送到了Subscriber的回调方法中,原理为:1.使用传入的事件对象创建一个observable对象
2.并不发送这个observable,而是将它激活,于是它开始发送事件
3.每一个创建出来的observable发送的事件,都被汇入同一个observable,而这个observable负责将这些事件统一交给Subscriber的回调方法
- 变换的原理: lift()
这些变化虽然功能上各有不同,但实质上都是针对事件序列的处理和再发送
它们基于同一个基础的变化方法:lift(Operator)
- 线程控制:Scheduler(二)
可以使用observeOn()实现多次线程切换,observeOn()指定的是它之后的操作所在的线程。
与Subscriber.onStart()相对应,有个方法Observable.doOnSubscribe(),同Subscriber.onStart()同样是在subscribe()调用后而且在事件发送前执行,区别在于它可以指定线程,默认情况下,doOnSubscribe()执行在subscribe()发生的线程,而如果在doOnSubscribe()之后有subscribeOn()的话,它将执行离它最近的subscribeOn()所指定到的线程。
参考:
抛物线Rxjava详解