Rx系列之Rxjava操作符进阶-使用场景

通过上一篇《Rx系列之RxJava操作符》,相信已经能够熟练的使用一些基本的操作符了。但是对于我们大家而言,其实最传统的命令式编程已经是我们顺手就可以拈来的,但是,现在用响应式编程,突然发现:卧槽,这个地方用响应式怎么写,这样写对么?估计很多人才开始接触RxJava的时候应该都有这样的疑虑。不用担心,这一篇就给大家讲讲RxJava到底该怎么用,在什么情况下用!

RxJava的使用场景

眼尖的小伙伴,可能已经发现,在上一篇中,很多那么重要的操作符怎么都没讲!哈哈哈,答案在这里。好废话不多说,来看看Rxjava到底在哪些情况下可以使用。

动态搜索的场景

我们先来看一个动态搜索的场景:

搜索

假设,我要在这进行网络搜索,那么,我就要在这里面进行网络访问,如果是输入完成之后点击确定进行搜索还好,但是如果是动态收索呢?只要搜索框中的搜索内容一改变,那么是不是就要进行网络请求呢?那这样就不是那么友好了。为了解决这样的问题,rxjava为我们提供了一个很好的解决方案:

  • 使用debounce作为textSearch
    debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
    debounce()使用TimeUnit对象指定时间间隔。
    是不是感觉棒棒哒,昂,不管你喜不喜欢,反正我是爱死它了。

来看一下示意图


debounce示意图
debounce示意图

由上图我们可以看出,在比较密集的数据(2,3,4,5)发射之后,其实最终只是发射5。

附上代码:

        RxTextView.textChanges(editText)
                .debounce(5000,TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<CharSequence>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: onError");
                    }

                    @Override
                    public void onNext(CharSequence charSequence) {
                        Log.d(TAG, "onNext: "+charSequence.toString());
                    }
                });

在这5s内,我输入了2,3,4,5(出最后一个5,其他输入之后就删除哈),但是最后得到的结果却是:

onNext: 5

注意:这个操作符会会接着最后一项数据发射原始Observable的onCompleted通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,onCompleted通知不会触发限流。

在上面可能会存在一个疑问,那就是
RxTextView.textChanges(editText)
这是个什么东西?你丫怎么没讲,哈哈,这个呀,要在后面的Rx系列中单独来讲,所以不要着急,暂时说一下这个的功能,这个RxTextView.textChanges(editText)其实是RxBinding里面的一个对控件的操作,其功能就跟TextWatcher一样,就是对数据的变更进行监听,所以上面的数据变化之后5s后将数据发射出去。嗯嗯,到这里就把动态搜索场景讲解了。

缓存检测场景

在请求取数据的处理过程中,我们的操作一般是这样一个原理:

  • ** 首先检查内存是否有缓存**
  • 然后检查文件缓存中是否有
  • 最后才从网络中取
    任何一步一旦发现数据后面的操作都不执行
    在rxjava中为我们提供了两个解决这个问题的操作符,分别是: concatfirst

concat
不交错的发射两个或多个Observable
concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。直到前面一个Observable终止,Concat
才会订阅额外的一个Observable

请注意上面所说的“就好像它们是一个Observable”,其实并不是一个Observable,是前面一个停止之后才会订阅下一个,所以说他们并不是一个,请君注意咯。

concat示意图
concat示意图

如上所示,就是将两个Observable连接起来了。
还有一个实例方法concatWith,它是和concat等价的:Observable.concat(a,b)==a.concatWith(b)

来看一下是不是这个样子的:

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable a = Observable.just(1, 2, 3, 4, 5);
        Observable b = Observable.just(6, 7, 8, 9, 10);

        Observable.concat(a, b)
                .subscribe(subscriber);

然后我们得到:

onNext: 1
 ...
onNext: 10
onCompleted: onCompleted

这时估计就会有人说了:你不是说这个操作符其实是将两个订阅连接起来了嘛!那么,为什么只是在最后打印了onCompleted,在onNext: 5后面不是也应该打印一个吗?
我们都知道观察者和被观察者之间,是由订阅建立关系的,那么对于被观察者来说,确实我发射了两个数据源,但是对于观察者来说,我不知道你有几个数据源,我的职责就只是,数据发射过来后,我打印而已。所以,只有当onNext没有接收到数据时,才会调用onCompleted

