RxJava 1.x:你一定会用到的常用操作符

本文基于 RxJava1.x 版本,阅读本文前请先了解 RxJava 的基本使用。

RxJava 版本已升级到 RxJava2.x ,各个 API 均有不同程度的变化,具体请查看官方文档

参考文档:


1 Observable 的创建

1.1 from()

public static <T> Observable<T> from(Iterable<? extends T> iterable);

转换集合为一个每次发射集合中一个元素的 Observable 对象。

使用场景: 对集合(数组、List 等)进行遍历。

from()

其他 from() API:

举例:

// 1. 遍历集合
Observable<String> observable = Observable.from(new String[]{"hello", "hi"});
// 2. 使用 Future 创建 Observable,Future 表示一个异步计算的结果。
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        // TODO 执行异步操作并返回数据
        return "hihi";
    }
});

Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        futureTask.run();
    }
});

Observable<String> observable = Observable.from(futureTask);

1.2 just()

public static <T> Observable<T> just(final T value);

转换一个或多个 Object 为依次发射这些 Object 的 Observable 对象。

使用场景: 转换一个或多个普通 Object 为 Observable 对象,如转换数据库查询结果、网络查询结果等。

just()

其他 just() API:

举例:

Observable<String> observable = Observable.just("hello");

// 使用 just() 遍历几个元素
Observable<String> observable = Observable.just("hello", "hi", "...");
       
// 使用 from() 方法遍历,效果和 just() 一样。
String[] stringArrs = new String[]{"hello", "hi", "..."};
Observable<String> observable = Observable.from(stringArrs);

just() 方法可传入 1~10 个参数,也就说当元素个数小于等于 10 的时候既可以使用 just() 也可以使用 from(),否则只能用 from() 方法。

1.3 create()

public static <T> Observabl<T> create(OnSubscribe<T> f);

返回一个在被 OnSubscribe 订阅时执行特定方法的 Observable 对象。

使用场景: 不推荐使用,可使用其他操作符替代,如使用 from()操作符完成遍历。

其他 create() API:

举例:

Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe< String >() {
    @Override
    public void call(Subscriber<? super String > subscriber) {
         // onNext() 方法可执行多次
        subscribe.onNext("hello");
        subscribe.onCompleted();
    }
};
Observable<Object> observable = Observable.create(onSubscribe);

此方法不常用,大多数时候都是使用 just()from() 等方法,如上面那串代码就可以写成:

Observable<Object> observable = Observable.just("hello");

1.4 interval()

public static Observable<Long> interval(long interval, TimeUnit unit);

返回一个每隔指定的时间间隔就发射一个序列号的 Observable 对象。

使用场景: 可使用该操作符完成定时、倒计时等功能。

interval()

其他 interval() API:

举例:

// 每隔 1 s 发送一个序列号,序列号从 0 开始,每次累加 1。
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);

1.5 timer()

public static Observable<Long> timer(long delay, TimeUnit unit);

创建一个在指定延迟时间后发射一条数据(固定值:0)的 Observable 对象。

使用场景: 可用来完成定时功能。

timer()

其他 timer() API:

举例:

// 定时 3 s
Observable<Long> observable = Observable.timer(3, TimeUnit.SECONDS);

1.6 range()

public static Observable<Integer> range(int start, int count);

创建一个发射指定范围内的连续整数的 Observable 对象。

使用场景: 可使用该操作符完成一个 fori 的循环,如 for(int i=5;i<=7;i++) --> Observable.range(5, 3)

range()

其他 range() API:

举例:

// 依次发射 5、6、7
Observable<Integer> observable = Observable.range(5, 3);

1.7 empty()

public static <T> Observable<T> empty();

创建一个不发射任何数据就发出 onCompleted() 通知的 Observable 对象。

empty()

举例:

// 发出一个 onCompleted() 通知
Observable<Object> observable = Observable.empty();

1.8 error()

public static <T> Observable<T> error(Throwable exception);

创建不发射任何数据就发出 onError 通知的 Observable 对象。

使用场景: 程序中捕获异常后,可使用该操作符把捕获的异常传递到后面的逻辑中处理。

error()

举例:

// 发出一个 onError() 通知
Observable<Object> observable = Observable.error(new Throwable("message"));

1.9 never()

public static <T> Observable<T> never();

创建一个不发射任何数据和通知的 Observable 对象。

never()

举例:

Observable<Object> observable = Observable.never();

1.10 defer()

public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

在订阅的时候才会创建 Observable 对象;每一次订阅都创建一个新的 Observable 对象。

使用场景: 可以使用该操作符封装需要被多次执行的函数。

defer()

举例:

Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
        return Observable.just("string");
    }
});

2 重做

2.1 repeat()

public final Observable<T> repeat();

使Observable 对象在发出 onNext() 通知之后重复发射数据。重做结束才会发出 onComplete() 通知,若重做过程中出现异常则会中断并发出 onError() 通知。

