十一、RxJava简析

RxJava有4个角色Observable、Observer、Subscriber和Suject,Observable和 Observer 通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。

RxJava的使用

1,创建Observer(观察者)

它决定事件触发的时候有怎样的行为。

Subscriber subscriber = new Subscriber<String>(){
    @Override
    public void onCompleted(){//事件队列完结,当不再有新的onNext触发时调用作为完成标志
    }
    @Override
    public void onError(){//在事件处理过程中出现异常时会触发同时队列终止
    }
    @Override
    public void onNext(){//将要处理的事件添加到事件队列
    }
    @Override
    public void onstart(){//在事件还未发送前调用可以做一些准备工作
    }
}

Observer是一个接口,Subscriber是在Observer上进行了扩展,也可以使用Observer创建观察者

Observer<String> observer = new Observer<String>(){
    @Override
    public void onCompleted(){}
    @Override
    public void onError(){}
    @Override
    public void onNext(){}
}

2,创建Observable(被观察者)

它决定了什么时候触发事件以及触发怎样的事件

Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
    @Override
    public void call(Subscriber<? super String> subscriber){
    subscriber.onNext("123")//调用方法将事件添加到队列
    subscriber.onNext("456")
    subscriber.onCompleted()
    }
})

//简化写法利用just和from实现

Observable observable = Observable.just("123","456")//依次调用onNext方法和onCompleted方法。
String[] words = {"123","456"}
Observable observable = Observable.from(words)//依次调用onNext方法和onCompleted方法。

3,Subscribe(订阅)

将观察者和被观察者进行关联observable.subscribe(subscriber)

RxJava的Subject

Subject 既可以是一个 Observer 也可以是一个 Observerable,它是连接 Observer 和Observerable的桥梁。 因此,Subject可以被理解为Subject=Observable+Observer
1,PublishSubject:只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,因此为了防止数据丢失可以在所有观察者都订阅完成后在发送数据。
2,BehaviorSubject:当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到 任何数据,它会发射一个默认值,然后继续发射其他任何来自原始Observable的数据。如果原始的 Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递一个 异常通知。
3,ReplySubject:不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给 Observer。不同类型的ReplaySubject用于限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete 和onError方法。这可能会导致顺序错乱,并且违反了Observer规则。
4,AsyncSubject:当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,但是会向Observer传递一个异常通知。

RxJava操作符

包括defer、range、interval、start、repeat、timer

创建操作符

1,interval
创建一个按固定时间间隔发送整数序列的Observable,相当于定时器

Observable.interval(6,TimeUnit.SECONDS)//间隔6秒发送
                  .subscribe(new Action1<Long>(){
                  @Override
                  public void call(Long mLong){
                      //TODO
                      } 
                  })

2,range
创建发射指定范围整数序列的Observable,可以用于代替for循环,第一个参数为起始值且不小于0,第二个参数为终值,左闭右开。

