去年九月份开始就想好好学习下RxJava,奈何工作原因 强度太大 没时间研究新东西 刚把项目忙完可以轻松两天 感觉上手 为了记录一下 写一系列文章来记录RxJava的学习之旅
具体为什么用RxJava也不说了,关于RxJava的栗子很多,众说纷纭,我看中的一点就是写代码简单,代码结构清晰,维护简单。这里代码接单绝不是少些几行代码,简单是指代码逻辑清晰 简洁 一目了然。
我学东西习惯是先会用 再研究为什么 所以先来学会怎么使用再说其他的吧
一、添加依赖
```gride
compile'io.reactivex:rxandroid:1.1.0'
compile'io.reactivex:rxjava:1.1.0'
```
二、创建观察者Observer
RxJava 使用的是观察者模式 举个非常简单的栗子,通常在写UI时会有view,每个view都有对应的onClickListener
这的OnClickListner就是观察者 而view就是被观察者 view在被点击的时候就会调用onClick 然后观察者OnClikListener就会收到view被点击的事件 做出对应的操作
```Java
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
log("onComplete()");
}
@Override
public void onError(Throwable e) {
log("onError");
}
@Override
public void onNext(String s) {
log("onNext" + s);
}
};
```
RxJava的观察者模式有三个基本的函数分别是
onComplete():这是一个无参无返回值的函数 会在所有事件结束后被调用具体的会在后面说
onError(Throwable e):这是执行过程中出现错误会调用的函数,比如在一系列逻辑操作中有IO操作 如果IO操作有异常出现 就可以调用onError通知执行出错,onError()和onComplete() 是互斥的 在执行任务开始到结束只会有一个会执行
onNext(T t);这是一个有参函数,这个函数的主要作用就是执行下一个事件
当然观察者不止一个这么简单的
```Java
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
log("onComplete()");
if (!isUnsubscribed()){
unsubscribe();
}
}
@Override
public void onError(Throwable e) {
log("onError()");
}
@Override
public void onNext(String s) {
log("onNext() ====" + s);
}
@Override
public void onStart() {
super.onStart();
}
};
```
这是Observer的扩展类,Subscriber实现了Observer这个观察者接口,本身是一个抽象类,在其中有两个常用的函数接口介绍下
1、onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
2、unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
三、创建被观察者对象Observable
```Java
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("learn RxJava");
subscriber.onNext("学习愉快");
subscriber.onCompleted();
}
});
```
Observable 直接可以通过creat方法创建被观察者对象
```Java
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
```
返回的就是一个观察者对象
被观察者对象有一个call函数 参数就是一个支持多态参数的观察者subscriber对象这里就可以使用subscriber对象调用onNext onComplete onError 注意 这个不是创建就开始执行任务 这里逻辑写好 代码没有开始执行的
上面的观察者中就顺序调用了onNext() 那么onNext()就会依次执行三次 相当于一个有序队列执行任务
```Java
Observable<String> observable1 = Observable.just("Hello","World","!");
```
为了简便写法 也可以写成上面这种 两种方式等价、
三、Subscribe(对观察者进行订阅)
```Java
observable.subscribe(observer);
```
使用上面的进行就可以订阅 这里订阅之后才开始执行call函数,任务开始
```Java
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}
```
剔除一下上面的代码简洁下
```Java
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
```
在订阅的时候就调用了观察者subscriber的onStart() 所以上面有提到观察者的onStart() 可以做一些任务开始前的初始化操作 比如重置各种状态,清除数据缓存等。接着又调用了被观察者的call,这里可以返回去看看创建被观察者observable的时候在call中去调用了观察者subscriber的onNext()、onComplete()、onError() 等函数,所以做在订阅时任务才开始 而不是像ui的点击时间那样 不管有没有设置监听器 事件都会产生,只是注册的才能收到,这里的区别就在订阅后任务事件才会发生。
这是进行订阅的源码 实际这里返回了一个Subscription对象
```Java
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
```
Subscription 的源码 看到这个知道这里为什么会返回Subscription对象了吧 对就是你看到的为了能够在任务执行完成后取消订阅 防止内存泄漏
四、支持不完整定义的回调
```Java
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
log("onNextAction ----" + s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log("onErrorAction");
}
};
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
log("onCompleteAction");
}
};
observable.subscribe(onNextAction);
observable.subscribe(onNextAction,onErrorAction);
observable.subscribe(onNextAction,onErrorAction,onCompleteAction);
```
这样等价于上面的observable.subscribe(observer);
为什么onNext()函数和onError()用Action1 onComplete()用Action0呢
Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。
基本使用就到这里了 这篇主要是讲解怎么简单使用RxJava 后面继续深入,进行场景下的应用
这里参考了一些其他大佬的博客感谢这篇作者 ![大佬版详细介绍]http://gank.io/post/560e15be2dca930e00da1083?spm=5176.100239.blogcont63660.9.IMOS0q#toc_1
以上内容仅是小弟自学记录,有不对的地方请指教拍砖