深入RxJava2 源码解析(二)

本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E4%B8%80)/

前一篇文章我们讲述到RxJava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是RxJava2的事件驱动,那么本篇文章将会讲到RxJava2 的另外一个重要功能:异步。

RxJava2 深入解析

依旧是从源码实现开始,带着疑惑去读,前一篇文章我们讲到subcribeOn方法内部的实现涉及线程池:Scheduler.Worker w = scheduler.createWorker() 这边涉及两个重要组件:

  1. scheduler调度器
  2. 自定义线程池

scheduler调度器源码解析

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;

一共有如下的五种调度器,分别对应不同的场景,当然企业可以针对自身的场景设置自己的调度器。

  • SINGLE,针对单一任务设置的单个定时线程池
  • COMPUTATION,针对计算任务设置的定时线程池的资源池(数组)
  • IO,针对IO任务设置的单个可复用的定时线程池
  • TRAMPOLINE,trampoline翻译是蹦床(佩服作者的脑洞)。这个调度器的源码注释是:任务在当前线程工作(不是线程池)但是不会立即执行,任务会被放入队列并在当前的任务完成之后执行。简单点说其实就是入队然后慢慢线性执行(这里巧妙的方法其实和前面我们所讲的回压实现机制基本是一致的,值得借鉴)
  • NEW_THREAD,单个的周期线程池和single基本一致唯一不同的是single对thread进行了一个简单的NonBlocking封装,这个封装从源码来看基本没有作用,只是一个marker interface标志接口

computation调度器源码分析

computation调度器针对大量计算场景,在后端并发场景会更多的用到,那么其是如何实现的呢?接下来带着疑惑进行源码分析。

  public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
    // 资源池
    final AtomicReference<FixedSchedulerPool> pool;

    // 这是computationScheduler类中实现的createWork()方法
    public Worker createWorker() {
      // 创建EventLoop工作者,入参是一个PoolWorker
        return new EventLoopWorker(pool.get().getEventLoop());
    }

  static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
          final int cores;
          // 资源池工作者,每个工作者其实都是一个定时线程池
          final PoolWorker[] eventLoops;
          long n;
          // 对应前面的函数调用
          public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            // 这里其实就是从工作者数组中轮询选出一个工作者
            这里其实拥有提升和优化的空间,这里笔者可能会向开源社区提交一个pr
            以此进行比较好的调度器调度
            return eventLoops[(int)(n++ % c)];
          }
  // 此处是一个简单的封装        
  static final class PoolWorker extends NewThreadWorker {
          PoolWorker(ThreadFactory threadFactory) {
              super(threadFactory);
          }
      }

  public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // 进行定时线程池的初始化
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    public static ScheduledExecutorService create(ThreadFactory factory) {
      final ScheduledExecutorService exec =
      // 初始化一个定时线程池
      Executors.newScheduledThreadPool(1, factory);
      tryPutIntoPool(PURGE_ENABLED, exec);
      return exec;
    }

上述代码清晰的展示了computation调度器的实现细节,这里需要说明的是定时线程池的core设置为1,线程池的个数最多为cpu数量,这里涉及到ScheduledThreadPoolExecutor定时线程池的原理,简单的说起内部是一个可自动增长的数组(队列)类似于ArrayList,也就是说队列永远不会满,线程池中的线程数不会增加。

接下来结合订阅线程和发布线程分析其之间如何进行沟通的本质。

发布线程在上一篇的文章已经提到,内部是一个worker,那么订阅线程也是么,很显然必须是的,接下来我们来看下源代码:

// 还是从subscribeActul开始(原因见上一篇文章)
public void subscribeActual(Subscriber<? super T> s) {
    Worker worker = scheduler.createWorker();

    if (s instanceof ConditionalSubscriber) {
        source.subscribe(new ObserveOnConditionalSubscriber<T>(
                (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
    } else {
        // 
        source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
    }
}

其内部封装了一个ObserveOnsubcriber,这是个对下流订阅者的封装,主要什么作用呢,为什么要这个呢?其实这个涉及订阅线程内部的机制,接着看源代码了解其内部机制。

  // 基类
  abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T>
  implements FlowableSubscriber<T>, Runnable {
      private static final long serialVersionUID = -8241002408341274697L;

      final Worker worker;

      final boolean delayError;

      final int prefetch;

      //...

      @Override
      public final void onNext(T t) {
          if (done) {
              return;
          }
          if (sourceMode == ASYNC) {
              trySchedule();
              return;
          }

          if (!queue.offer(t)) {
              upstream.cancel();

              error = new MissingBackpressureException("Queue is full?!");
              done = true;
          }
          // 开启订阅者线程池模式的调度,具体实现在子类中实现
          trySchedule();
      }

      @Override
      public final void onError(Throwable t) {
          if (done) {
              RxJavaPlugins.onError(t);
              return;
          }
          error = t;
          done = true;
          trySchedule();
      }

      @Override
      public final void onComplete() {
          if (!done) {
              done = true;
              trySchedule();
          }
      }

      // 这里并没有向上传递request请求,而是把自己当做数据发射者进行request计数
      @Override
      public final void request(long n) {
          if (SubscriptionHelper.validate(n)) {
              BackpressureHelper.add(requested, n);
              // 开启调度
              trySchedule();
          }
      }

      // 调度代码
      final void trySchedule() {
          // 上一篇文章讲过这个的用法
          if (getAndIncrement() != 0) {
              return;
          }
          // 启用一个work来进行任务的执行 this对象说明实现了runable接口
          worker.schedule(this);
      }

      // 调度实现的代码
      @Override
      public final void run() {
          if (outputFused) {
              runBackfused();
          } else if (sourceMode == SYNC) {
              runSync();
          } else {
              // 一般会调用runAsync方法
              runAsync();
          }
      }

      abstract void runBackfused();

      abstract void runSync();

      abstract void runAsync();
      //...
  }

当上游的装饰者(上一篇提到的装饰者模式)调用onNext方法时,这时并没有类似的去调用下游的onNext方法,那这个时候其实就是订阅者线程模式的核心原理:采用queue队列进行数据的store,这里尝试将数据放进队列。

ObserveOnSubscriber的具体实现类部分实现如下。

  static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
  implements FlowableSubscriber<T> {

      private static final long serialVersionUID = -4547113800637756442L;

      final Subscriber<? super T> downstream;

      ObserveOnSubscriber(
              Subscriber<? super T> actual,
              Worker worker,
              boolean delayError,
              int prefetch) {
          super(worker, delayError, prefetch);
          this.downstream = actual;
      }

      //这是上游回调这个subscriber时调用的方法,详情见上一篇文章
      @Override
      public void onSubscribe(Subscription s) {
          if (SubscriptionHelper.validate(this.upstream, s)) {
              this.upstream = s;

              if (s instanceof QueueSubscription) {
                  @SuppressWarnings("unchecked")
                  QueueSubscription<T> f = (QueueSubscription<T>) s;

                  int m = f.requestFusion(ANY | BOUNDARY);

                  if (m == SYNC) {
                      sourceMode = SYNC;
                      queue = f;
                      done = true;

                      downstream.onSubscribe(this);
                      return;
                  } else
                  if (m == ASYNC) {
                      sourceMode = ASYNC;
                      queue = f;

                      downstream.onSubscribe(this);

                      s.request(prefetch);

                      return;
                  }
              }
              // 设置缓存队列
              // 这里涉及一个特别之处就是预获取(提前获取数据)
              queue = new SpscArrayQueue<T>(prefetch);
              // 触发下游subscriber 如果有request则会触发下游对上游数据的request
              downstream.onSubscribe(this);
              // 请求上游数据 上面的代码和这行代码就是起到承上启下的一个作用,也就是预获取,放在队列中
              s.request(prefetch);
          }
      }

      //...

下面看一下抽象方法runAsync()的实现。

      @Override
      void runAsync() {
          int missed = 1;

          final Subscriber<? super T> a = downstream;
          final SimpleQueue<T> q = queue;

          long e = produced;

          for (;;) {

              long r = requested.get();

              while (e != r) {
                  boolean d = done;
                  T v;

                  try {
                      // 获取数据
                      v = q.poll();
                  } catch (Throwable ex) {
                      Exceptions.throwIfFatal(ex);

                      cancelled = true;
                      upstream.cancel();
                      q.clear();

                      a.onError(ex);
                      worker.dispose();
                      return;
                  }

                  boolean empty = v == null;

                  if (checkTerminated(d, empty, a)) {
                      return;
                  }

                  if (empty) {
                      break;
                  }

                  a.onNext(v);

                  e++;
                  // limit = prefetch - (prefetch >> 2)
                  // prefetch  = BUFFER_SIZE(上一篇文章提到的默认128)
                  if (e == limit) {
                      if (r != Long.MAX_VALUE) {
                          r = requested.addAndGet(-e);
                      }
                      upstream.request(e);
                      e = 0L;
                  }
              }

              if (e == r && checkTerminated(done, q.isEmpty(), a)) {
                  return;
              }

              // 下面的代码机制在上一篇讲过主要涉及异步编程技巧
              int w = get();
              if (missed == w) {
                  produced = e;
                  missed = addAndGet(-missed);
                  if (missed == 0) {
                      break;
                  }
              } else {
                  missed = w;
              }
          }
      }
    //...
  }

前面说过,订阅者把自己当成一个发射者,那数/据从哪里来呢,而且还要持续有数据,那么后面的代码说明了数据来源,当数据达到limit,开始新的数据的prefetch,每次preftch的数量是limit。

为何要将订阅者这样区别设置呢,其实原因很简单,订阅者和发布者需要不同的线程机制异步地执行,比如订阅者需要computation的线程机制来进行大量的耗时数据计算,但又要保持一致的装修者模式,所以源码的做法是订阅者这边打破回调的调用流,采用数据队列进行两个线程池之间的数据传送

本文总结

笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。

  1. rxjava2线程调度的原理机制,不同场景下线程机制需要进行定制
  2. rxjava2生产和消费的异步原理和实现方式

订阅最新文章,欢迎关注我的公众号

微信公众号
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容