手把手教你使用 RxJava 2.0(一)

网上有很多关于RxJava的技术文章,大多数是关于1.x版本的。随着 RxJava 2.0 的推出,有些文章也介绍了2.x版本新增的内容以及与1.x版本的对比。有些同学如果刚刚接触RxJava,仅仅看RxJava 1.x 的一些技术文章,有时候会有些出入。因此本篇文章基于RxJava 2.0 进行由浅入深的学习,逐步掌握RxJava。

1.作用

RxJava的目的就是异步
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

2.工程引用

要应用RxJava,需要在项目中引入依赖:

io.reactivex.rxjava2:rxjava:2.0.4
io.reactivex.rxjava2:rxjava:2.0.4

3.概念

要想理解好RxJava,首先要理解清楚其中的几个关键概念。由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。如果不理解观察者模式,不要紧,下面会详细介绍。

Observable:在观察者模式中称为“被观察者”;
Observer:观察者模式中的“观察者”,可接收Observable发送的数据;
subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

4.RxJava中的观察者模式

观察者模式的概念很好理解,具体可以解释为:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
在程序的观察者模式,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。

下面具体讲RxJava 的观察者模式

RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。

关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
注意:Observer是个接口,Observable是个类。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。

onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

讲了这么多,大家会疑惑:这些都跟异步有什么关系?
其实这都是在为异步进行铺垫。当大家理解了观察者模式之后,就会很容易理解RxJava的异步实现方式。让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新。如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。RxJava中默认Observer (观察者)和Observer (观察者)都在同一线程执行任务。本文主要介绍RxJava中的一些基本使用,关于线程调度问题下篇文章再进行介绍。即本文中的所有操作都默认在同一线程进行。
好了,下面我们就开始了解RxJava的一些基本使用。

5.基本的用法

RxJava用法多种多样,其多样性体现在Obserable(被观察者)的创建上。
我们先以最基础的Obserable(被观察者)的创建为例介绍RxJava的使用:
Observable的创建:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //执行一些其他操作
                //.............
                //执行完毕,触发回调,通知观察者
                e.onNext("我来发射数据");
            }
        });

Observer的创建:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            //观察者接收到通知,进行相关操作
            public void onNext(String aLong) {
                System.out.println("我接收到数据了");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

订阅:

 observable.subscribe(observer);

使用create( )创建Observable最基本的创建方式。可以看到,这里传入了一个 ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Observer 将会被调用一次 onNext())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
Observable的其他创建方式:
just()方式
Observable<String> observable = Observable.just("Hello");
使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
fromIterable()方式

 List<String> list = new ArrayList<String>();
        for(int i =0;i<10;i++){
            list.add("Hello"+i);
        }
        Observable<String> observable = Observable.fromIterable((Iterable<String>) list);

使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。
defer()方式

 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just("hello");
            }
        });

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。
interval( )方式

 Observable<String> observable = Observable.interval(2, TimeUnit.SECONDS);

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定2秒一次调用onNext()方法。
range( )方式

Observable<Integer> observable = Observable.range(1,20);

创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到20的数。即调用20次nNext()方法,依次传入1-20数字。
timer( )方式

Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
repeat( )方式

Observable<Integer> observable = Observable.just(123).repeat();

创建一个Observable,该Observable的事件可以重复调用。
除了Observable(被观察者)的创建之外,RxJava 2.x 还提供了多个函数式接口 ,用于实现简便式的观察者模式。具体的函数式接口包括以下:



以Consumer为例,我们可以实现简便式的观察者模式:

Observable.just("hello").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用。
注意:Observable (被观察者)只有在被Observer (观察者)订阅后才能执行其内部的相关逻辑,下面代码证实了这一点:

Observable<Long> observable = Observable.interval(2, TimeUnit.SECONDS);
        Observer<Long> observer = new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(Long aLong) {
                    System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        SystemClock.sleep(10000);//睡眠10秒后,才进行订阅  仍然从0开始,表示Observable内部逻辑刚开始执行
        observable.subscribe(observer);

01-18 16:09:20.874 12535-12927/com.lvr.rxjavalearning I/System.out: 0
01-18 16:09:22.864 12535-12927/com.lvr.rxjavalearning I/System.out: 1
01-18 16:09:24.864 12535-12927/com.lvr.rxjavalearning I/System.out: 2
01-18 16:09:26.864 12535-12927/com.lvr.rxjavalearning I/System.out: 3

除此之外,RxJava中还有许多操作符。操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,最终将最简洁的数据传递给Observer对象。下面我们介绍一些比较常用的操作符。

6.RxJava中的操作符

map()操作符

Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.length();
            }
        });

map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
flatMap()操作符

 Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        });

flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。
filter()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).filter(new Predicate<Object>() {
            @Override
            public boolean test(Object s) throws Exception {
                String newStr = (String) s;
                if (newStr.charAt(5) - '0' > 5) {
                    return true;
                }
                return false;
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println((String)o);
            }
        });

filter()操作符根据test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
take()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

take()操作符:输出最多指定数量的结果。
doOnNext()

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("准备工作");
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象。

以上仅仅是介绍RxJava的观察者模式以及RxJava的简单操作与使用。通过本篇文章,可以对RxJava有个简单的了解。后面我会继续介绍RxJava中线程调度的内容,以及RxJava 2.x 中新增的功能。如果大家喜欢这部分内容,可以持续关注,后面会继续更新。

手把手教你使用 RxJava 2.0(二)
手把手教你使用 RxJava 2.0(三)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容