Observable.range(0,8)
                  .subscribe((new Action1<Interger>(){
                  @Override
                  public void call(Integer integer){
                      //TODO
                      } 
                  })

3,repeat
创建一个N次重复发射特定数据的Observable

Observable.range(0,8)
                   .repeat(3)//重复执行三次0-7循环
                  .subscribe((new Action1<Integer>(){
                  @Override
                  public void call(Integer integer){
                      //TODO
                      } 
                  })

变换操作符

包括map、flatMap、cast、concatMap、flatMapIterable、buffer、groupBy
1,map
通过指定一个Func对象,将Obserable转换为一个新的Observable对象并发射,观察者将接收到新的Observable处理。例如利用map来处理域名更换:

final String Host = "http://baidu.com/"
Observable.just("image").map(new Func1<String,String>(){
    @Override
    public String call(String s){
      return Host+s
    }
}).subscribe(new Action1<String>(){
                  @Override
                  public void call(String s){
                      //TODO
                      } 
                  })

2,flatMap、cast
flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦的放入一个单独的Observable,cast操作符的作用是强制将Observable发射的所有数据转换为指定的类型。例如在多个请求接口前添加host:

final String Host="http://baidu.com"
Lsit<String> list = new ArrayList<>()
list.add("image1")
list.add("image2")
list.add("image3")
//利用flatMap将list转换为Observable集合并再放入一个单独的Observable中发射,交叉执行不保证发射顺序。
Observable.from(list).flatMap(new Func1<String,Observable<?>>(){
    @Override
    public Observable<?> call(String s){
    return Observable.just(Host+s)
    }
//cast将Observable的数据转换为String类型
}).cast(String.class).subscribe(new Action1<String>(){
                  @Override
                  public void call(String s){
                      //TODO
                      } 
                  })

3,concatMap
concatMap与flatMap操作符一致,解决了flatMap的交叉问题,提供了一种能够把发射值连续在一起的函数,而不合并他们。
4,flatMapIterable
可以将数据包装成Iterable,我们在Iterable中对数据进行处理。

Observable.just(1,2,3).flatMapIterable(new Func1<Integer,Iterable<Integer>>(){
    @Override
    publc Iterable<Integer> call(Integer s){
    List<Integer> mList = new ArrayList<Integer>()
    mList.add(s+1)//对每个数都进行+1,输出为2,3,4
    return mList
    }
}).subscribe(new Action1<Integer>(){
                  @Override
                  public void call(Integer integer){
                      //TODO
                      } 
                  }

5,buffer
将原Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个一个发射。和其类似的有window操作符,window操作符发射的是Observable而不是数据列表

Observable.just(1,2,3,4,5,6)
                  .buffer(3)//缓存容量为3,输出为两组每组3个数分别输出
                  .subscribe(new Action1<List<Integer>>(){
                  @Override
                  public void call(List<Interger> integers){
                      //TODO
                      } 
                  })

6,groupBy
用于元素分组,将原Observable变换为一个发射Observable的新的分组后的Observable,每一个新的Observable都是发射一组指定数据。

Observable<GroupedObservable<String,Object>> groupObservable 
= Observable.just(obj1,obj2,obj3)
                     .groupBy(new Func1<Object,String>){
                     @Override
                     public String call(Object obj){
                      return obj.value//用于确定分组的参数
                     }
                     }
//对分组后的数据输出
Observable.concat(groupObservable).subscribe(new Action1<Object>(){
                  @Override
                  public void call(Object obj){
                      //TODO
                      } 
                  })

过滤操作符

包括filter、elementAt、distinct、skip、take、skipLast、takeLast、ignoreElements、throttleFirst、sample、debounce、throttleWithTimeout

组合操作符

包括startWith、merge、concat、zip、combineLastest、join、switch

辅助操作符

包括delay、DO、subscribeOn、observeOn、timeout、materialize、dematerialize、timeInterval、timestamp、to

错误处理操作符

包括catch、retry

布尔操作符

包括all、contains、isEmpty、exists、sequenceEqual

条件操作符

包括amb、defaultIfEmpty、skipUntil、skipWhile、takeUnit、takeWhile

转换操作符

包括toList、toSortedList、toMap、toMultiMap、getIterator、nest

RxJava线程控制

如果不设置线程,默认为在subscribe方法的线程上进行回调,设置线程需要用到Scheduler
Scheduler.immediate():在当前线程运行
Scheduler.newThread():总是启用新线程
Scheduler.io():I/O操作使用的Scheduler,和newThread类似,区别在于io内部实现的是一个无数量上限的线程池,可以重用空闲线程比newThread更有效率。
Scheduler.computation():计算使用的模式,使用固定线程池大小为cpu核数,不要进行I/O操作其等待时间会浪费cpu资源。
Scheduler.trampoline():在当前线程非立即执行使用,可以将任务加入队列然后按顺序运行。
RxAndroid提供的常用的Scheduler
AndroidSchedulers.mainThread():指定操作在主线程运行。
在RxJava中用subscribeOn和observeOn操作符来控制线程。

源码解析

1,RxJava订阅过程。
查看一段RxJava的基本使用代码。
Observable.create(new Observable.OnSubscribe<Integer>)...
.subscribe(new Subscriber<Integer>)...
查看create方法的定义

public static <T> Observable<T> create(OnSubscribe<T> f){
    return new Observable<T>(hook.onCreate(f))
}

可以看出在create方法中创建了Observable对象并返回,hook表示的是RxJavaObservableExecutionHook。查看他的onCreate方法定义

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f){
  return f
}

在RxJavaObservableExecutionHook的onCreate方法中只是返回了传入得被观察者对象。
查看Observable的构造方法

protected Observable(OnSubscribe<T> f){
  this.onSubscribe = f//将前边构建的Observable对象赋值给了onSubscribe
}

之后调用subscribe方法完成订阅,查看Observable的subscrbie方法

static <T> Subscription subscribe(Subscriber<? super T> subscriber,Observable<T> observble)
...
subscriber.onStart()
if(!(subscriber instancefo SafeSubscriber)){//进行类型检查,如果不是进行封装
//SafeSubscriber继承自Subscriber,在 onCompleted和onError方法调用时不会再调用onNext,且保证onCompleted和onError方法只有一个执行
    subscriber = new SafeSubscriber<T>(subscriber)
}
try{
    hook.onSubscribeStrt(observable, observable.onSubscribe).call(subscriber)
    return hook.onSubscribeReturn(subscriber)
}catch(Throwable e){
    ...
    return Subscriptions.unsubscribed()
}

查看hook的onSubscribeStart方法可以发现是调用OnSubscribe.call(subscriber)来完成订阅的

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observable,final OnSubscribe<T> onSubscribe){
    return onSubscribe
}

2,RxJava的变换过程
前边说过map操作符会将源Observable转换为一个新的Observable,查看map方法

public final <R> Observable<R> map(Func1<? super T,? extends R>func){
//OperatorMap实现了Operator接口的call方法,且在call方法中创建了MapSubscriber并返回
    return lift(new OperatorMap<T,R>(func))
}

lift 方法返回一个新建的 Observable 对象ob2,并传入了一个 OnSubscribeLift对象记为on2,他的构造函数中需要两个参数为onSubscribe 和 operator(OperatorMap),在OnSubscribeLift构造方法中会拿到开始creat创建的Observable(前边提过的onSubscribe变量on1),在其call方法中调用hook.onLift(operator).call(o)方法即调用的是用OperatorMap的call方法返回MapSubscriber记为sub2,继续执行可理解为on1.call(sub2)完成订阅。在map方法后调用的subscribe方法传入Subscriber类型参数标记为sub1。
subscribe方法前面讲过,它会调用:hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber) 因为此前调用过map操作符,所以这里传入的observable.onSubscribe指的是on2。分析RxJava的订阅过程,onSubscribeStart方法会返回onSubscribe,也就是on2,相当于on2.call(sub1),on2指的是 OnSubscribeLift,在on1的call中会调用sub2的onNext查看MapSubscriber的onNext方法中调用了actual.onNext(result)方法,actual指的是sub1从而完成了变换过程。


