RxAndroid笔记

​ RxAndroid学习笔记--2019-1-31

​ 原文链接: https://www.jianshu.com/p/0cd258eecf60

https://juejin.im/entry/5993a80cf265da249150e93c

  1. 配置

    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    
  2. 基础用法

    Observable被观察者,即发射器(上游事件)

    Observer 观察者,即接收器(下游事件)

    Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                    emitter.onNext("测试1");
                                    emitter.onNext("测试2");
                                    emitter.onNext("测试3");
                                    emitter.onNext("测试4");
                                    emitter.onComplete();
                            }
                    }).subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                            }

                            @Override
                            public void onNext(String s) {
                                    Log.d(TAG, s);
                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onComplete() {
                                    Log.d(TAG, "onComplete");
                            }
                    });

运行程序

com.dxl.myapplication D/dxl: 测试1
com.dxl.myapplication D/dxl: 测试1
com.dxl.myapplication D/dxl: 测试2
com.dxl.myapplication D/dxl: 测试3
com.dxl.myapplication D/dxl: 测试4
com.dxl.myapplication D/dxl: onComplete
  1. 在发射事件过程中,我们调用了onComplete后,接收事件将停止,但是发射事件仍将继续:

例如:

public void subscribe(ObservableEmitter<String> emitter) throws Exception {
             emitter.onNext("测试1");
             Log.d(TAG, "emitter.onNext(\"测试1\")");

             emitter.onNext("测试2");
             Log.d(TAG, "emitter.onNext(\"测试2\")");

             emitter.onNext("测试3");
             Log.d(TAG, "emitter.onNext(\"测试3\")");

             emitter.onComplete();
             Log.d(TAG, "emitter.onComplete()");

             emitter.onNext("测试4");
             Log.d(TAG, "emitter.onNext(\"测试4\")");
                            }
                    })

测试3发送完成后,调用了onComplete方法后,测试4仍然会发送,但是无法接收到

  1. Disposable概念,可以切断接收。当它的isDisposed为false时,可以继续接收到事件。如果为true,将不再接收事件。使用方法:

    ...
    }).subscribe(new Observer<String>() {
    
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    if (s.equals("测试3")) {
                        mDisposable.dispose();
                    }
                    Log.d(TAG, s);
                }
      ...
    

    当接收到测试3 后,切断接收事件。后续测试4 将不会再接收到。

  2. Map

    它的作用是对发射时间发送的每一个事件应用一个函数,每一个事件都按照指定的函数去变化.

    举例:发送的是1,2,3,我们对发送的数字做*10处理。

       Observable.just(1, 2, 3).map(new Function<Integer, Integer>() {
                   @Override
                   public Integer apply(Integer integer) throws Exception {
                       return integer * 10;
                   }
               }).subscribe(new Consumer<Integer>() {
                   @Override
                   public void accept(Integer integer) throws Exception {
                       Log.d(TAG, integer + "");
                   }
               });
    

    log输出:

       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 10
       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 20
       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 30
    
  3. ZIP

    专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的Observable发射事件数目只和少的那个相同。

       Observable.zip(Observable.just(1, 2, 3), Observable.just("one", "two"),
                       new BiFunction<Integer, String, String>() {
                           @Override
                           public String apply(Integer integer, String s) throws Exception {
                               return integer + s;
                           }
                       }).subscribe(new Consumer<String>() {
                   @Override
                   public void accept(String s) throws Exception {
                       Log.d(TAG, s);
                   }
               });
    

    日志输出:

       01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 1one
       01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 2two
    
  4. Concat

    两个发射器连接成一个发射器

    Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                      .subscribe(new Consumer<Integer>() {
                          @Override
                          public void accept(@NonNull Integer integer) throws Exception {
                              Log.e(TAG, "concat : "+ integer + "\n" );
                          }
                      });
    

    注意,concat必须是第一个发射器执行完complete之后,才会去执行第二个。如果第一个发射器没有执行onComplete,那么第二个将不会被执行。

    比如:

    Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    //没有调用onComplete,observable2将不会被执行
    //                emitter.onComplete();
                }
            });
    
            Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("4");
                    emitter.onNext("5");
                    emitter.onNext("6");
                    emitter.onComplete();
                }
            });
    
            Observable.concat(observable1, observable2).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    这时候输出的结果为:

    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 1
    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 2
    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 3
    

    如果我们把注释打开:此时observable2就会被执行了。

    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 1
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 2
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 3
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 4
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 5
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 6
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: onComplete
    

    如果把concat改为merge, 则observable1和observable2将都会被执行。

    用途举例,比如有些时候,对数据不太敏感时,我们需要先从缓存中读取数据,如果缓存中没有数据的话,再去读取网络数据。

    这时候可以分别定义缓存的observable和在线的observable,当成功从缓存中读取数据时,调用onNext,如果缓存获取不到,直接调用onComplete去执行在线获取的observable。

  5. FlatMap

    将一个发射器Observable转换为多个发射器Observables,然后再将多个发射器装入一个单一的发射器Observable。

    有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap

     Observable.just(1,2,3).flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    for (int i = 0; i < integer; i++) {
                        list.add("integer:" + integer + "--" + i + "");
                    }
                    return Observable.fromIterable(list);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, s);
                }
            });
    

    输出结果:

    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:1--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--1
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--1
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--2
    
  6. concatMap

    concatMapFlatMap 的唯一区别就是 concatMap 保证了顺序,其他使用是一样的。

  7. distinct

    去重。例如Observable.just(1,1,2,3,3) 输出结果为1,2 ,3

  8. Fliter
    过滤器。接收一个参数,过滤掉不需要的结果。
    例如:

    Observable.just(1, 2, 3, 4, 5).filter(new Predicate<Integer>() {
               @Override
               public boolean test(Integer integer) throws Exception {
                     //不满足此条件的将被过滤
                     return integer > 3;
                      }
               }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                       Log.d(TAG, integer + "");
                     }
              });
    

    输出结果:

     01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 4
     01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 5
  1. timer
    相当于一个定时任务,默认在新线程。
    如:

        Log.d(TAG, "定时开始");
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
             @Override
             public void accept(Long aLong) throws Exception {
                  Log.d(TAG, "定时结束");
             }
        });
    