使用场景: 可使用该操作符指定一次任务执行完成后立即重复执行上一次的任务,如发送多次网络请求等。

repeat()

其他 repeat() API:

举例:

Observable<String> observable = Observable.just("string");
// 无限重复执行
observable.repeat();
// 重复执行 5 次
observable.repeat(5);

2.2 repeatWhen()

public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler)

使Observable 对象在发出 onNext() 通知之后有条件的重复发射数据。重做结束才会发出 onCompleted() 通知,若重做过程中出现异常则会中断并发出 onError() 通知。

使用场景: 可使用该操作符指定满足一定条件时重复执行一个任务,如发送多次网络请求等。

repeatWhen()

其他 repeatWhen() API:

举例:

observable.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Void> observable) {
        // 重复 3 次, 每次间隔 1 s
        return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
            @Override
            public Integer call(Void aVoid, Integer integer) {
                return integer;
             }
        }).flatMap(integer -> Observable.timer(1, TimeUnit.SECONDS));
    }
});

3 重试

3.1 retry()

public final Observable<T> retry();

在执行 Observable对象的序列出现异常时,不直接发出 onError() 通知,而是重新订阅该 Observable对象,直到重做过程中未出现异常,则会发出 onNext()onCompleted() 通知;若重做过程中也出现异常,则会继续重试,直到达到重试次数上限,超出次数后发出最新的 onError() 通知。

使用场景: 网络等请求异常出错后,可重新发起请求。

retry()

其他 retry() API:

举例:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        System.out.println(".......");
        int a = 1 / 0;
        subscriber.onNext(a);
        subscriber.onCompleted();
    }
});
// 无限次的重试
observable.retry();
// 重试 3 次
observable.retry(3);

// 使用谓语函数决定是否重试
observable.retry(new Func2<Integer, Throwable, Boolean>() {
    @Override
    public Boolean call(Integer integer, Throwable throwable) {
        // 参数 integer 是订阅的次数; 参数 throwable 是抛出的异常
        // 返回值为 true 表示重试, 返回值为 false 表示不重试
        return false;
    }
});

3.2 retryWhen()

public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);

有条件的执行重试。

使用场景: 网络等请求异常出错后,若满足一定条件,则重新发起请求。

retryWhen()

其他 retryWhen() API:

举例:

// 重试 3 次,每次间隔 1 s
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Object>() {
            @Override
            public Object call(Throwable throwable, Integer integer) {
                return integer;
            }
        }).flatMap(new Func1<Object, Observable<?>>() {
            @Override
            public Observable<?> call(Object o) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
});

4 变换

4.1 map()

public final <R> Observable<R> map(Func1<? super T, ? extends R> func);

把源 Observable 发射的元素应用于指定的函数,并发送该函数的结果。

使用场景: 将从网络获取的数据(NetData 对象)转换为数据库相关对象(DBData对象)并使用 Observable 发送。

map()

举例:

Observable.just(2)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.valueOf(String.format("原始数据的两倍为: %s", integer * 2));
            }
        });

4.2 flatMap()

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func);

转换源 Observable 对象为另一个 Observable 对象。

使用场景: 从网络获取数据并使用 obsA 对象发射,flatMap() 操作符中可将数据存进数据库并返回一个新的对象 obsB。

flatMap()

其他 flatMap() API:

举例:

Observable.just(2)
        .flatMap(new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer integer) {
                // 转换为一个定时 integer 秒的 Observable 对象
                return Observable.timer(integer, TimeUnit.SECONDS);
            }
        });

5 过滤

5.1 filter()

public final Observable<T> filter(Func1<? super T, Boolean> predicate);

只发射满足指定谓词的元素。

使用场景: 可使用 filter 代替 if 语句。

filter()

举例:

Observable.just(-1, -2, 0, 1, 2)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 0;
            }
        });

5.2 first()

public final Observable<T> first();

返回一个仅仅发射源 Observable 发射的第一个[满足指定谓词的]元素的 Observable,如果源 Observable 为空,则会抛出一个 NoSuchElementException

使用场景: 顺序发出多条数据,只接收第一条。

first()

其他 first() API:

举例:

// 发射第一个元素
Observable.just(-1, -2, 0, 1, 2).first();

// 发射满足条件的第一个元素
Observable.just(-1, -2, 0, 1, 2)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 0;
            }
        });

// 会抛出 NoSuchElementException 异常
Observable.empty().first();

5.3 last()

public final Observable<T> last();

返回一个仅仅发射源 Observable 发射的倒数第一个[满足指定谓词的]元素的 Observable,如果源 Observable 为空,则会抛出一个 NoSuchElementException

使用场景: 顺序发出多条数据,只接收最后一条。

last()

其他 last() API:

举例:

// 发射倒数第一个元素
Observable.just(-1, -2, 0, 1, 2).first();

