RxJave2.0之操作符的使用(一)

1. Create (可以用于获取一个被观察者的对象)

        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            /**
             *  开始发送事件
             */
            boolean disposed = emitter.isDisposed();
            emitter.onNext(" 发射数据一"+ "\n" );
            mRxOperatorsText.append("发射数据一" + "\n");
            emitter.onNext(" 发射数据二" );
            mRxOperatorsText.append("发射数据二" + "\n");
            emitter.onNext(" 发射数据三" );
            mRxOperatorsText.append("发射数据三" + "\n");
            emitter.onComplete(); // 事件完成
            emitter.onNext(" 发射数据四" );
            mRxOperatorsText.append("发射数据四" + "\n");
            emitter.onNext(" 发射数据五" );
            mRxOperatorsText.append("发射数据五" + "\n");
            emitter.onNext(String.valueOf(disposed)+ "\n");
            mRxOperatorsText.append(String.valueOf(disposed) + "\n");
         //   emitter.onError(new RuntimeException());
        }).subscribe(new Observer<String>() {
            /**
             *  开始发送事件的订阅
             * @param d
             */
            @Override
            public void onSubscribe(Disposable d) {
                boolean disposed = d.isDisposed();
                mRxOperatorsText.append(String.valueOf(disposed) + "\n");
                mRxOperatorsText.append("开始发送事件的订阅" + "\n");
            }

            /**
             *  事件发送中
             * @param s
             */
            @Override
            public void onNext(String s) {
                mRxOperatorsText.append("接受事件"+s+ "\n");
            }

            /**
             * 事件发送失败
             * @param e
             */
            @Override
            public void onError(Throwable e) {
                mRxOperatorsText.append("事件发送失败"+ "\n");

            }

            /**
             *  事件发送完成
             */
            @Override
            public void onComplete() {
                mRxOperatorsText.append("事件发送完成"+ "\n");
            }
        });
运行结果
09-19 14:36:39.707 3120-3120/com.nanchen.rxjava2examples E/RxCreateActivity:
     false
    开始发送事件的订阅
    发射数据一
    接受事件 发射数据一
    发射数据二
    接受事件 发射数据二
    发射数据三
    接受事件 发射数据三
09-19 14:36:39.717 3120-3120/com.nanchen.rxjava2examples E/RxCreateActivity: 事件发送完成
    发射数据四
    事件完成
    发射数据五
    false
注意事件 :
  • 1.在开始发送事件的时候不能同时调用 emitter.onError(); //事件异常emitter.onComplete(); // 事件完成 不然程序会出现运行异常。
  • 2.当发送事件中 发射数据 三后 当调用了emitter.onComplete(); // 事件完成 这个后,发现 事件还在发送但是接受事件中并没有接受事。
  • 3.Disposable 这事件的作用是可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。 这是Rxjava 2.0 里面新添加的。

2. map (作用是对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化)

   这是用Lambda表达式来写
   Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            Log.e(TAG, "int型的数据 1" + "\n");
            emitter.onNext(1);
            Log.e(TAG, "int型的数据 2" + "\n");
            emitter.onNext(2);
            Log.e(TAG, "int型的数据 3" + "\n");
            emitter.onNext(3);
            Log.e(TAG, "int型的数据 4" + "\n");
            emitter.onNext(4);
            Log.e(TAG, "int型的数据 5" + "\n");
            emitter.onNext(5);
        }).map(integer -> "转成字符型" + integer)
                .subscribe(s -> Log.e(TAG, s + "\n"));

// 正常的写法
   Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : " + s + "\n");
                Log.e(TAG, "accept : " + s + "\n");
            }
        });

运行结果
09-19 14:58:24.497 3345-3345/com.nanchen.rxjava2examples E/RxMapActivity:
   int型的数据 1
    转成字符型1
    int型的数据 2
    转成字符型2
    int型的数据 3
    转成字符型3
    int型的数据 4
    转成字符型4
    int型的数据 5
    转成字符型5