map转换图

3,RxJava的线程切换过程
线程切换主要用到了subscribeOn和observeOn两个方法,一个决定了被观察者执行线程,一个决定了观察者运行线程。

subscribeOn方法定义:

public final Observable<T> subscribeOn(Scheduler shceduler){
  if(this instanceof ScalarSynchronousObservable){
    return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler)
  }
  return create(new OperatorSubscribeOn<T>(this,scheduler))
}

上面代码create 方法仍旧是生成一个新的 Observable,并传入一个OperatorSubscribeOn 类。 OperatorSubscribeOn 需要传入两个参数:第一个参数 this,指的是我们最先创建的Observable,第二个参数是一个Scheduler。在OperatorSubcribeOn的call方法中调用Scheduler的createWorker方法会创建Worker然后调用它的schedule方法,Worker是线程处理的代理执行者。
选取查看Schedulers.newThread()代码:

poublic static Scheduler newThread(){
  return getInstance().newThreadSchedulere
}

Scheduler 是一个单例类,其返回自身的 newThreadScheduler 属性。这个属性最终指的是 NewThreadScheduler。

public final class NewThreadScheduler extends Scheduler{
  private final ThreadFactory threadFactory
  public NewThreadScheduler(ThreadFactory threadFactory){
    this.threadFactory = threadFactory
  }
  @Override
   public Worker createeWorker(){
      reurn new NewThreadWorker(threadFactory)
   } 
}

此前在 OperatorSubscribeOn 中调用了 Scheduler 的 createWorker 方法,其实就是调用 NewThreadScheduler 的createWorker方法。NewThreadWorker中使用了ScheduledThreadPool线程池。OperatorSubscribeOn中调用了 Worker 的 schedule 方法,而 Worker 指的是NewThreadWorker

public Subscription schedule(final Action0 action,long delayTime,TimeUnit unit){
  if(isUnsubscribed){
    return Subscriptions.unsubscribbed()
  }
  return scheduleActual(action,delayTime,unit)
}
//scheduleActual方法
public ScheduledAction scheduleActual(final Action0 action,long delayTime,TimeUnit unit){
  Action0 decoratedAction = schedulersHook.onSchedule(action)
  ScheduledAction run = new ScheduledAction(decoratedAction)
  Future<?>f
  if(delayTime<=0){
     f=executor.submit(run)
  }else{
    f = executor.schedule(run,delayTime,unit)
  }
  run.add(f)
  return run
}

可以看到最终线程切换的处理均由线程池处理。

observeOn方法定义

public final Observable<T> observeOn(Scheduler scheduler,boolean delayError,int bufferSize){
  if(this instanceof ScalarSynchronousObservable){  
    return ((ScalarSynchronousObservable<T>this).scalarScheduleOn(scheduler)
  }
  return lift(new OperatorObserveOn<T>(scheduler,delayError,bufferSize)
}

OperatorObserveOn和此前讲过的OperatorMap类似在其call方法中创建了ObserveOnSubscriber

ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler,child,delayError,bufferSize)
parent.init()
return parent

ObserveOnSubscriber是Subscriber的子类,在其onNext,onCompleted,onError方法中都调用了schedule方法。

protected void schedule(){
    if(counter.getAndIncrement() == 0){
        recursiveScheduler.schedule(this)
    }
}

recursiveScheduler是一个Worker,this指的是ObserveOnSubscriber,这意味着ObserveOnSubscriber的 onNext方法都被切换到recursiveScheduler的线程做处理,从而达到线程切换的目的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容