RxJava从使用到原理

创建操作符

操作符使用

  1. 基本创建
    create() 完整创建1个被观察者对象(Observable)
  2. 快速创建,发送事件
    just() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的事件
    fromArray() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的数组数据
    fromIterable() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的集合List数据
    测试使用 empty() error() never()
  • 延迟创建
    defer() 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
    timer() 快速创建1个被观察者对象(Observable),发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
    interval() 快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,发送的事件序列 = 从0开始、无限递增1的的整数序列
    intervalRange() 快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
    range() 快速创建1个被观察者对象(Observable), 发送事件的特点:连续发送 1个事件序列,可指定范围, 作用类似于intervalRange(),但区别在于:无延迟发送事件
    rangeLong() 类似于range(),区别在于该方法支持数据类型 = Long

创建操作符.png

Android RxJava:最基础的操作符详解 - 创建操作符

变化操作符
  1. map() 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件即,将被观察者发送的事件转换为任意的类型事件。
  2. flatmap() 将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
  3. ConcatMap() 类似FlatMap()操作符,拆分 & 重新合并生成的事件序列的顺序 = 被观察者旧序列生产的顺序
  4. Buffer() 定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送
    Android RxJava:图文详解 变换操作符
组合/合并操作符
  1. concat() / concatArray() 组合多个被观察者一起发送数据,合并后按发送顺序串行执行
  2. merge() / mergeArray() 组合多个被观察者一起发送数据,合并后按时间线并行执行
  3. concatDelayError() / mergeDelayError() onError事件推迟到其他被被观察者发送事件结束后触发
  4. zip() 合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。事件组合方式 = 严格按照原先事件序列进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量。
  5. combineLatest() 当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
  6. combineLatestDelayError() 作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
  7. reduce() 把被观察者需要发送的事件聚合成1个事件 & 发送
  8. startWith() / startWithArray() 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
  9. count() 统计被观察者发送事件的数量
    Android RxJava:组合 / 合并操作符 详细教程
功能性操作符
  1. subscribe() 订阅,即连接观察者&被观察者
  2. 线程调度
  3. 延迟操作
  4. do() 在事件的生命周期中操作


    do操作符.png
  5. 错误处理


    错误操作符.png
  6. 重复发送
    repeat() 无条件地、重复发送 被观察者事件
    repeatWhen() 有条件地、重复发送 被观察者事件
    image.png

    Android RxJava:功能性操作符 全面讲解

原理

Single:如下面的代码做最简单的操作,创建被观察者,观察者以及相互之间订阅。
//创建被观察者
Single<String> single = Single.just("1");
//创建观察者
SingleObserver<String> observer = new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }
};
  //发生订阅关系
  single.subscribe(observer);

just方法,返回一个包裹了Single的SingleJust。SingleJust继承Single类。

public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

被观察者订阅观察者,subscribe方法回去实现subscribeActual(subscriber),这个方法就在SingleJust中

 public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");

        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);

        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

subscribleActual会去调用观察者SingleObserver的onSubscribe和onSuccess方法。其中onSubscribe还返回一个已经丢弃的丢弃对象Disposables,Disposable会在下面讲。

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        s.onSubscribe(Disposables.disposed());
        s.onSuccess(value);
    }

}

Create作为和just一样的创建操作符,其实流程是相似的,下面是流程图
create创建原理.png
Map操作符使用如下,能将just中的内容进行变换后往下传递。
        SingleJust.just(1)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                })
                .subscribe(new SingleObserver<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(String integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

进入map方法,让后会返回一个新创建的SingleMap。

 public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

在SingleMap类当中,subscribeActual中上游source订阅一个新的观察者MapSingleObserver,source.subscribe(new MapSingleObserver<T, R>(t, mapper)),这样就是让SingleJust去调用它自己的subscribeActual(),这样整个过程启动了。MapSingleObserver包裹一个我们创建的SingleObserver(t),看MapSingleObserver类,内部做的就是一个桥接和自己apply的工作。代码和整体流程图在下面。

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

带map操作符的流程图.png
Disposable:作用主要就是让上游停止工作。实现方式主要有两种方式:桥接,替换。这两种方式我会具体讲。用法一般就是全局定义一个Disposable,在onSubscribe(Disposable d)中获得disposable对象,在生命周期的onDestroy中去丢弃。
1. Single.just()没有延迟没有后续,直接传一个已经丢弃的丢弃对象,相当于传了一个没有用的对象。
public final class SingleJust<T> extends Single<T> {

    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        s.onSubscribe(Disposables.disposed());
        s.onSuccess(value);
    }

}
2. delay(),有的延迟丢弃,看看是如何完成的。
    public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
        return delay(time, unit, Schedulers.computation(), delayError);
    }

创建返回一个SingleDelay

    public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
    }

进入SingleDelay类,重点还是看subscribeActual,创建一个SequentialDisposable,通过 s.onSubscribe(sd)方法,让下游的SingleObserver能够拿到sd。上游source,用subscribe(new Delay(sd, s))进行启动事件。重点看一下Delay类,当上游调用onSubscribe(Disposable d)方法时,sd.replace(d)将sd替换成上游的d。当上游调用onSuccess(final T value)方法时, sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit))将sd替换成在执行延迟的disposable。同样onError也是一样。
总结来说,delay用替换的方式去传递丢弃事件disposable。

public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {

        final SequentialDisposable sd = new SequentialDisposable();
        s.onSubscribe(sd);
        source.subscribe(new Delay(sd, s));
    }

    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> s;

        Delay(SequentialDisposable sd, SingleObserver<? super T> s) {
            this.sd = sd;
            this.s = s;
        }

        @Override
        public void onSubscribe(Disposable d) {
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
        }

        @Override
        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;
            }

            @Override
            public void run() {
                s.onSuccess(value);
            }
        }

        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;
            }

            @Override
            public void run() {
                s.onError(e);
            }
        }
    }
}
带延迟的disposable流程.png
3. subscribeOn,observeOn线程转换操作符,subscribeOn是切换上游的线程,observeOn是切换下游的线程。

不断跟进SubscribeOn(),最后会到SingleSubscribeOn类中,看一下subscribeActual方法,scheduler.scheduleDirect(parent)切线执行上游任务。如果没有线程的再次切换,后续任务将在这个线程中一直执行下去。

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        s.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> actual;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.actual = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}

同样不断跟进observeOn(),最后进入SingleObserveOn类,查看subscribeActual()方法,source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler))具体查看ObserveOnSingleObserver,在onSuccess方法中,进行线程切换scheduler.scheduleDirect(this)。所以observeOn用来切换下游方法。

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> actual;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.actual = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

箭头颜色代表线程


线程切换.png

三件套缺一不可:

OkHttp从使用到原理
Retrofit从使用到原理
RxJava从使用到原理

真诚推荐

下面的三篇文章我觉的分析的很好,特别是下面这张图流程很到位。
基本流程及Rxjava中的设计模式
线程切换subscribeOn
线程切换observerOn

线程切换subscribeOn

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容

  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,590评论 0 10
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,802评论 0 5
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,830评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 2,973评论 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 625评论 0 1