- 核心:异步 观察者模式
- 作用:
RxJava
的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。 - 引入:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
- 关键字
Observable
:在观察者模式中称为“被观察者”;
Observer
:观察者模式中的“观察者”,可接收Observable发送的数据;
subscribe
:订阅,观察者与被观察者,通过subscribe()方法进行订阅;
Subscriber
:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的,Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable.
Flowable
:是一个被观察者,
ObservableEmitter
:Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
Disposable
:这个单词的字面意思是一次性用品,用完即可丢弃的.调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.
Scheduler
:相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景,实现发送消息和接受消息在不同线程中进行的目的。 - RxJava中的观察者模式
观察者模式的概念很好理解,具体可以解释为:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
在程序的观察者模式,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。
RxJava的使用
- Observable的创建:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//执行一些其他操作
//.............
//执行完毕,触发回调,通知观察者
e.onNext("发射数据");
}
});
- Observer的创建:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
//观察者接收到通知
public void onNext(String aLong) {
System.out.println("收到数据");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
- 建立连接
observable.subscribe(observer);
Observable的创建除了常用的create方法外还有
just()方式
,fromIterable()方式
,defer()方式
,interval( )方式
,range( )方式
,timer( )方式
,repeat( )方式
,参考这篇文章http://www.jianshu.com/p/d149043d103aScheduler线程控制器的使用:正常逻辑下我们在那个线程中发送消息就会在那个线程中接受消息,而我们在开发中更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI,要达到这个目的, 我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
可以看到打印结果为
D/TAG: Observable thread is : RxNewThreadScheduler-2
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2
的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.
实现这一过程的是这两行代码:
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
- 简单的来说,
subscribeOn()
指定的是上游发送事件的线程,observeOn()
指定的是下游接收事件的线程.多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn()
只有第一次的有效, 其余的会被忽略.多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
RxJava中的操作符
MAP
map是RxJava中最简单的一个变换操作符了, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.例如将发送的int数据统一变为String,代码如下
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
输出结果为
D/TAG: This is result 1
D/TAG: This is result 2
D/TAG: This is result 3
FlatMap
FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
ZIP用法
通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
运行结果为
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
ZIP应用场景
比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了:
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
Subscriber和Flowable的使用
-
Observable
和Observer
是一对被观察者和观察者,两者通过subscribe
连接在一起实现我们处理异步的过程。那么Subscriber
和Flowable
又有什么用呢?这一对和上面介绍的那一对又有什么区别呢?- 为什么要使用
Subscriber
和Flowable
:Observable
和Observer
在处理同步订阅也就是在一个线程中的时候上游发送一个事件下游接受一个事件是没有什么问题的,而当处理异步订阅也就是上游和下游不在同一个线程中时上游发送数据不需要等待下游接收, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 上游把所有的事件发送到一个可以存储的池里面去(相当于Handler中的消息队列), 下游从队列里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 队列就会迅速装满, 然后溢出来, 最后就OOM了.当然我们也可以通过控制上游消息的发送速度和发送数量来控制。不过Subscriber
和Flowable
的出现就完美的解决了OOM这个问题. - 使用
Subscriber
和Flowable
基本的用法:
- 为什么要使用
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);
- 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用
BackpressureStrategy.ERROR
这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException
. 其余的策略后面再来讲解. - 另外的一个区别是在下游的onSubscribe方法中传给我们的不再是
Disposable
了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()
方法可以切断水管, 同样的调用Subscription.cancel()
也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)
方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
s.request(Long.MAX_VALUE);
- 因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题,我们把
request
当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM.
[RxJava 与 Retrofit 结合请求数据]
(http://gank.io/post/56e80c2c677659311bed9841)