aLong暂时没有意义。都是0

       01-31 13:10:47.518 8435-8435/com.dxl.myapplication D/dxl: 定时开始
       01-31 13:10:49.560 8435-8505/com.dxl.myapplication D/dxl: 定时结束
  1. interval

    interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

    返回值是Disposable,可以利用用于取消事件。

      Log.d(TAG, "定时开始");
              mDisposable = Observable.interval(1, 2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
                  @Override
                  public void accept(Long aLong) throws Exception {
                      Log.d(TAG, "aLong = " + aLong);
                      if (aLong >= 5) {
                          mDisposable.dispose();
                      }
                  }
              });
      
      @Override
          protected void onDestroy() {
              super.onDestroy();
              if (mDisposable != null && !mDisposable.isDisposed()) {
                  mDisposable.dispose();
              }
          }
    

输出:

      01-31 13:17:01.063 9118-9118/com.dxl.myapplication D/dxl: 定时开始
      01-31 13:17:02.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 0
      01-31 13:17:04.096 9118-9143/com.dxl.myapplication D/dxl: aLong = 1
      01-31 13:17:06.098 9118-9143/com.dxl.myapplication D/dxl: aLong = 2
      01-31 13:17:08.100 9118-9143/com.dxl.myapplication D/dxl: aLong = 3
      01-31 13:17:10.102 9118-9143/com.dxl.myapplication D/dxl: aLong = 4
      01-31 13:17:12.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 5

倒计时:

/**
     * 倒计时方法
     * @param time
     * @return
     */
    private Flowable<Long> countDown(final int time) {
        return Flowable.interval(1, TimeUnit.SECONDS)
                .map(new Function<Long, Long>() {
                    @Override
                    public Long apply(Long aLong) throws Exception {
                        return time - aLong;
                    }
                }).take(time + 1);
    }
    
    mDisposable = countDown(5).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            textview.setText(aLong + "");
                        }
                    });

