参考学习资料 1.0版本
http://gank.io/post/560e15be2dca930e00da1083 扔物线
https://github.com/lzyzsd/Awesome-RxJava 大头鬼
一、基础
Observables(被观察者,事件源)和Subscribers(观察者)Observables发出一系列事件,Subscribers处理这些事件
一个Observable可以发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。
建议先看大头鬼的hello world 的例子 (建议码一遍)
http://blog.csdn.net/lzyzsd/article/details/41833541
Action1<String> onNextAction=new Action1<String>() {
@Override public void call(String s) {
System.out.println(s);
}};
应用场景
1.Observable和Subscriber可以做任何事情Observable可以是一个数据库查询,Subscriber用来显示查询结果;Observable可以是屏幕上的点击事件,Subscriber用来响应点击事件;Observable可以是一个网络请求,Subscriber用来显示请求结果。
left原理:都是基于一个lift(operator)
* Observable 在执行了left之后, 会返回一个新的Observable,这个Observable就像一个代理一样,负责接受原始的Observable发出的事件。并在处理后发送给Subscriber。更像是一种代理机制,通过事件拦截和处理实现时间序列的变换。
subscribeOn()和 observeOn区别:
* 1.subscribeOn()线程切换发生在Onsubscribe中,即在它通知上一级OnSubscribe时,这时事件还没有开始发送,因此subscribeOn()的线程控制,
* 可以从事件发出的开端就做成了影响。
* 2.observeOn()的线程切换发送在它内建Subscriber中,即发生在它即将给下一级Subscriber发送事件时,因此observeOn()控制的是它后面的线程。
*
Observable.doOnSubscribe() 和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行, 但是Observable.doOnSubscribe()可以指定线程,默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
2.0版本###
导包给我搞吐
包名开头 最旧 rx-->io 最新
2.0 最核心的是Publisher 和 Subscriber。 Publisher 发出一系列的时间,Subscriber负责和处理事件。
背压#####
在rxjava中有多重控制流以及背压(backpressure)策略用来应对当一个快速发送消息的被观察者遇到一个处理消息缓慢的观察者。
Flowable的三种Backpressure策略:
BackpressureStrategy.BUFFER####
onBackpressureBuffer是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。
BackpressureStrategy.DROP####
BackpressureStrategy.LATEST####
Drop 和Latest 类似,都会丢弃数据,下游通过request请求产生令牌给上游,上游接收到多少令牌,就发送多少,当令牌为0的时候,上游开始丢弃数据。区别在于,drop直接丢弃数据不缓存数据。而latest缓存最新的一条数据,当上游收到令牌,就把缓存的上一条“最新”数据发送给下游。
何时用Observable####
当上游在一段时间发送的数据量不大(以1000为界限)的时候优先选择使用Observable;
在处理GUI相关的事件,比如鼠标移动或触摸事件,这种情况下很少会出现backpressured的问题,用Observable就足以满足需求;
获取数据操作是同步的,但你的平台不支持Java流或者相关特性。使用Observable的开销低于Flowable。
何时用Flowable####
当上游在一段时间发送的数据量过大的时候(这个量我们往往无法预计),此时就要使用Flowable以限制它所产生的量的元素10K +处理。
当你从本地磁盘某个文件或者数据库读取数据时(这个数据量往往也很大),应当使用Flowable,这样下游可以根据需求自己控制一次读取多少数据;
以读取数据为主且有阻塞线程的可能时用Flowable,下游可以根据某种条件自己主动读取数据。
在RxJava2.0中,有五种观察者模式:####
Observable/Observer
Flowable/Subscriber
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver
Observable 写法:####
Observable-->Observer
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Flowable写法###
Flowable-->subscriber
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
//当订阅后,会首先调用这个方法,其实就相当于onStart(),
//传入的Subscription s参数可以用于请求数据或者取消订阅
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onsubscribe start");
## 这里需要特别注意。s.request()去请求资源,参数就是要请求的数量,一般如果不限制,写成Long.MAX_VALUE。如果不调用request, onNext()和onComplete方法将不会被调用。
subscription = s;
subscription.request(1);
Log.d(TAG, "onsubscribe end");
}
@Override
public void onNext(Integer o) {
## onNext 方法里面传入的参数就是Flowable 中发射出来的。
Log.d(TAG, "onNext--->" + o);
subscription.request(3);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
如果只关注onNext(),可以
Consumer consumer=new Consumer<String>(){
@Override
public void accept(String s) throws Exception{
System.out.println(s);
}
}
Actions
为了减少组件数量,2.x中没有定义Action3-Action9和ActionN。
保留的action接口按照Java 8 functional风格命名。 无参数的Action0 被操作符io.reactivex.functions.Action和Scheduler代替。
Action1被重命名为Consumer。Action2 被重命名为BiConsumer。 ActionN 被Consumer<Object[]> 代替。##
Functions
我们按照Java 8的命名风格定义了io.reactivex.functions.Function 和io.reactivex.functions.BiFunction, 把Func3 - Func9 分别改成了 Function3 - Function9 。FuncN被Function<Object[], R>代替。
此外,操作符不再使用Func1<T, Boolean>但原始返回类型为Predicate<T>。
io.reactivex.functions.Functions类提供了常见的转换功能Function<Object[], R>