RxJava-----Flowable 源码分析

参考资料 
Pissay https://blog.piasy.com/AdvancedRxJava/2016/10/04/subjects-part-2/

一、重点对象

Flowable -----FlowableCreate
Subscribe----FlowableOnBackpressureBuffer
request-----AtomicLong 相当于一个计数,记录事件下发了多少次
BackpressureHelper---控制请求个数
drain:下发和整理事件
上游发送对象的emitter和下游onSubscribe持有的subscription是一个对象,所以反复操作它的request

BackPressure背压

在异步模型中,如果上游产生数据速度过快而下游消费事件过慢。会出现数据堆积导致内存不断增加而溢出的问题。为了解决这种问题RxJava提出了节流以及背压的策略。它是一种流速控制的策略。

1.响应式拉去实现流速控制
在普通的RxJava模型中,上游主动推送事件给下游,下游的被动接收数据(下游的onNext方法是被动触发的)。而在响应式拉取模型中,由下游来请求上游发送事件。

二、简单流程分析

 Flowable.create(new FlowableOnSubscribe<String>() {

            @Override
            public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                  //doSomething
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(20);
            }

            @Override
            public void onNext(String o) {
                //doSomething
            }

            @Override
            public void onError(Throwable t) {

            }

        });

FlowableCreate对象

1.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) 负责产生一个Flowable对象,它和Obserable的功能近似也有subscribe方法完成对下游观察者事件的订阅。不同的是它提供了Backpressure策略的支持。所以在性能上低于Observable,因为内部为了完成背压操作添加了许多其他操作。
2.create实际产生一个FlowableCreate对象,这个对象会持有我们创建的FlowableOnSubscribe和背压策略

    final FlowableOnSubscribe<T> source;
    final BackpressureStrategy backpressure;
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

3.FlowableCreate负责根据背压策略mode来决定使用什么样的发射器emitter(Subscription),这一步是在订阅方法触发时完成的

  1. t.onSubscribe(emitter)会把发射器对象交给subscribe对象持有。
 @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
          .....
        }
    }

三、LatestAsyncEmitter分发器

我们采用的是LATEST策略:如果缓存池满了,会将后续的数据丢掉,但是不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
1.LatestAsyncEmitter分发器继承的是BaseEmitter,而BaseEmitter实际上是一个AtomicLong
2.request方法内部其实是对Emitter维护的计数进行修改
3.emitter内部维护了下游的Subscriber对象,用来调用下游的onNext方法传递事件

继承AtomicLong为了优化性能。AtomicLong实际维护的是request个数,这个操作可能是在异步线程操作的,如果使用volatile关键词来维护,而volatile为防止指令重排会加入内存栅栏,频繁操作会有性能损耗,所以由AtomicLong来维护。

  abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        final Subscriber<? super T> actual;
        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }
      @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
    }
   // BackpressureHelper.add(this, n) 内部实现
         for (;;) {
            //如果request维护的是一个Long.MAX_VALUE的值,不做任何操作
            long r = requested.get();
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
           //将新值n和旧值相加
            long u = addCap(r, n);
           //更新requested维护的值
            if (requested.compareAndSet(r, u)) {
                return r;
            }
        }
  //addCap
  public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {
            return Long.MAX_VALUE;
        }
        return u;
    }

三、LatestAsyncEmitter的发送事件方法

只有在下游调用了request方法修改了BaseEmitter维护的引用计数,且触发了drain方法时才会下发事件吗,否则会因为在drain函数中判断了引用计数是否为0而终止发送
下游函数在onSubcrible函数中拿到BaseEmitter对象,调用它的request请求开始发送数据

   @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
  //在LatestAsyncEmitter类中
   @Override
        void onRequested() {
            drain();
        }

done:是否忽略该事件
queue:负责存储下发的事件
drain:对事件是否下发做处理

上游在调用onNext时,事件被queue缓存起来,但是如果下游没有调用request函数进行修改计数,在drain函数中也会被拦截

//LatestAsyncEmitter的onNext方法
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            queue.set(t);
            drain();

get():拿到emitter维护的计数,就是下游请求发送多少个数据
e:代表当前处理了多少个数据的计数
wip:保证同一时刻只有一个线程操作drain函数,它是AtomicInteger类。getAndIncrement会返回AtomicInteger维护的数据后再进行加1
wip.addAndGet(-missed):相当于wip.addAndGet(wip.get()-1)
这段代码会通过get()方法拿到下游请求了多少个数据,以及e用来计录已经处理了多少个数据。接着会判断订阅关系是否已经取消,如果取消就不发送。
否则会不断下发数据直到e != r 即将所有下游请求的数据都发送完毕
当所有数据都发送完毕后,会调用 BackpressureHelper.produced(this, e)修改维护的request计数,触发onComplete以及onError
调用 if (missed == 0)来判断是否有新的任务,如果missed为0代表所有任务都执行完毕,如果大于0代表有新的任务,需要再次执行for(;;)的代码

        void drain() {
            if (wip.getAndIncrement() != 0) {
                return;
            }

            int missed = 1;
            final Subscriber<? super T> a = actual;
            final AtomicReference<T> q = queue;

            for (;;) {
                long r = get();
                long e = 0L;

                while (e != r) {
                    if (isCancelled()) {
                        q.lazySet(null);
                        return;
                    }

                    boolean d = done;

                    T o = q.getAndSet(null);

                    boolean empty = o == null;

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            error(ex);
                        } else {
                            complete();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(o);

                    e++;
                }

                if (e == r) {
                    if (isCancelled()) {
                        q.lazySet(null);
                        return;
                    }

                    boolean d = done;

                    boolean empty = q.get() == null;

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            error(ex);
                        } else {
                            complete();
                        }
                        return;
                    }
                }

                if (e != 0) {
                    BackpressureHelper.produced(this, e);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

思路总结

使用Flowable和Subsrcibe对象完成订阅时,并不会像与Observable订阅完成后立即在subscribe方法中下发数据,而是在下游调用request时,通过触发drain函数(函数内部持有Subsrcibe,调用该对象的onNext方法)来去启动下发数据的流程,实现响应式拉取。拉取的数量由下游决定,通过更新BaseEmitter维护的任务计数来,来修改BaseEmitter处理事件的个数

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 1,440评论 2 1
  • 前言:Rxjava是通过观察者模式设计的异步任务框架,他的有点在于简洁性,不是代码的简洁性,而是逻辑的简洁性,随着...
    松哦哦阅读 4,316评论 2 1
  • 前言 Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎...
    Carson带你学安卓阅读 29,645评论 14 184
  • 所谓的Backpressure其实就是为了控制流量, 水缸存储的能力毕竟有限。 这段代码很简单, 上游同样无限循环...
    Ayres阅读 2,079评论 1 3
  • 一 阿文 从理发店走出来,已经快接近中午,阿文没回厂里吃饭,每次理完发他都会到街边的小饭馆去吃饭,因为最近阿文总去...
    鹿怔阅读 1,898评论 16 23