@Override
 protected void onDestroy() {
     super.onDestroy();
     if (mDisposable != null && !mDisposable.isDisposed()) {
         mDisposable.dispose();
     }
 }
  1. delay

    延时发送数据。

    mDisposable = Observable.just(1).delay(2, TimeUnit.SECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                             Log.d(TAG, System.currentTimeMillis() + " " + " integer = " + integer);
                        }
                    });
    
  1. 背压BackPressure

    背压产生的原因: 被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息

    为了解决这个问题,在RxJava2里,引入了Flowable这个类:Observable不包含 backpressure 处理,而 Flowable 包含。

    下面我们来模拟一个触发背压的实例 , 发射器每1毫秒发射一个数据,接收器每一秒处理一个数据。数据产生是数据处理的1000 倍。

    首先用 RxJava 2.x 版本的 Observable 来实现。

    Observable.interval(1, TimeUnit.MILLISECONDS)
              .subscribeOn(Schedulers.io())
              .observeOn(Schedulers.newThread())
              .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Thread.sleep(1000);
                        Log.e("zhao", "onNext: " + aLong);
                    }
           });
    

    经过测试,app 很健壮,没有发生崩溃,日志每1秒打印一次。在上面我们说到 2.x 版本中 Observable 不再支持背压,发神器生成的数据全部缓存在内存中。

    Observable :

    • 不支持 backpressure 处理,不会发生 MissingBackpressureException 异常。
    • 所有没有处理的数据都缓存在内存中,等待被订阅者处理。
    • 坏处是:当产生的数据过快,内存中缓存的数据越来越多,占用大量内存。

    然后用 RxJava 2.x 版本的 Flowable 来实现。

    Flowable.interval(1, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                        Thread.sleep(1000);
                        Log.e("zhao", "onNext: " + aLong);
                    }
             });
    

    运行起来发生崩溃,崩溃日志如下:

    io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
    ...
    ...
      Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
    

    很明显发生了 MissingBackpressureException 异常 , 128 代表是 Flowable 最多缓存 128 个数据,缓存次超过 128 个数据,就会报错。可喜的是,Rxjava 已经给我们提供了解决背压的策略。

    onBackpressureDrop

    onBackpressureDrop() :当缓冲区数据满 128 个时候,再新来的数据就会被丢弃,如果此时有数据被消费了,那么就会把当前最新产生的数据,放到缓冲区。简单来说 Drop 就是直接把存不下的事件丢弃。

    onBackpressureDrop 测试

    Flowable.interval( 1 , TimeUnit.MILLISECONDS)
            .onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 后面否则不会生效
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                       Thread.sleep(1000);
                       Log.e("zhao", "onNext: " + aLong);
                   }
           });
    

    效果如下:

    E/zhao: onNext: 0
    E/zhao: onNext: 1
    ...
    E/zhao: onNext: 126
    E/zhao: onNext: 127
    E/zhao: onNext: 96129
    E/zhao: onNext: 96130
    E/zhao: onNext: 96131
    

    从日志上分析来看,发射器发射的 0 ~ 127 总共 128 个数据是连续的,下一个数据就是 96129 , 128 ~ 96128 的数据被丢弃了。

    注意事项

    1、onBackpressureDrop 一定要放在 interval 后面否则不会生效

    onBackpressureLatest

    onBackpressureLatest 就是只保留最新的事件。

    onBackpressureBuffer

    • onBackpressureBuffer:默认情况下缓存所有的数据,不会丢弃数据,这个方法可以解决背压问题,但是它有像 Observable 一样的缺点,缓存数据太多,占用太多内存。
    • onBackpressureBuffer(int capacity) :设置缓存队列大小,但是如果缓冲数据超过了设置的值,就会报错,发生崩溃。

    onBackpressureBuffer(int capacity) 测试

    Flowable.interval( 1 , TimeUnit.MILLISECONDS)
            .onBackpressureBuffer( 1000 ) //设置缓冲队列大小为 1000
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                      Thread.sleep(1000);
                      Log.e("zhao", "onNext: " + aLong);
                   }
              });
    

    运行起来后,过了几秒钟,发生崩溃,日志如下:

    io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
    ···
    Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full
    

    通过日志可以看出,缓冲区已经满了。

  1. doOnNext

    可以让订阅者在接收到数据之前做一些操作,比如把数据进行保存。

    Observable.just(1,2,3).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "doOnNext-" + integer);
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    输出结果:

    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-1
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 1
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-2
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 2
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-3
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 3
    
  2. skip

    跳过count个数目开始接收。

    Observable.just(1,2,3,4,5)
                    .skip(2)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.d(TAG, "skip : "+integer + "\n");
                        }
                    });
    
  3. take

    接受一个 long 型参数 count ,代表至多接收 count 个数据。

    Observable.just(1,2,3,4,5)
                          .take(2)
                          .subscribe(new Consumer<Integer>() {
                              @Override
                              public void accept(@NonNull Integer integer) throws Exception {
                                  Log.d(TAG, "skip : "+integer + "\n");
                              }
                          });
    
  4. just

    简单的一个发射器,依次调用next方法

  1. Single
    只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()

    Single.just(new Random().nextInt(10))
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "dispose");
                }
    
                @Override
                public void onSuccess(Integer integer) {
                    Log.d(TAG, integer + "");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, e.getMessage());
                }
            });
    

    输出结果:

    01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: dispose
    01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: 5
    
  1. debounce
    去除发送频率过快的项

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, integer + "");
                        }
                    });
    

    设置的时间间隔是500ms, 发送1之后,400ms后发送2,所以1被舍弃。依次类推。

    最后输出结果:

    01-31 14:53:50.290 22568-22601/com.dxl.myapplication D/dxl: 2
    01-31 14:53:50.901 22568-22601/com.dxl.myapplication D/dxl: 4
    01-31 14:53:51.502 22568-22601/com.dxl.myapplication D/dxl: 5
    
  2. defer
    简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> call() throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            emitter.onNext(1);
                            Log.d(TAG, "1");
                            emitter.onNext(2);
                            Log.d(TAG, "2");
                            emitter.onNext(3);
                            Log.d(TAG, "3");
                            emitter.onNext(4);
                            Log.d(TAG, "4");
                            emitter.onComplete();
                        }
                    });
                }
            });
    
     observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept : " + integer);
                }
            });
    

    输出结果:

    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 1
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 1
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 2
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 2
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 3
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 3
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 4
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 4
    
  3. last
    仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。(参数表示默认值,如果没有发送的数据,取默认值)

    Observable.just(1,2,3,4).last(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    输出结果为4.

    如果改为:

    Observable.just(1,2,3,4).skip(5).last(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    这时候全部跳过,没有要发送的数据,返回默认值2

  4. merge
    作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。
    操作符每次用一个方法处理一个值
    例如

    Observable.just(1, 2, 3, 4, 5).reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG, "integer : " + integer + ", integer2 : " + integer2);
                    return integer + integer2;
    
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    第一次先取1,2,进行求和得到3,第二次利用求和得到的3与下一个3进行运算,得到6,依次类推。

    最后输出结果:

    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: 15
    
  5. scan
    和reduce相似,但是reduce只输出最后的结果,scan会输出过程。

    例如上面的代码reduce改为scan,输出结果如下:

    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 1
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 3
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 6
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 10
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 15
    
  6. 实例

    url: http://gank.io/api/xiandu/categories

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                Log.d(TAG, "create : " + Thread.currentThread().getName());
                Request request = new Request.Builder().url("http://gank.io/api/xiandu/categories").build();
                Response response = new OkHttpClient().newCall(request).execute();
                if (response.isSuccessful()) {
                    emitter.onNext(response);
                } else {
                    emitter.onError(new Exception(response.message()));
                }
                emitter.onComplete();
            }
        })
                //指定map的操作线程
                .observeOn(Schedulers.computation())
                .map(new Function<Response, Category>() {
                    @Override
                    public Category apply(Response response) throws Exception {
                        Log.d(TAG, "map : " + Thread.currentThread().getName());
                        ResponseBody responseBody = response.body();
                        Category category = new Gson().fromJson(responseBody.string(), Category.class);
                        return category;
                    }
                })
                //指定doOnNext的线程
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Category>() {
                    @Override
                    public void accept(Category category) throws Exception {
                        Log.d(TAG, "doOnNext1 : " + Thread.currentThread().getName());
                    }
                })
                //指定第二次doOnNext的线程
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Category>() {
                    @Override
                    public void accept(Category category) throws Exception {
                        Log.d(TAG, "doOnNext2 : " + Thread.currentThread().getName());
                    }
                })
                //指定事件产生的线程
                .subscribeOn(Schedulers.io())
                //指定事件消费线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Category>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Category category) {
                        Log.d(TAG, "subscribe : " + Thread.currentThread().getName());
                        Log.d(TAG, category.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, e.toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });

    /**
     * 实体类
     * @author dxl
     * @date 2019/1/31 15:57
     */
    public class Category {

        private boolean error;
        public List<Results> results;

        public class Results {

            public String _id;
            public String en_name;
            public String name;
            public int rank;

            @Override
            public String toString() {
                return "Results{" +
                        "_id='" + _id + '\'' +
                        ", en_name='" + en_name + '\'' +
                        ", name='" + name + '\'' +
                        ", rank=" + rank +
                        '}';
            }
        }

        @Override
        public String toString() {
            return "Category{" +
                    "error=" + error +
                    ", results=" + results.toString() +
                    '}';
        }
    }