最后对这个操作符,再补充一点:如果当第一个Observable a抛异常,那么将不会继续执行后面的Observable b了。
如果想测试请将上面的

Observable a = Observable.just(1, 2, 3, 4, 5);

变成

Observable a = Observable.just(1, 2, 3, 4, new RuntimeException());

进行测试。

first
只发射第一项(或者满足某个条件的第一项)数据

first示意图
first示意图

由上图我们可以看出,这个只要第一项满足条件,后面的将不会再进行发射,所以只是得到了1这个数字。

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable a = Observable.just(1, 2, 3, 4, 5);
        a.first().subscribe(subscriber);

得到结果:

onNext: 1
onCompleted: onCompleted

这个应该很容易就看出来了。就是只是打印了第一个数据!
在这儿必须为大家区别一个操作符:single(),这个操作符也是只打印一个数据的,但是single()和first()最大的区别在于:前者只会发射一个数据,不能发射多个,否则会报错;而first确实满足条件的那一个。
如下:

    Observable a = Observable.just(1);
    a.single().subscribe(subscriber);

估计到这儿应该已经有人知道了上面的3个步骤改真没写了,来我们来看看代码:

 final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (memoryCache != null) {
                    subscriber.onNext(memoryCache);
                } else {
                    subscriber.onCompleted();
                }
            }
        });
        Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                String cachePref = rxPreferences.getString("cache").get();
                if (!TextUtils.isEmpty(cachePref)) {
                    subscriber.onNext(cachePref);
                } else {
                    subscriber.onCompleted();
                }
            }
        });

        Observable<String> network = Observable.just("network");

        //依次检查memory、disk、network  
        Observable
                .concat(memory, disk, network)
                .first()
                .subscribeOn(Schedulers.newThread())
                .subscribe(s -> {
                    memoryCache = "memory";
                    System.out.println("--------------subscribe: " + s);
                });

现在看上面的代码是不是就知道它在干什么了,是不是很简单!这个缓存检测场景就讲到这里。

输入合法场景

在某些时候,我们需要所以的输入都合法后,我们的某些按钮才亮起来,或者才能点击,如下图:


输入合法示意图

在这个场景中,我们得掌握两个操作符:skipcombineLatest

skip
抑制Observable发射的前N项数据

skip示意图
skip示意图

从上图可以看到,总共发射了4个数据,只有最后两个发射出去了,这就是skip(2)的作用。

        Observable.just(1,2,3,4).skip(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }
        });

得到如下结果:

onNext: 3
onNext: 4
onCompleted: onCompleted

combineLatest
当多个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
CombineLatest在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
一开始看到这句话,我又懵b了,这tm几个意思?我们先来看看这个场景的实现代码,然后再解释:

        private void combineLatestEvent() {

        Observable<CharSequence> usernameObservable = RxTextView.textChanges(mUsername).skip(1);
        Observable<CharSequence> emailObservable = RxTextView.textChanges(mEmail).skip(1);
        Observable<CharSequence> passwordObservable = RxTextView.textChanges(mPassword).skip(1);

       Subscription subscription = Observable.combineLatest(usernameObservable, emailObservable,
                passwordObservable,
                new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
                    @Override
                    public Boolean call(CharSequence userName, CharSequence email, CharSequence
                            password) {

                        boolean isUserNameValid = !TextUtils.isEmpty(userName) && (userName
                                .toString().length() > 2 && userName.toString().length() < 9);

                        if (!isUserNameValid) {
                            mUsername.setError("用户名无效");
                        }


                        boolean isEmailValid = !TextUtils.isEmpty(email) && Patterns
                                .EMAIL_ADDRESS.matcher(email).matches();

                        if (!isEmailValid) {
                            mEmail.setError("邮箱无效");
                        }

                        boolean isPasswordValid = !TextUtils.isEmpty(password) && (password
                                .toString().length() >5 && password.toString().length() < 11);

                        if (!isPasswordValid) {
                            mPassword.setError("密码无效");
                        }


                        return isUserNameValid && isEmailValid && isPasswordValid;
                    }
                })
                .subscribe(getObserver());
    }

  
    private Observer<Boolean> getObserver() {
        return new Observer<Boolean>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                //更改注册按钮是否可用的状态
                register.setEnabled(aBoolean);
            }
        };
    }

