RxJava有4个角色Observable、Observer、Subscriber和Suject,Observable和 Observer 通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。
RxJava的使用
1,创建Observer(观察者)
它决定事件触发的时候有怎样的行为。
Subscriber subscriber = new Subscriber<String>(){
@Override
public void onCompleted(){//事件队列完结,当不再有新的onNext触发时调用作为完成标志
}
@Override
public void onError(){//在事件处理过程中出现异常时会触发同时队列终止
}
@Override
public void onNext(){//将要处理的事件添加到事件队列
}
@Override
public void onstart(){//在事件还未发送前调用可以做一些准备工作
}
}
Observer是一个接口,Subscriber是在Observer上进行了扩展,也可以使用Observer创建观察者
Observer<String> observer = new Observer<String>(){
@Override
public void onCompleted(){}
@Override
public void onError(){}
@Override
public void onNext(){}
}
2,创建Observable(被观察者)
它决定了什么时候触发事件以及触发怎样的事件
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber){
subscriber.onNext("123")//调用方法将事件添加到队列
subscriber.onNext("456")
subscriber.onCompleted()
}
})
//简化写法利用just和from实现
Observable observable = Observable.just("123","456")//依次调用onNext方法和onCompleted方法。
String[] words = {"123","456"}
Observable observable = Observable.from(words)//依次调用onNext方法和onCompleted方法。
3,Subscribe(订阅)
将观察者和被观察者进行关联observable.subscribe(subscriber)
RxJava的Subject
Subject 既可以是一个 Observer 也可以是一个 Observerable,它是连接 Observer 和Observerable的桥梁。 因此,Subject可以被理解为Subject=Observable+Observer
1,PublishSubject:只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,因此为了防止数据丢失可以在所有观察者都订阅完成后在发送数据。
2,BehaviorSubject:当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到 任何数据,它会发射一个默认值,然后继续发射其他任何来自原始Observable的数据。如果原始的 Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递一个 异常通知。
3,ReplySubject:不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给 Observer。不同类型的ReplaySubject用于限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete 和onError方法。这可能会导致顺序错乱,并且违反了Observer规则。
4,AsyncSubject:当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,但是会向Observer传递一个异常通知。
RxJava操作符
包括defer、range、interval、start、repeat、timer
创建操作符
1,interval
创建一个按固定时间间隔发送整数序列的Observable,相当于定时器
Observable.interval(6,TimeUnit.SECONDS)//间隔6秒发送
.subscribe(new Action1<Long>(){
@Override
public void call(Long mLong){
//TODO
}
})
2,range
创建发射指定范围整数序列的Observable,可以用于代替for循环,第一个参数为起始值且不小于0,第二个参数为终值,左闭右开。
Observable.range(0,8)
.subscribe((new Action1<Interger>(){
@Override
public void call(Integer integer){
//TODO
}
})
3,repeat
创建一个N次重复发射特定数据的Observable
Observable.range(0,8)
.repeat(3)//重复执行三次0-7循环
.subscribe((new Action1<Integer>(){
@Override
public void call(Integer integer){
//TODO
}
})
变换操作符
包括map、flatMap、cast、concatMap、flatMapIterable、buffer、groupBy
1,map
通过指定一个Func对象,将Obserable转换为一个新的Observable对象并发射,观察者将接收到新的Observable处理。例如利用map来处理域名更换:
final String Host = "http://baidu.com/"
Observable.just("image").map(new Func1<String,String>(){
@Override
public String call(String s){
return Host+s
}
}).subscribe(new Action1<String>(){
@Override
public void call(String s){
//TODO
}
})
2,flatMap、cast
flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦的放入一个单独的Observable,cast操作符的作用是强制将Observable发射的所有数据转换为指定的类型。例如在多个请求接口前添加host:
final String Host="http://baidu.com"
Lsit<String> list = new ArrayList<>()
list.add("image1")
list.add("image2")
list.add("image3")
//利用flatMap将list转换为Observable集合并再放入一个单独的Observable中发射,交叉执行不保证发射顺序。
Observable.from(list).flatMap(new Func1<String,Observable<?>>(){
@Override
public Observable<?> call(String s){
return Observable.just(Host+s)
}
//cast将Observable的数据转换为String类型
}).cast(String.class).subscribe(new Action1<String>(){
@Override
public void call(String s){
//TODO
}
})
3,concatMap
concatMap与flatMap操作符一致,解决了flatMap的交叉问题,提供了一种能够把发射值连续在一起的函数,而不合并他们。
4,flatMapIterable
可以将数据包装成Iterable,我们在Iterable中对数据进行处理。
Observable.just(1,2,3).flatMapIterable(new Func1<Integer,Iterable<Integer>>(){
@Override
publc Iterable<Integer> call(Integer s){
List<Integer> mList = new ArrayList<Integer>()
mList.add(s+1)//对每个数都进行+1,输出为2,3,4
return mList
}
}).subscribe(new Action1<Integer>(){
@Override
public void call(Integer integer){
//TODO
}
}
5,buffer
将原Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个一个发射。和其类似的有window操作符,window操作符发射的是Observable而不是数据列表
Observable.just(1,2,3,4,5,6)
.buffer(3)//缓存容量为3,输出为两组每组3个数分别输出
.subscribe(new Action1<List<Integer>>(){
@Override
public void call(List<Interger> integers){
//TODO
}
})
6,groupBy
用于元素分组,将原Observable变换为一个发射Observable的新的分组后的Observable,每一个新的Observable都是发射一组指定数据。
Observable<GroupedObservable<String,Object>> groupObservable
= Observable.just(obj1,obj2,obj3)
.groupBy(new Func1<Object,String>){
@Override
public String call(Object obj){
return obj.value//用于确定分组的参数
}
}
//对分组后的数据输出
Observable.concat(groupObservable).subscribe(new Action1<Object>(){
@Override
public void call(Object obj){
//TODO
}
})
过滤操作符
包括filter、elementAt、distinct、skip、take、skipLast、takeLast、ignoreElements、throttleFirst、sample、debounce、throttleWithTimeout
组合操作符
包括startWith、merge、concat、zip、combineLastest、join、switch
辅助操作符
包括delay、DO、subscribeOn、observeOn、timeout、materialize、dematerialize、timeInterval、timestamp、to
错误处理操作符
包括catch、retry
布尔操作符
包括all、contains、isEmpty、exists、sequenceEqual
条件操作符
包括amb、defaultIfEmpty、skipUntil、skipWhile、takeUnit、takeWhile
转换操作符
包括toList、toSortedList、toMap、toMultiMap、getIterator、nest
RxJava线程控制
如果不设置线程,默认为在subscribe方法的线程上进行回调,设置线程需要用到Scheduler
Scheduler.immediate():在当前线程运行
Scheduler.newThread():总是启用新线程
Scheduler.io():I/O操作使用的Scheduler,和newThread类似,区别在于io内部实现的是一个无数量上限的线程池,可以重用空闲线程比newThread更有效率。
Scheduler.computation():计算使用的模式,使用固定线程池大小为cpu核数,不要进行I/O操作其等待时间会浪费cpu资源。
Scheduler.trampoline():在当前线程非立即执行使用,可以将任务加入队列然后按顺序运行。
RxAndroid提供的常用的Scheduler
AndroidSchedulers.mainThread():指定操作在主线程运行。
在RxJava中用subscribeOn和observeOn操作符来控制线程。
源码解析
1,RxJava订阅过程。
查看一段RxJava的基本使用代码。
Observable.create(new Observable.OnSubscribe<Integer>)...
.subscribe(new Subscriber<Integer>)...
查看create方法的定义
public static <T> Observable<T> create(OnSubscribe<T> f){
return new Observable<T>(hook.onCreate(f))
}
可以看出在create方法中创建了Observable对象并返回,hook表示的是RxJavaObservableExecutionHook。查看他的onCreate方法定义
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f){
return f
}
在RxJavaObservableExecutionHook的onCreate方法中只是返回了传入得被观察者对象。
查看Observable的构造方法
protected Observable(OnSubscribe<T> f){
this.onSubscribe = f//将前边构建的Observable对象赋值给了onSubscribe
}
之后调用subscribe方法完成订阅,查看Observable的subscrbie方法
static <T> Subscription subscribe(Subscriber<? super T> subscriber,Observable<T> observble)
...
subscriber.onStart()
if(!(subscriber instancefo SafeSubscriber)){//进行类型检查,如果不是进行封装
//SafeSubscriber继承自Subscriber,在 onCompleted和onError方法调用时不会再调用onNext,且保证onCompleted和onError方法只有一个执行
subscriber = new SafeSubscriber<T>(subscriber)
}
try{
hook.onSubscribeStrt(observable, observable.onSubscribe).call(subscriber)
return hook.onSubscribeReturn(subscriber)
}catch(Throwable e){
...
return Subscriptions.unsubscribed()
}
查看hook的onSubscribeStart方法可以发现是调用OnSubscribe.call(subscriber)来完成订阅的
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observable,final OnSubscribe<T> onSubscribe){
return onSubscribe
}
2,RxJava的变换过程
前边说过map操作符会将源Observable转换为一个新的Observable,查看map方法
public final <R> Observable<R> map(Func1<? super T,? extends R>func){
//OperatorMap实现了Operator接口的call方法,且在call方法中创建了MapSubscriber并返回
return lift(new OperatorMap<T,R>(func))
}
lift 方法返回一个新建的 Observable 对象ob2,并传入了一个 OnSubscribeLift对象记为on2,他的构造函数中需要两个参数为onSubscribe 和 operator(OperatorMap),在OnSubscribeLift构造方法中会拿到开始creat创建的Observable(前边提过的onSubscribe变量on1),在其call方法中调用hook.onLift(operator).call(o)方法即调用的是用OperatorMap的call方法返回MapSubscriber记为sub2,继续执行可理解为on1.call(sub2)完成订阅。在map方法后调用的subscribe方法传入Subscriber类型参数标记为sub1。
subscribe方法前面讲过,它会调用:hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber) 因为此前调用过map操作符,所以这里传入的observable.onSubscribe指的是on2。分析RxJava的订阅过程,onSubscribeStart方法会返回onSubscribe,也就是on2,相当于on2.call(sub1),on2指的是 OnSubscribeLift,在on1的call中会调用sub2的onNext查看MapSubscriber的onNext方法中调用了actual.onNext(result)方法,actual指的是sub1从而完成了变换过程。
3,RxJava的线程切换过程
线程切换主要用到了subscribeOn和observeOn两个方法,一个决定了被观察者执行线程,一个决定了观察者运行线程。
subscribeOn方法定义:
public final Observable<T> subscribeOn(Scheduler shceduler){
if(this instanceof ScalarSynchronousObservable){
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler)
}
return create(new OperatorSubscribeOn<T>(this,scheduler))
}
上面代码create 方法仍旧是生成一个新的 Observable,并传入一个OperatorSubscribeOn 类。 OperatorSubscribeOn 需要传入两个参数:第一个参数 this,指的是我们最先创建的Observable,第二个参数是一个Scheduler。在OperatorSubcribeOn的call方法中调用Scheduler的createWorker方法会创建Worker然后调用它的schedule方法,Worker是线程处理的代理执行者。
选取查看Schedulers.newThread()代码:
poublic static Scheduler newThread(){
return getInstance().newThreadSchedulere
}
Scheduler 是一个单例类,其返回自身的 newThreadScheduler 属性。这个属性最终指的是 NewThreadScheduler。
public final class NewThreadScheduler extends Scheduler{
private final ThreadFactory threadFactory
public NewThreadScheduler(ThreadFactory threadFactory){
this.threadFactory = threadFactory
}
@Override
public Worker createeWorker(){
reurn new NewThreadWorker(threadFactory)
}
}
此前在 OperatorSubscribeOn 中调用了 Scheduler 的 createWorker 方法,其实就是调用 NewThreadScheduler 的createWorker方法。NewThreadWorker中使用了ScheduledThreadPool线程池。OperatorSubscribeOn中调用了 Worker 的 schedule 方法,而 Worker 指的是NewThreadWorker
public Subscription schedule(final Action0 action,long delayTime,TimeUnit unit){
if(isUnsubscribed){
return Subscriptions.unsubscribbed()
}
return scheduleActual(action,delayTime,unit)
}
//scheduleActual方法
public ScheduledAction scheduleActual(final Action0 action,long delayTime,TimeUnit unit){
Action0 decoratedAction = schedulersHook.onSchedule(action)
ScheduledAction run = new ScheduledAction(decoratedAction)
Future<?>f
if(delayTime<=0){
f=executor.submit(run)
}else{
f = executor.schedule(run,delayTime,unit)
}
run.add(f)
return run
}
可以看到最终线程切换的处理均由线程池处理。
observeOn方法定义
public final Observable<T> observeOn(Scheduler scheduler,boolean delayError,int bufferSize){
if(this instanceof ScalarSynchronousObservable){
return ((ScalarSynchronousObservable<T>this).scalarScheduleOn(scheduler)
}
return lift(new OperatorObserveOn<T>(scheduler,delayError,bufferSize)
}
OperatorObserveOn和此前讲过的OperatorMap类似在其call方法中创建了ObserveOnSubscriber
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler,child,delayError,bufferSize)
parent.init()
return parent
ObserveOnSubscriber是Subscriber的子类,在其onNext,onCompleted,onError方法中都调用了schedule方法。
protected void schedule(){
if(counter.getAndIncrement() == 0){
recursiveScheduler.schedule(this)
}
}
recursiveScheduler是一个Worker,this指的是ObserveOnSubscriber,这意味着ObserveOnSubscriber的 onNext方法都被切换到recursiveScheduler的线程做处理,从而达到线程切换的目的。