RxJava基础使用

RxJava是什么

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.这是官方对于该库的描述,意思是RxJava是Java VM上一个灵活的,使用可观测序列组成的异步的、基于事件的库。 它的核心最主要在于它的“异步”, Observable(被观察者)与 Observer/Subscriber(观察者),Observable可发出一系列事件,事件执行完毕后回调被观察者,这里的事件可以有非常多种形式,例如:网络请求、文件操作、数据加载、循环、延时操作等等。

相关概念

同步/异步: 关注的是消息通信机制,同步是指 发出一个调用,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了; 而异步是指 调用发出后,调用直接返回,但不会立刻得到调用的结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用; 异步强调被动通知。

阻塞/非阻塞:关注的是程序在等待调用结果(消息,返回值)时的状态;阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。如java8 stream为阻塞式,Future为非阻塞式的; 非阻塞强调状态主动轮询。

并发(Concurrency)与并行(parallelism): 并发是一个更通用的概念,两个并发线程被调度在一个单核处理上运行,这就是并发性,而非并行. 并行性更可能出现在多核,多CPU或分布式系统上。编写代码时一般不区分两者。

函数式编程Functional Programming):函数式编程将程序描述为表达式和变换,以数学方程的形式建立模型,并尽量避免可变的状态。每个逻辑分类(filter,map,reduce等)都由不同函数所表示,这些实现底层次的变换,而用户定义的高阶函数作为参数来实现真正的业务。

函数响应式编程(Functional Reactive Programming)):响应式编程是建立在观者者模式上的一种编程范式,对异步数据流进行编程,同时该事件流是按时间排序的序列(不同于java8中的stream);虽然响应式编程框架不一定要求是函数式的,但是RxJava等响应式编程框架都是结合了函数式编程的。

背压(Backpressure):背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,简而言之,背压是流速控制的一种策略。

ReactiveX是什么

响应式扩展(Reactive Extensions, 简写为ReactiveX,Rx),最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源[1];

ReactiveX = Observer Pattern + Iterator Pattern + Functional Programming。Rx 让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序;

ReactiveX是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,ReactiveX的思想是跨平台的,学习其他语言的基本语法之后,可以做到 “learn once, write everywhere”;

Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

RxJava 是 ReactiveX 规范的JVM平台实现

组织结构

RxJava核心由以下四个interface定义:

Publisher : 消息发布者
Subscriber : 消息订阅者
Subscription : 一个订阅
Processor : Publisher + Subscriber 的结合体

它有什么作用

1.使你的代码更加简洁,增强可读性
2.对事件的处理具有非常好的灵活性
3.非常方便的实现响应式编程
4.对线程与并发问题的处理变得简单
5.Flow Control(Backpressure、Throttling等)

基本使用

引入RxJava依赖,这里使用的2.x版本

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}

最简单的开始

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onNext("rxjava");
                e.onNext("test");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println("MainActivity.onNext:" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Observable创建一个基于String类型的事件,然后被观察者调用subscribe方法订阅观察者。What?是不是搞反了。
ObservableEmitter为事件发射器,可调用onNext、onError、onComplete方法,Disposable用于解除订阅。
事件的执行内容是:依次输出“hello”,“rxjava”,“test”三个字符串。这里使用的方法链的方式,当然你也可以采用以下的方式

这里写图片描述

create方法是Observable的静态方法,用于构造一个Observable对象,如果说用于单纯的构造初始事件的话,除了create以外,还有许多方式可以构造观察者

interval & intervalRange & just & fromIterable & fromArray & timer

他们与create的差异主要在于,create创建的事件需要执行过程,而这些方法创建的事件显示是现成的,不需要额外操作,一旦与观察者关联后即可发送事件。

fromFuture & fromCallable 了解即可,Android中并不太适用

fromFuture, 事件从非主线程中产生; fromCallable, 事件从主线程中产生, 在需要消费时生产;

        ExecutorService executor = Executors.newFixedThreadPool(2);
        Callable<String> callable = new Callable<String>() {...};
        Future<String> future = executor.submit(callable);
        Consumer<String> onNext = new Consumer<String>() {...};
        Flowable.fromCallable(callable).subscribe(onNext);
        Flowable.fromFuture(future).subscribe(onNext);

ObservableEmitter和Disposable

ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext.
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件.
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件.
4.Observable可以不发送onComplete或onError.
5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

Disposable 调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件。但是并不能控制被观察者是否发射事件。

Observale & Flowable

当然我们也可以使用Flowable,而不用Observale

Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onNext("rxjava");
                e.onNext("test");
            }
        }, BackpressureStrategy.BUFFER).subscribe();

Observable: 不支持背压
Flowable : Observable新的实现,支持背压,同时实现Reactive Streams 的 Publisher 接口

其他形式的被观察者

除了上述描述的两种观察者形式外,还有Single & Completable & Maybe

Single

可以发射一个单独onSuccess 或 onError消息。它现在按照Reactive-Streams规范被重新设计,并遵循协议 onSubscribe (onSuccess | onError)

Single.create(new SingleOnSubscribe<Object>() {...)
        .subscribe(new SingleObserver<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Object value) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

Completable

可以发送一个单独的成功或异常的信号,按照Reactive-Streams规范被重新设计,并遵循协议onSubscribe (onComplete | onError)

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {

            }
        }).subscribe(new Action() {
            @Override
            public void run() throws Exception {
                // complete
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // error
            }
        });

Completable

它是Single 和 Completable 的结合体。它可以发射0个或1个通知或错误的信号, 遵循协议 onSubscribe (onSuccess | onError | onComplete)

Maybe.create(new MaybeOnSubscribe<Object>() {
            @Override
            public void subscribe(MaybeEmitter<Object> e) throws Exception {

            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                // success
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // error
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                // complete
            }
        });