// 发射满足条件的倒数第一个元素
Observable.just(-1, -2, 0, 1, 2)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer < 0;
            }
        });

// 会抛出 NoSuchElementException 异常
Observable.empty().last();

5.4 skip()

public final Observable<T> skip(int count);

跳过前面指定数量或指定时间内的元素,只发射后面的元素。

skip()

其他 skip() API:

举例:

Observable.just(-1, -2, 0, 1, 2)
        .skip(2) // 跳过前两条数据

5.5 skipLast()

public final Observable<T> skipLast(int count);

跳过前面指定数量或指定时间内的元素,只发射后面的元素。指定时间时会延迟源 Observable 发射的任何数据。

skipLast()

其他 skipLast() API:

举例:

Observable.just(-1, -2, 0, 1, 2)
        .skipLast(2) // 跳过后两条数据

5.6 take()

public final Observable<T> take(final int count);

只发射前面指定数量或指定时间内的元素。

take()

其他 take() API:

举例:

Observable.just(-1, -2, 0, 1, 2).take(3); // 只发射前三条数据

5.7 takeLast()

public final Observable<T> takeLast(final int count);

只发射后面指定数量或指定时间内的元素。指定时间时会延迟源 Observable 发射的任何数据。

takeLast()

其他 takeLast() API:

举例:

Observable.just(-1, -2, 0, 1, 2).takeLast(3); // 只发射后三条数据

5.8 sample()

public final Observable<T> sample(long period, TimeUnit unit);

定期发射 Observable 发射的最后一条数据。

sample()

其他 sample() API:

举例:

Observable.interval(300, TimeUnit.MILLISECONDS)
        .sample(2, TimeUnit.SECONDS)

5.9 elementAt()

public final Observable<T> elementAt(int index);

只发射指定索引的元素。
使用场景: 按索引去集合中的元素等。

elementAt()

举例:

Observable.just(-1, -2, 0, 1, 2).elementAt(2); // 发射索引为 2 的数据

5.10 elementAtOrDefault()

public final Observable<T> elementAtOrDefault(int index, T defaultValue);

只发射指定索引的元素,若该索引对应的元素不存在,则发射默认值。

elementAtOrDefault()

举例:

Observable.just(-1, -2, 0, 1, 2).elementAtOrDefault(9, -5); // 发射索引为 9的数据,若不存在,则发射 -5

5.11 ignoreElements()

public final Observable<T> ignoreElements();

不发射任何数据,直接发出 onCompleted() 通知。

ignoreElements()

举例:

Observable.just(-1, -2, 0, 1, 2).ignoreElements()

5.12 distinct()

public final Observable<T> distinct();

过滤重复的元素,过滤规则是:只允许还没有发射过的元素通过。

distinct()

其他 distinct() API:

举例:

// 直接过滤
Observable.just(-1, -2, 0, 1, 2, 1).distinct();

// 通过生成的 key 值过滤
Observable.just(-1, -2, 0, 1, 2, 1).distinct(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) {
        // 随机生成 key
        return integer * (int)(Math.random() * 10);
    }
});

5.13 debounce()

public final Observable<T> debounce(long timeout, TimeUnit unit)

Observable 每产生结果后,如果在规定的间隔时间内没有产生新的结果,则发射这个结果,否则会忽略这个结果。该操作符会过滤掉发射速率过快的数据项。

debounce()

其他 debounce() API:

举例:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            //产生结果的间隔时间分别为100、200、300...900毫秒
            for (int i = 1; i < 10; i++) {
                subscriber.onNext(i);
                Thread.sleep(i * 100);
            }
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
});
observable.debounce(400, TimeUnit.MILLISECONDS)  // 超时时间为400毫秒

该例子产生结果为:依次打印5、6、7、8。

附:功能实现

延时遍历

// 遍历
Observable<Integer> traverseObservable = Observable.just(3, 4, 5, 6);
// 计时
Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
        
Func2<Long, Integer, Integer> func2 = new Func2<Long, Integer, Integer>() {
    @Override
    public Integer call(Long aLong, Integer integer) {
        return integer;
    }
};

intervalObservable.zipWith(traverseObservable, func2)
        .toBlocking()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

           @Override
           public void onError(Throwable e) {
               e.printStackTrace();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

倒计时

int startTime = 10;

Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(startTime + 1) // 接收 startTime + 1 次
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long time) {
                // 1 2 3...转换为...3 2 1
                return startTime - time;
            }
        })
        .toBlocking()
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("倒计时结束");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("倒计时出现异常");
                e.printStackTrace();
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(String.format("倒计时: %s s", aLong));
            }
       });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,600评论 8 93
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,184评论 2 8
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 911评论 0 3
  • 想起那首热恋的诗 就想起江水呢喃 春风拂柳 当然,也有偶尔短暂的电闪雷鸣 这些日子,她们都有一个共同的名字,叫 幸...
    秦渝阅读 231评论 0 0