我是用来缓存的操作符-----Cache

Cache操作符功能介绍

缓存前面操作通过onNext所传递下来的参数,在下一次subscribe订阅时,直接跳过前面的步骤,直接执行后面步骤。

用途

单纯用文字介绍Cache的功能,大家可能很难理解。大家可以想象下,比如http请求,我的前面操作是有延时的,如果没有用cache,每次subscribe都会去真正发起http�请求。但是如果在尾部添加cache操作符的话,那么下一次subscribe的时候,其实就是直接从缓存中获取,而不会发起真正的http请求。

来一段小代码
  var handler = Handler()
        //延时后 cache的作用会更明显
//        observable = Observable.create(object : Observable.OnSubscribe<String> {
//            override fun call(t: Subscriber<in String>) {
//                t.onNext("Test1")
//                t.onNext("Test2")
//                t.onNext("Test3")
//            }
//        }).delay(4, TimeUnit.SECONDS).cache()
        observable = Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
                t.onNext("Test2")
                t.onNext("Test3")
            }
        }).cache()

        btSub.setOnClickListener({
            observable?.subscribe({ msg ->
                handler.post(Runnable {
                    tvContent.text = tvContent.text.toString() + "\n" + msg

                })

            })
        })

这里有一段注释的代码,是使用了delay,使用了delay之后cache的作用会更加明显,第一次点击按钮触发subscribe订阅后,会延迟4秒后,在界面上Test1,Test2,Test3,第二次,第三次...界面上都会立刻打印Test1,Test2,Test3,但是由于我还没有为大家介绍过delay操作符,所以我这里也先不过多介绍,在以后的文章中我们会为大家进行delay的源码分析。
有了前面这么一大串的介绍后,我想大家对cache操作符有了一定的了解,那么我们就带着自己心中的疑问和猜想,按照demo的顺序一步步跟入源码。


看看源代码

Observable

public final Observable<T> cache() {
        return CachedObservable.from(this);
    }

CachedObservable

public static <T> CachedObservable<T> from(Observable<? extends T> source) {
        return (CachedObservable<T>)from(source, 16);
    }
public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {
        if (capacityHint < 1) {
            throw new IllegalArgumentException("capacityHint > 0 required");
        }
        CacheState<T> state = new CacheState<T>(source, capacityHint);
        CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
        return new CachedObservable<T>(onSubscribe, state);
    }

个人理解:因为前面其实已经介绍到了,cache其实是把前面传递下来的参数,保存起来了,那么肯定是要有个数组或者列表的,这里我看到capacityHint,我就认为CacheState应该有可能是这个容器。
这倒不是最重要的,在Observable的子类中,其实我们最重要看的就是2个东西OnSubscribe,Subscriber明白了这2个东西,基本上对这个操作符就了解的差不多了

CachedObservable.CacheState

static final class CacheState<T> extends LinkedArrayList implements Observer<T> {
...
}

CachedObservable.CachedSubscribe

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
  ...
}

OnSubscribe很明显,就是CachedSubscribe
那么CacheState是不是Subscriber?,在这里CacheState实现了Observer接口
Observer

public interface Observer<T> {    
   void onCompleted();   
   void onError(Throwable e);    
   void onNext(T t);
}

前面我们几篇文章都介绍了,Subscriber其实就是onNext..等方法的具体实现。所以我们可以直接把CacheState当作是Subscriber来看待,这样会很好的帮助我们理解。

 public abstract class Subscriber<T> implements Observer<T>, Subscription {
    ...
}

个人理解:Subscriber和Observer关系
Subscriber实现了Observer接口,Subscriber可以subscribe,unsubscribe,还有onStart方法。
在RxJava操作符中,其实我们关心的是SubscrberonNext方法和OnSubscribecall方法,能搞定这2个方法,那么我们就对这个操作符有了很深的了解了。

既然我们已经找到了2个重要的对象,那么我们继续往下深入。
Observable.cache调用后得到了一个CachedObservable,然后点击按钮调用subscribe
Observable

...
 public final Subscription subscribe(final Action1<? super T> onNext) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }

        Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }
...
public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
...
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
           ...

这里我贴出这块代码,其实在前面几篇文章我都贴过了,这里重新贴出希望大家加强记忆。调用subscribe方法,其实就是obsevable.onSubscibe.call(subscriber)
关键的地方来了,这里的obsevable,onSubscribe,subscriber分别是什么呢?
链式结构中,操作符使用的越多,到最后会越难判断,这里我们只有一个操作符,相对来说比较简单
这里的observable很明显就是CachedObservableobsevable.onSubscibe自然就是刚才我们分析得到的CachedSubscribe,如果你想当然的以为subscriber就是刚才的CacheState那就错了。
那么subscriber到底是什么呢?
我们从刚才的代码一步步往上推,其实就明白。
subscriber是一个参数由外部传入的,

...
 if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
...