简化订阅

观察者除了用Observer对象外,我们还可以用Consumer单独接收onNext或者onError以及onSubscribe回调,用Action单独接收onComplete,下图是订阅关联的各个重载方法subscribe的参数列表

这里写图片描述

什么时候用 Observable
一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
如果处理的式同步流而你的Java平台又不支持Java Stream

什么时候用 Flowable
处理以某种方式产生超过10K的元素
有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的

线程调度

默认情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程工作,就在哪个线程发送事件, Observer也就在同样的线程中接收处理事件

  • subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程
  • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效
  • 允许多次指定订阅者接收线程,每调observerOn,下游都会进行线程切换

RxJava内置线程选项:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
  • Schedulers.newThread() 代表一个常规的新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程

操作符

RxJava是基于事件序列、异步的操作库,它提供了许多操作符,这些操作符可以在整体事件序列中对元素或者事件本身进行一系列变换,下面为大家介绍一些常用的操作符的使用

concat

将多个Observable按顺序发射数据,不会交错,只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable

        Observable observable1 = Observable.fromArray(1,2,3);
        Observable observable2 = Observable.fromArray(4,5,6);
        Observable.concat(observable1, observable2).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("MainActivity.accept" + o);
            }
        });
这里写图片描述

merge

将多个Observable数据合并,各个数据顺序可能交错

Observable observable1 = Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS);
        Observable observable2 = Observable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS);
        Observable.merge(observable1, observable2).subscribeOn(Schedulers.io()).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("MainActivity.accept" + o);
            }
        });
这里写图片描述

map

将一个Observable发射的数据根据某种映射关系转换为另一个形式的数据

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(R.mipmap.ic_launcher);
            }
        }).map(new Function<Integer, Drawable>() {
            @Override
            public Drawable apply(@NonNull Integer integer) throws Exception {
                return getResources().getDrawable(integer);
            }
        }).subscribe(new Consumer<Drawable>() {
            @Override
            public void accept(Drawable drawable) throws Exception {
                img.setImageDrawable(drawable);
            }
        });

flatMap

flatMap 操作符可以将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放到一个单独的 Observable,并且事件有可能交错

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onNext("b");
                e.onNext("c");
            }
        }).flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull String s) throws Exception {
                List<String> list = new ArrayList();
                list.add(s);
                list.add(s);
                list.add(s);
                return Observable.fromIterable(list).delay(1,TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG,s);
            }
        });
这里写图片描述

concatMap

作用等同于flatMap,唯一不同的是concatMap事件序列严格按照上流的发送顺序

zip

将多个Observable的事件压缩到一起再发送

Observable name = Observable.fromArray("lee","su","liu");
        Observable age = Observable.fromArray(22,31,18);
        Observable.zip(name, age, new BiFunction<String,Integer,User>() {
            @Override
            public User apply(@NonNull String o, @NonNull Integer o2) throws Exception {
                return new User(o,o2);
            }
        }).subscribe(new Consumer<User>() {
            @Override
            public void accept(User o) throws Exception {

            }
        });

filter

对上流发射的数据进行筛选,过滤到不符合条件的数据

Observable.fromArray(1,2,3,4,5,6).filter(new Predicate<Integer>() {
            @Override
            public boolean test(@NonNull Integer integer) throws Exception {
                return integer%2 == 0;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG,"过滤得到的整数" + integer);
            }
        });
这里写图片描述

take

事件数量限制,保留指定数量内的事件

Observable.fromArray(1,2,3,4,5,6).take(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG,"value : " + integer);
            }
        });
这里写图片描述

doOnNext

在回调观察者onNext方法前进行额外的处理

Observable.fromArray(1,2,3,4,5,6).take(3)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG,"doOnNext : " + integer);
                    }
                })
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG,"value : " + integer);
            }
        });
这里写图片描述

debounce

用于事件的过滤,其官方解释是数据的发射都延时指定时间后进行,并且在这段延时内新数据将会把老数据替换掉,在每次执行e.onNext(value)后计时器重置。简单说,就是在指定时间内,只接收时间段内的最新数据。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).debounce(10, TimeUnit.MILLISECONDS, Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG,"value : " + integer);
                    }
                });
这里写图片描述

throttleFirst

用于事件的过滤,在指定时间间隔内只保留第一个事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i=0;i<10;i++){
                    e.onNext(i);
                    SystemClock.sleep(1500);
                }
            }
        }).throttleFirst(2000, TimeUnit.MILLISECONDS, Schedulers.io())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG,"value : " + integer);
                    }
                });
这里写图片描述

compose

与 flatMap 类似,都是进行变换,但是compose操作与返回的都是Observable对象,并且激活并发送事件。compose操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。

First

同样用于事件过滤,只发送所有事件中的第一个事件

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onNext("b");
                e.onNext("c");
            }
        }).first(null).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG,"first value is "+s);
            }
        });
这里写图片描述

distinct & distinctUntilChanged

distinct( )过滤掉重复数据;distinctUntilChanged()过滤掉连续重复的数据。

Observable.just(1,2,1,1,3,4,5).distinct().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG,"distinct "+integer);
            }
        });
        Observable.just(1,2,1,1,3,4,5).distinctUntilChanged().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG,"distinctUntilChanged "+integer);
            }
        });
这里写图片描述

总结

这篇文章带大家了解什么是RxJava,以及涉及到的相关概念。并且整理了RxJava的基本用法,以及常用的操作符的作用与使用方式。RxJava中的操作符非常多,这里只是列举了部分常用操作符讲解,更多的可以参考以下文档

RxJava Wiki:https://github.com/ReactiveX/RxJava/wiki
RxJava 中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容