原创博客地址
对于程序猿来说,Demo
是最好的起手。
而对于RxJava
来说,你可以简单理解成:
- 是一个观察者模式框架
- 替代
AsyncTask
成为更好的异步操作工具 - 即便逻辑再复杂,对于
RxJava
来说就是:简洁
首先上Demo
:
public static void main(String[] args) {
// 0.准备一些数据
Integer[] numbers = { 1, 2, 3, 4 };
List<Integer> lists = Arrays.asList(numbers);
// 1.创建一个被观察者
// 被观察者很明显从List集合获取数据,现在就等着有人来订阅~
Observable<Integer> observable = Observable.from(lists);
// 2.创建一个观察者
// SubScriber是Observer的实现类,所以也是一个观察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer data) {
// 被观察者发送的数据都会送到这里
System.out.println("Rx -- onNext:" + data);
}
@Override
public void onCompleted() {
// 被观察者发送完数据会调用该方法
System.out.println("Rx -- Complete!");
}
@Override
public void onError(Throwable e) {
// 被观察者传输数据中发生异常会调用该方法
System.out.println("Rx -- Error!");
}
};
// 3.订阅
// 正常来说应该是:observer.subscribe(observable); 看起来更合乎逻辑
// 这样反而像是:被观察者 订阅了 观察者(报纸 订阅了 读者)
// 这涉及到流式编程,姑且先这样记住吧
observable.subscribe(observer);
}
运行结果:
- 在观察者订阅的顺间,被观察者就发送数据过来了
- 数据发送过来调用的方法:
onNext()
- 数据发送完成调用的方法:
onCompleted()
- 数据发送期间出现异常调用的方法:
onError()
不要看代码多了,但逻辑很简洁!只有逻辑上的简洁才是真正的简洁!
上面的Demo
看完一遍,大概知道有什么样的角色在扮演。
现在分析下每个角色:
观察者
作用:接收数据并进行处理
观察者毫无疑问就是Observer
,但它是接口。在实际操作中,一般都使用它的抽象实现类Subscriber
。两者使用方式完全一样。
public abstract class Subscriber<T> implements Observer<T>, Subscription
现在来看看观察者常用的创建方式:
第一种:new Observer()接口
Observer<Integer> observer = new Observer<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
}
};
第二种:new Subscriber()抽象类
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
}
};
Subscriber
中有一个方法:
/**
* This method is invoked when the Subscriber and Observable have been connected but the Observable has
* not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
* useful initialization to your subscription, for instance to initiate backpressure.
*/
public void onStart() {
// do nothing by default
}
- 很明显是留给调用者自己重写的
- 英文好的可以自己看注释
- 这里大致说下意思:这个方法是在观察者和被观察者已连接,但是被观察者还没有向观察者发送数据时进行调用。
- 所以,这个方法就是用来做初始化用的。
除此之外,Subscriber
实现的Subscription
接口还有两个方法:
public interface Subscription {
void unsubscribe(); // 取消订阅
boolean isUnsubscribed(); // 是否已经取消订阅
}
- 取消订阅后,观察者将不会再接收事件
- 取消之前先判断一下
isUnsubscribed()
- 如果程序中没有调用取消订阅方法,被观察者会始终持有观察者的引用。造成内存泄漏。
被观察者
作用:作为数据的发送方,它决定什么时候发送,怎么发送
被观察者Observable
,Java
里也有。很多地方都喜欢用这个单词作为被观察者,这也是它的直译。但是就因为都一样,所以小心不要导错包了。
现在来看看被观察者常用的创建方式:
第一种:Observable.create()
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
}
});
-
create()
方法接收一个OnSubscribe
接口参数 -
OnSubscribe
是Observable
的内部接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
- 根据接口名,顾名思义。当观察者被订阅的时候,会调用这个
call()
方法
- 下面举个小例子:
public static void main(String[] args) {
// 观察者
Observer<Integer> observer = new Observer<Integer>(){
@Override
public void onCompleted() {
System.out.println("接收数据结束");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer t) {
System.out.println("接收数据:" + t);
}
};
// 被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
});
// 订阅
observable.subscribe(observer);
}
运行结果:
注意:
这个方法已经被废弃了,推荐使用
SyncOnSubscribe
或AsyncOnSubscribe
看名字应该知道是什么意思
第二种:Observable.from()
Integer[] nums = {1, 2, 3};
Observable observable = Observable.from(nums);
- 从一个数组或
Iterable
中依次发送数据元素
第三种:Observable.just()
Observable observable = Observable.just(1, 2, 3);
- 这个更直接。将参数依次发送过来。
订阅
observable.subscribe(observer);
其内部实现:
-
subscriber.onStart()
就是观察者中内置的用于初始化的方法 - 被观察者.call(subscriber)就是
- 最后把观察者当成订阅者返回。前面说过
public abstract class Subscriber<T> implements Observer<T>, Subscription
- 所以,你可以:
// 订阅
Subscription subscription = observable.subscribe(observer);
// 取消订阅
subscription.unsubscribe();
- 形成链式编程
关于Action
前面在被观察者的第一种创建方式Observable.create()
中,接收的参数是OnSubscribe
接口。它继承了Action1
。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
- 当被观察者被订阅时,
OnSubscribe
的call()
方法才会被调用 - 这个
call()
就是Action1
的
public interface Action1<T> extends Action {
void call(T t);
}
至于这个Action
,你可以理解为就是一次单纯的行为,一个单纯的回调。
有很多的Actionx
public interface Action0 extends Action {
void call();
}
public interface Action1<T> extends Action {
void call(T t);
}
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}
public interface Action3<T1, T2, T3> extends Action {
void call(T1 t1, T2 t2, T3 t3);
}
- 0就代表call()方法没有参数
- 1就代表call()方法有1个参数
- 2就代表call()方法有2个参数
- 至于ActionN接口
public interface ActionN extends Action {
void call(Object... args);
}
Observable.subscribe(..)
的时候,里面除了Observer
和Subscriber
这两个观察者之外。还可以接受一个Action
。
Action1<Integer> action1 = new Action1<Integer>() {
@Override
public void call(Integer num) {
System.out.println("接收到数据:" + num);
}
};
observable.subscribe(action1);
常用方法
map
People[] peoples = new People[]{
new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被观察者
Observable.from(peoples).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribe(subscriber);
- 可以看到被观察者从
People
数组里读取每一个元素 - 在
map
方法里找到每一个元素对象的name
并传递给观察者 - 观察者接收并使用
- 这里转换范围很大,不仅仅只是提取属性。
运行结果:
flatMap
People[] peoples = new People[]{
new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String hobby) {
System.out.println("接收信息:" + hobby);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被观察者
Observable.from(peoples).flatMap(new Func1<People, Observable<String>>() {
@Override
public Observable<String> call(People people) {
return Observable.from(people.getHobby());
}
}).subscribe(subscriber);
- 效果和
map
是类似的 - 区别在于
map
是用于一对一,而flatMap
是用于一对多 - 被观察者从
People
数组读取每一个对象,call()
里读取每一个对象的hobby
属性,并依次返回其中的一个元素
运行结果:
filter
People[] peoples = new People[]{
new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被观察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {
@Override
public Boolean call(People t) {
return t.getAge() > 18;
}
}).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribe(subscriber);
运行结果:
线程
RxJava
遵循的线程原则:在那个线程订阅,则被观察者和观察者的操作都在该线程。
通过Schedulers
切换线程。
-
Schedulers.immediate()
:默认值。在当前线程运行。 -
AndroidSchedulers.mainThread()
:在Android主线程运行。- 注意:这个是
RxAndroid
里的。必须要导入RxAndroid
的jar
包。RxJava
里是没有的。
- 注意:这个是
-
Schedulers.newThread()
:总是开启新线程运行。 -
Schedulers.io()
:如果操作涉及到I/O使用该项。- 也是总是开启新线程运行
- 内部有线程池和复用
-
Schedulers.computation()
:如果操作涉及到图形计算等使用该项。
还是之前例子,但是增加两行代码:
People[] peoples = new People[]{
new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
System.out.println("接收信息:" + name);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
// 被观察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {
@Override
public Boolean call(People t) {
return t.getAge() > 18;
}
}).map(new Func1<People, String>() {
@Override
public String call(People people) {
return people.getName();
}
}).subscribeOn(Schedulers.immediate()) // 当前线程
.observeOn(Schedulers.io()) // io线程
.subscribe(subscriber);
-
被观察者在新开起的IO线程做
读取/过滤/转换
操作 - 数据传给观察者
- 观察者在当前线程显示数据
运行结果:
总结
- RxJava确实是一个非常强大的流式编程工具
- 再复杂的逻辑,RxJava都能很简洁的表示
- 一句代码完成线程切换,很方便
- 用多了才知道它的美~