注意事件 :

map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从Log日志显而易见。 里面的map 可以多次调用操作是同一Observable。

3. FlatMap(将一个发送事件的上游Observable变换成多个发送事件的Observables, 然后将它们发射的时间合并后放进一个单独的Observable里)

  • 这里FlatMap 做一个解释说明,其实我在刚开看着个操作符,根本不知道说的是什么意思。
    1.用图来表达一下。


    image.png

    先看看上游, 上游发送了三个事件, 分别是1,2,3, 注意它们的颜色.

中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable.

还是不能理解? 别急, 再来看看分解动作:


image.png

上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面. 如果需要保证顺序则需要使用。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                 emitter.onNext(1);
                 emitter.onNext(2);
                 emitter.onNext(3);
                 emitter.onNext(4);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return  Observable.fromIterable(list)
                        //延时10毫秒,第一个参数是数值,第二个参数是事件单位
                        .delay(10,TimeUnit.MILLISECONDS);
            }
        }) .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, s);

            }
        });

如代码所示, 我们在flatMap中将上游发来的每个事件转换为一个新的发送三个String事件的水管, 为了看到flatMap结果是无序的,所以加了10毫秒的延时, 来看看运行结果吧:

运行结果
  09-19 16:21:21.837 3819-3849/com.nanchen.rxjava2examples E/RxFlatMapActivity:
    I am value 2
    I am value 2
    I am value 2
    I am value 4
    I am value 4
09-19 16:21:21.837 3819-3848/com.nanchen.rxjava2examples E/RxFlatMapActivity:
    I am value 1
    I am value 4
    I am value 1
    I am value 1
    I am value 3
    I am value 3
    I am value 3
注意事件 :
  • 1.flatMap并不保证事件的顺序

4.concatMap (作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序)

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);

            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return
                      //  此方法接收一个继承自Iterable接口的参数,简单的说就是java中的集合类。因此你可以传入一个list集合等等
                        Observable.fromIterable(list)
                        //延时两秒,第一个参数是数值,第二个参数是事件单位
                        .delay(10,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, s);
            }
        });
运行结果
09-19 16:35:33.937 3965-3991/com.nanchen.rxjava2examples E/RxConcatMapActivity:
    I am value 1
    I am value 1
    I am value 1
09-19 16:35:33.947 3965-3992/com.nanchen.rxjava2examples E/RxConcatMapActivity: 
    I am value 2
    I am value 2
    I am value 2
09-19 16:35:33.957 3965-3991/com.nanchen.rxjava2examples E/RxConcatMapActivity: 
    I am value 3
    I am value 3
    I am value 3
09-19 16:35:33.967 3965-3992/com.nanchen.rxjava2examples E/RxConcatMapActivity: 
    I am value 4
    I am value 4
    I am value 4

5. Zip(专用于合并事件,该合并不是连接,而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同)

  • zip操作符就是合并多个被观察者的数据流, 然后发送(Emit)最终合并的数据。借用网上的一张图,分析的比较透彻


    image.png

从上游中可以看出,上游有两根水管,其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件 和三角形事件 合并为了一个矩形事件 . 拆分过程如下:


image.png

组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.

/**
  事件一
 */
  private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });
    }
/**
  事件二
 */
 private Observable<Integer> getIntegerObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
    }

 Observable.zip(getIntegerObservable(),getStringObservable(), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });

运行结果
09-19 16:45:08.947 4053-4053/com.nanchen.rxjava2examples E/RxZipActivity: 
    Integer emit : 1 
    Integer emit : 2 
    Integer emit : 3 
    Integer emit : 4 
    Integer emit : 5 
    zip : accept : 1A
    String emit : A 
    zip : accept : 2B
    String emit : B 
    zip : accept : 3C
    String emit : C 