这个场景,有3个edittext,分别是mUsername,mEmail,mPassword,通过输入合法的内容进行判定注册按钮是否亮起来。
当我点击其中的任何一个进行编写的时候,就会发射数据,发射的是什么?是我们编辑的内容吗?其实不是的,发射的是结合Func3这个方法的返回值,在这里这个返回值是Boolean型的。返回了boolean型之后,就可以在观察者里面设置注册按钮是否亮起来。现在再看上面那句高深莫测的话,是不是简单多了!

这儿可能有人有疑问了:这3个edittext为什么要使用skip(1)呢?
答案其实很简答啊,那就是当我们不写skip(1)的时候,edittext中没有输入任何值的时候,会把它当作第一个数据进行发射,虽然发射的是个空数据,但是还是会发射啊!
奥偶,这个场景解释完了!

数据过期场景

其实这个场景可以和上面的数据缓存检测场景进行合并:在缓存检测场景中,我们知道,如果memory中没有数据,就从disk上面寻找,然后再是网络请求,那么,问题来了,如果我们的memory中一直有数据,但是网络数据已经变更了,又由于缓存检测原则的只要有一个有数据就不会进行网络请求了,这就会造成我们显示的数据一直是一个旧数据。

哦豁

那这个该怎么办呢?
解决方法有如下两个:

  • 采用定时进行清除本地缓存数据
  • 采用过滤操作符

我们先来看看第一种,如果是进行定时做本地数据清空的话,那么就会用到,我们一个轮询的操作符Interval
创建一个按固定时间间隔发射整数序列的Observable
Interval通俗的讲,就是每隔一段时间过后做什么事情!上一篇已经讲过了,所以这里就不详细讲解了,直接上代码:

        Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
           ...
           @Override
           public void onNext(Long aLong) {
                //清除缓存操作
           }
       });

很多人可能会想到,那既然,我能够用清除本地缓存的方法,那么能不能用,每隔一段时间进行请求,让请求的结果与本地缓存进行合并呢?
答案是肯定的,来看如下代码:

    Observable.create(new Observable.OnSubscribe<String>() {  
            @Override  
           public void call(final Subscriber<? super String> observer) {  
 
              Schedulers.newThread().createWorker()  
                    .schedulePeriodically(new Action0() {  
                          @Override  
                          public void call() {  
                              observer.onNext(doNetworkCallAndGetStringResult());  
                         }  
                      }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);  
            }  
        }).subscribe(new Action1<String>() {  
           @Override  
           public void call(String s) {  
               
           }  
      })  

这个就是使用schedulePeriodically做轮询请求

这样造成每过一定时间我们就会,清除缓存或者网络请求。读到这儿,是不是感觉这个方法真烂,哈哈哈,不着急,我们不是还有第二种方法嘛!来接着看第二种方法

  • 采用过滤操作符
    其实这个操作符我们已经讲过了,那就是操作符first,回顾一下上面的代码,就是我们的first就是保证,众多的数据,有一个符合条件就发射数据,后面的都将不执行。我们的是否需要更新的条件不加在这里,就没天理咯!
Observable source = Observable
    .concat(memory, disk, network)
    .first(new Func1() {
      @Override public Boolean call(Data data) {
        return data.isUpToDate();
      }
    });

哇偶,这个操作符完美的解决了如上的问题!那你丫的还将那么多,呵呵,我只是给大家讲解操作符的使用场景而已,那个适合哪个场景,取决你们自己咯!

这一篇主要讲解的内容的就到这儿了,下面还有一些其他的场景,就简单的介绍一下。

其他的场景

合并两个数据源场景

使用merge合并两个数据源,代码如下:

    Observable.merge(getInfoFromFile(), getInfoFromNet())  
           .observeOn(AndroidSchedulers.mainThread())  
              .subscribe(new Subscriber<String>() {  
                  @Override  
                  public void onCompleted() {  
                     Log.d(TAG, "onCompleted: onCompleted");
                 }  
 
                 @Override  
                  public void onError(Throwable e) {  
                    Log.d(TAG, "onError: onError");
                  }  
  
                  @Override  
                  public void onNext(String data) {  
                       Log.d(TAG, "onNext: only one ! ");
             });  

Retrofit结合RxJava场景

这个场景的话,大家可以查看扔物线大神写的给 Android 开发者的 RxJava 详解,其中讲解到了这个场景的结合!

就操作符使用场景这一块而言,大概就讲解这么多,如果大家有其他的使用场景,我们可以一起交流哦。感谢大家的支持,谢谢!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容