其实已经告诉我们,最终的subscriber其实是一个SafeSubscriber。而原来的subscriber当作构造参数保存在了SafeSubscriber中。

再往前看,看原来的subscriber是什么。

...
public final Subscription subscribe(final Action1<? super T> onNext) {
       ...
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }
...

很明显的就是,我们在demo中传入的Action被包装成了ActionSubscriber

看图说话

image.png

所以最终是这么一个subscriber

那么我们继续深入。

observable.onSubscriber.call(subscriber)

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
 ...
        @Override
        public void call(Subscriber<? super T> t) {
            ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
            state.addProducer(rp);
            t.add(rp);
            t.setProducer(rp);
            if (!get() && compareAndSet(false, true)) {
                state.connect();
            }
        }
    }

我们一句一句来分析。

  1. 创建ReplayProducer对象
   static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {
       ...
        public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {
            this.child = child;
            this.state = state;
        }
        @Override
        public void request(long n) {
        ...
        }
...

ReplayProducer是一个Producer,前面介绍过Producer,我们只需要看它request方法就好了。我们先等下看它的request

  1. ReplayProducer添加到CacheState对象中
 public void addProducer(ReplayProducer<T> p) {
            synchronized (connection) {
                ReplayProducer<?>[] a = producers;
                int n = a.length;
                ReplayProducer<?>[] b = new ReplayProducer<?>[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = p;
                producers = b;
            }
        }

这里用的是数组的形式,来添加一个新元素,比较简单,就不过多介绍了。

  1. ReplyProducer当作SubscriptionSubscriber绑定在一起。
public final void add(Subscription s) {
        subscriptions.add(s);
    }
...
@Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }
...

可以理解为,当subscriber.unsubscribe,对应的Producer也取消了订阅。这种操作在前面几个操作符中也有出现。

  1. 调用Producerrequest方法
    setProducer的操作呢,我们在前面也介绍过,就是直接调用Producer的request方法

注意:由于我们现在只使用一个操作符,比较简单,先这么理解,后续我们讲多个操作符混合使用就可能会出现比较复杂的情况,由浅入深,慢慢理解。

...
public void request(long n) {
            for (;;) {
                long r = get();
                if (r < 0) {
                    return;
                }
                long u = r + n;
                if (u < 0) {
                    u = Long.MAX_VALUE;
                }
                if (compareAndSet(r, u)) {
                    replay();
                    return;
                }
            }
        }
 public void replay() {
            synchronized (this) {
                if (emitting) {
                    missed = true;
                    return;
                }
                emitting = true;
            }
            boolean skipFinal = false;
            try {
                final Subscriber<? super T> child = this.child;

                for (;;) {

                    long r = get();

                    if (r < 0L) {
                        skipFinal = true;
                        return;
                    }
                    int s = state.size();
                    if (s != 0) {
                      //这段代码当读提出来分析
                        ...
  
                    }
                    synchronized (this) {
                        if (!missed) {
                            emitting = false;
                            skipFinal = true;
                            return;
                        }
                        missed = false;
                    }
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
        }
    }
...

request方法中呢,其实主要调用了replay方法。
里面最重要的一句是

int s = state.size();
if (s != 0) {
...
}

由于第一次调用subscribe所以根本没有缓存东西,所以state.size()==0,所以第一调用replay方法基本上没有做什么操作。

  1. 第一次调用subscribe执行
 if (!get() && compareAndSet(false, true)) {
                state.connect();
            }

我们先来看
!get() && compareAndSet(false, true),由于CachedSubscribe<T> extends AtomicBoolean

第一次进来get()==false,然后再设置为true,确保了下一次不调用state.connect

下面我们具体分析下connect方法,最重要的缓存操作在这里

...
public void connect() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    CacheState.this.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    CacheState.this.onError(e);
                }
                @Override
                public void onCompleted() {
                    CacheState.this.onCompleted();
                }
            };
            connection.set(subscriber);
            source.unsafeSubscribe(subscriber);
            isConnected = true;
        }
...

这里呢,我们还是分步骤一步步来解析

  1. 创建一个Subscriber
    因为在前面,其实我们本身对cache的功能已经有所了解,就是在第一次以后,再调用subscribe其实就是直接把缓存数据拿过来直接传递下去。

那么在这里,其实我们就可以看出,这里新建的Subscriber就是用来直接传递下去的Subscriber。这里其实我们可以画个大致的示意图。

看图说话

image.png

第一次subscribe走上面实现,第二次subscribe直接走下面虚线,而中间的这个Subscriber就是我们现在所提到的这个Subscriber。第二次以后就直接越过了demo中的OnSubscribe

注意: 这里没有涉及到demo中的Subscriber,通过前面的分析我们知道demo中的Subscriber已经被包装成了SafeSubscriber保存在ReplyProducer中的child变量里

  1. 保存subscriber
    既然前面提到了,第一次之后的每次操作都会使用到这个subscriber,那么我们肯定要把这个subscriber保存起来。
 public void set(Subscription s) {
      ...
        state.update(s);
    }