6.doOnNext(让订阅者在接收到数据前干点事情的操作符)

   Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
         
                        Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
           
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });
运行结果
09-19 17:00:26.167 4053-4053/com.nanchen.rxjava2examples E/RxDoOnNextActivity: 
    doOnNext 保存 1成功
    doOnNext :1
    doOnNext 保存 2成功
09-19 17:00:26.177 4053-4053/com.nanchen.rxjava2examples E/RxDoOnNextActivity: 
    doOnNext :2
    doOnNext 保存 3成功
    doOnNext :3
    doOnNext 保存 4成功
    doOnNext :4

注意事件
1 是指每发射一件事之前做的操作

  1. just 这个操作符是指若干个相同 的参数,

7. filter(过滤操作符,取正确的值)

  • Filter 你会很常用的,它的作用也很简单,过滤器嘛。可以接受一个参数,让其过滤掉不符合我们条件的值.
  Observable.just(1, 20, 65, -5, 7, 19)      
   .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
          
                Log.e(TAG, "filter : " + integer + "\n");
            }
        });

运行结果
09-19 17:25:53.987 4053-4053/com.nanchen.rxjava2examples E/RxFilterActivity:
   filter : 20
    filter : 65
    filter : 19

可以看到,我们过滤器舍去了小于 10 的值,所以最好的输出只有 20, 65, 19。

7. skip( 代表跳过 count 个数目开始接收。)

   Observable.just(1,2,3,4,5)
              .skip(2)
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(@NonNull Integer integer) throws Exception {
                      Log.e(TAG, "skip : "+integer + "\n");
                  }
              });
运行结果
09-19 17:27:15.067 4053-4053/com.nanchen.rxjava2examples E/RxSkipActivity:
    skip : 3
    skip : 4
    skip : 5

从log里面可以看到 是从第二位数开始 接收数据。 (不计算0位)

8. take(用于指定订阅者最多收到多少数据)

Flowable.just(1,2,3,4,5)
              .take(2)
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(@NonNull Integer integer) throws Exception {
                      mRxOperatorsText.append("take : "+integer + "\n");
                      Log.e(TAG, "accept: take : "+integer + "\n" );
                  }
              });
运行结果
09-19 17:38:14.877 4053-4053/com.nanchen.rxjava2examples E/RxTakeActivity:
    accept: take : 1
    accept: take : 2

从log里面可以看到 ,只接受前两位的发射事件。

9distinct(去重操作符,其实就是简单的去重)

   Observable.fromArray(1,1,1,1,1,1,12,2,3,3,3,4,4,5,6,5,6,7,8,9,0)
              .distinct()
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(Integer integer) throws Exception {
                      Log.e(TAG, "distinct : " + integer + "\n");
                  }
              });

运行结果
09-19 17:51:46.917 4273-4273/com.nanchen.rxjava2examples E/RxDistinctActivity:
     distinct : 1
    distinct : 12
    distinct : 2
    distinct : 3
    distinct : 4
    distinct : 5
    distinct : 6
    distinct : 7
    distinct : 8
    distinct : 9
    distinct : 0

Log 日志显而易见,我们在经过 dinstinct() 后接收器接收到的事件只有1,12,2,3,4,5,6,7,8,,9,0了。

10 last(操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。)

 Observable.just(1, 2, 3)
              .takeLast(0)
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(@NonNull Integer integer) throws Exception {
                      mRxOperatorsText.append("last : " + integer + "\n");
                      Log.e(TAG, "last : " + integer + "\n");
                  }
              });

运行结果
09-19 17:56:29.137 4460-4460/com.nanchen.rxjava2examples E/RxLastActivity:
 last : 3

最后需要说明一下 ObservableFlowable的区别
在rxjava2.x时代,上述的背压逻辑全部挪到Flowable里了,所以说Flowable支持背压。而2.x时代的Observable是没有背压的概念的,Observable如果来不及消费会死命的缓存直到OOM,所以rxjava2.x的官方文档里面有讲,大数据流用Flowable,小数据流用Observable 。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容