输出结果:

01-31 16:51:19.458 6151-6235/com.dxl.myapplication D/dxl: create : RxCachedThreadScheduler-1
01-31 16:51:19.738 6151-6151/com.dxl.myapplication D/dxl: map : main
01-31 16:51:19.778 6151-6151/com.dxl.myapplication D/dxl: doOnNext1 : main
01-31 16:51:19.788 6151-6282/com.dxl.myapplication D/dxl: doOnNext2 : RxCachedThreadScheduler-2
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: subscribe : main
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: Category{error=false, results=[Results{_id='57c83777421aa97cbd81c74d', en_name='wow', name='科技资讯', rank=1}, Results{_id='57c83577421aa97cb162d8b1', en_name='apps', name='趣味软件/游戏', rank=5}, Results{_id='57c83627421aa97cbd81c74b', en_name='imrich', name='装备党', rank=50}, Results{_id='57c836b4421aa97cbd81c74c', en_name='funny', name='草根新闻', rank=100}, Results{_id='5827dc81421aa911e32d87cc', en_name='android', name='Android', rank=300}, Results{_id='582c5346421aa95002741a8e', en_name='diediedie', name='创业新闻', rank=340}, Results{_id='5829c2bc421aa911e32d87e7', en_name='thinking', name='独立思想', rank=400}, Results{_id='5827dd7b421aa911d3bb7eca', en_name='iOS', name='iOS', rank=500}, Results{_id='5829b881421aa911dbc9156b', en_name='teamblog', name='团队博客', rank=600}]}

  • subscribeOn事件产生的线程只能指定一次, observeOn可以指定多次。
  1. 线程调度

    subScribeOn

    subscribeOn 用于指定 subscribe() 时所发生的线程

    observeOn

    observeOn 方法用于指定下游 Observer 回调发生的线程。

    线程切换需要注意的

    RxJava 内置的线程调度器的确可以让我们的线程切换得心应手,但其中也有些需要注意的地方。

    • 简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
    • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
    • 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                    e.onNext(1);
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                        }
                    })
                    .observeOn(Schedulers.io())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                        }
                    });
    
    07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
    07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
    07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
    

    实例代码中,分别用 Schedulers.newThread()Schedulers.io() 对发射线程进行切换,并采用 observeOn(AndroidSchedulers.mainThread()Schedulers.io() 进行了接收线程的切换。可以看到输出中发射线程仅仅响应了第一个 newThread,但每调用一次 observeOn() ,线程便会切换一次,因此如果我们有类似的需求时,便知道如何处理了。

    RxJava 中,已经内置了很多线程选项供我们选择,例如有:

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
    • Schedulers.newThread() 代表一个常规的新线程;
    • AndroidSchedulers.mainThread() 代表Android的主线程

    这些内置的 Scheduler 已经足够满足我们开发的需求,因此我们应该使用内置的这些选项,而 RxJava 内部使用的是线程池来维护这些线程,所以效率也比较高。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容