SequentialSubscription

...
public boolean update(Subscription next) {
        for (;;) {
            Subscription current = get();

            if (current == Unsubscribed.INSTANCE) {
                if (next != null) {
                    next.unsubscribe();
                }
                return false;
            }

            if (compareAndSet(current, next)) {
                if (current != null) {
                    current.unsubscribe();
                }
                return true;
            }
        }
    }
...

如我们所料,确实是把这个subscriber保存起来了。
compareAndSet(current, next)

  1. 第一次subscribe,还是要调用前面的onSubscribe
source.unsafeSubscribe(subscriber);

其实就是

source.onSubscribe.call(subscriber);

source我们可以通过源码知道,就是创建CachedObservable时传入的我们demoObservable.create()得到的Observable对象。
onSubscribe就是demo中的OnSubscribe
下面我们还是重点,放在subscriber上。

这里的subscriber其实我们根据上面的代码很容易判断出就是刚刚new 出来的Subscriber,并且在onNext其实是直接调用CacheStateonNext

CachedObservable.CacheState

...
public void onNext(T t) {
            if (!sourceDone) {
                Object o = NotificationLite.next(t);
                add(o);
                dispatch();
            }
        }
...

当我们在demo中OnSubscribe调用一次subscriber.onNext,其实就是进入CachedObservable.CacheStateonNext

在这里我们再次一句一句分析。

  1. NotificationLite.next(t)
public static <T> Object next(T t) {
        if (t == null) {
            return ON_NEXT_NULL_SENTINEL;
        } else {
            return t;
        }
    }

很明显就是判断参数是否为null。

  1. add(o)

前面我们就提到了CacheState用来保存上面传递下来的参数.
具体实现就是在这里,add(o)。

  1. dispatch()
 void dispatch() {
            ReplayProducer<?>[] a = producers;
            for (ReplayProducer<?> rp : a) {
                rp.replay();
            }
        }

很明显这里最重要的就是rp.replay()
我们先不管为什么这里为什么用了个for循环,我们先来看看rp.replay到底做了什么。

 public void replay() {
...
  int s = state.size();
  if (s != 0) {
      ...
    if (NotificationLite.accept(child, o)) {
      ...
    }
  }
...
}

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {
         ...
            o.onNext((T) n);
            return false;
         ...
    }

reply就是判断CacheState的缓存是否已经有了,有了之后就直接调用child.onNext(o)

那么child是什么呢?就是前面所分析的

image.png

这样在一层一层的调用onNext最终到了我们自己的Action

第二次调用subscribe

第一次调用subscribe的整个流程其实我们已经走完,下面我们直接简单的走一下第二次调用subscribe的流程。

直接从CachedSubscribecall方法看起。

 public void call(Subscriber<? super T> t) {
            ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
            state.addProducer(rp);
            t.add(rp);
            t.setProducer(rp);
            if (!get() && compareAndSet(false, true)) {
                state.connect();
            }
        }
  1. 又创建了一个ReplayProducer
  2. 添加到CacheState
  3. ReplayProducer加入到SafeSubscriber中绑定在一起
  4. 调用Producerrequest方法,在这里其实就是调用ReplayProducerreplay方法(前面分析过了)
  5. 因为第一次的时候,已经设置为true所以get()==true直接跳过

所以综上分析,我们还是直接看ReplayProducerreplay方法。

 public void replay() {
...
  int s = state.size();
  if (s != 0) {
      ...
    if (NotificationLite.accept(child, o)) {
      ...
    }
  }
...
}

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {
         ...
            o.onNext((T) n);
            return false;
         ...
    }

因为在第一次中我们已经在CacheState中保存了所有发射的对象。以我们当前的demo来看,state中保存了Test1,Test2,Test33个字符串,然后就直接调用SafeSubsciber的onNext方法。

总结

总的来说,cache操作符与前2个操作符比较起来的话,在源码上其实看起来会更有难度,但是有了前2个操作符做铺垫,再来看,相对来说会比较容易。

附加

整体cache的流程,其实我们已经有了大致的了解,但是刚才其实我们还是遗留了一个问题。我们再次来看这段代码。

void dispatch() {
            ReplayProducer<?>[] a = producers;
            for (ReplayProducer<?> rp : a) {
                rp.replay();
            }
        }

这里为什么要用了一个for循环呢?

image.png

其实我们从上面的图片中可以看出。重复输出了3次Test1,Test2,Test3
因为,我在demo中连续点击了3下SUBSCRIBE按钮。
也就是调用了3次subscribe,也就是往CacheState中添加了
3个ReplayProducer,所以上面调用dispatch方法的时候,自然就for循环调用了3次replay,然后就输出了3次。


大家喜欢的话就点个赞哦,给我点鼓励嘛
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容