RxJava学习笔记

参阅
给 Android 开发者的 RxJava 详解
什么是函数式编程
RxJava 2.0 全新来袭
基于RxJava 1.x,结合RxJava 2.0整理学习笔记。

概念

1.实现了异步操作的库;
2.通过扩展观察者模式来实现异步;

Observable发送消息,而Subscriber则用于消费消息。
与观察者不同的是,Observable一般只有等到有Subscriber通过subscribe方法订阅它,才会开始发送消息。

基础类/方法

  • ** Observer(观察者)**,接口。
    它决定事件触发的时候将有怎样的行为。
    定义了4个行为/方法: onSubscribe(), onNext(), onError(), onComplete(),
/**
 * 创建一个观察者
 */
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
};
  • Subscriber(订阅者),接口。等价于观察者。与观察者不同之处在于:onSubscribe方法的参数不同,而且两者位于不同的jar包下。
    Subscriber位于reactive-streams.jar文件下,包名:org.reactivestreams.Subscriber
    Observable位于rxjava.jar文件下,包名:io.reactivex.Observable
/**
 * 创建一个订阅者
 */
Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {

    }
};
  • Observable (被观察者),抽象类
    它决定什么时候触发事件以及触发怎样的事件
  /**
 * 创建一个Observable对象,并定义事件处理规则。当它被订阅的时候,事件会按顺序依次触发。
 */
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Hi");
        emitter.onNext("Aloha");
        emitter.onComplete();
    }
});

create()方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列。
例如:

  • just(T ...)


    just方法
  • fromArray(T... items)

    Observable observable1 = Observable.just("Hello", "Hi", "Aloha");
    Observable observable2 = Observable.fromArray("Hello", "Hi", "Aloha");
    
  • Flowable(被观察者),抽象类。等价于Observable。RxJava 2.x引入。

RxJava1.x中,Observeable用于订阅Observer和Subscriber。

RxJava2.x中, Observeable用于订阅Observer ,是不支持背压的,而 Flowable用于订阅Subscriber ,是支持背压(Backpressure)的。

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略 ,在1.0中,关于背压最大的遗憾,就是集中在Observable这个类中,导致有的Observable支持背压,有的不支持。为了解决这种缺憾,新版本把支持背压和不支持背压的Observable区分开来。

  • subscribe(订阅),方法。
订阅方法
observable.subscribe(observer);
//RxJava 2.x中如下方法编译报错,没有提供与Subscriber对象关联的方法
//observable1.subscribe(subscriber);

可以从上图的订阅方法中发现Consumer类。

  • Consumer(消费者), 一个接口。用于接受单个值。
Consumer onNextConsumer = new Consumer<String>() {
    @Override
    public void accept(@NonNull String o) throws Exception {

    }
};

Consumer onErrorConsumer = new Consumer<String>() {
    @Override
    public void accept(@NonNull String o) throws Exception {

    }
};

observable.subscribe(onNextConsumer, onErrorConsumer);

显然,subscribe方法支持不完整定义的回调,可以根据需求单独处理只需要的回调,而无需每次都处理Observer中的4个回调。Consumer可以定义Observer的每一个部分,Observable.subscribe()函数能够处理一个,两个、三个或者4个参数,分别表示onNext(),onError(),onComplete()和onSubscribe函数。响应顺序是onSubscribe->onNext->onComplete或者onError。

范例

  1. 将字符串数组 names 中的所有字符串依次打印出来:
String[] names = {"Jason", "Bob", "Coco"};
Observable.fromArray(names).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        System.out.println("name:" + s);
    }
});
  1. 由 id 取得图片并显示
Observable.create(new ObservableOnSubscribe<Drawable>() {
    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
        //根据id获取Drawable对象,回调到观察者中。
        Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name);
        e.onNext(drawable);
        e.onComplete();
    }
}).subscribe(new Consumer<Drawable>() {
    @Override
    public void accept(@NonNull Drawable drawable) throws Exception {
        ImageView imageView = (ImageView) findViewById(R.id.image);
        imageView.setImageDrawable(drawable);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        System.out.println(throwable.getMessage());
    }
});

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

线程控制

  • Scheduler(调度器),抽象类
  • Scheduler的子类有ComputationScheduler、ExecutorScheduler、ImmediateThinScheduler、NewThreadScheduler、SingleScheduler、TrampolineScheduler。
  • Schedulers 一个可以返回标准Scheduler实例的静态工厂。


    方法
    • Schedulers.newThread(): 为每个工作单元创建一个新的线程。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
      与newThread()的区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
      不要把计算工作放在 io() 中,可以避免创建不必要的线程,计算工作可以使用computation()方法。
    • Schedulers.computation(): 用于计算型工作例如事件循环和回调处理。
      这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。
      这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。
      不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • Schedulers.single(): 单线程,共享的Scheduler。
  • Schedulers.trampoline():在当前线程上工作,但不立即执行的Scheduler。
    在当前线程中的工作放入队列中排队,并依次操作。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。前者
用于指定被观察者执行的线程,或者叫事件产生的线程。后者用于指定观察者执行的线程,或者叫事件消费的线程。

Observable.create(new ObservableOnSubscribe<Drawable>() {
    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception {
        //根据id获取Drawable对象,回调到观察者中。
        Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name);
        e.onNext(drawable);
        e.onComplete();
    }
})
.subscribeOn(Schedulers.io())//用于指定被观察者执行的线程
.observeOn(AndroidSchedulers.mainThread())//用于执行观察者执行的线程
.subscribe(new Consumer<Drawable>() {
    @Override
    public void accept(@NonNull Drawable drawable) throws Exception {
        ImageView imageView = (ImageView) findViewById(R.id.image);
        imageView.setImageDrawable(drawable);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        System.out.println(throwable.getMessage());
    }
});

变换

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

篇幅较长,请参阅给 Android 开发者的 RxJava 详解

简单的说就是在发送者Observable和消息消费者Subscriber之间对消息进行各种你所需要的加工处理。
RxJava(一)基础知识
RxJava(二) Operator

关键方法

  • map()
  • flatMap()

其他观察者模式

当然,除了上面这两种观察者,还有一类观察者
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver

更多请参阅RxJava 2.0 全新来袭

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容