Android RxJava框架源码解析

目录

简单示例1:

private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        })
        .subscribe(new Observer<String>() { 
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

特别注意:上面示例代码中的mDisposable最后必须要释放掉,不然会出现内存泄漏

一、观察者Observer创建过程

首先对观察者Observer源码开始进行简单分析下:
Observer.java

public interface Observer<T> {
    //表示一执行subscribe订阅就会执行该函数,这个函数跟当前调用.subscribe()一定执行在主线程中
    void onSubscribe(@NonNull Disposable d);
   // 表示拿到上一个流程的数据
    void onNext(@NonNull T t);
   // 表示拿到上一个流程的错误数据
    void onError(@NonNull Throwable e);
   // 表示事件流程结束
    void onComplete();
}

具体的对象创建是在上面示例代码1中的new Observer<String>()操作,这个称这个为自定义观察者

二、被观察者Observable创建过程

分析完观察者Observer的创建,现在来分析下被观察者Observable的创建流程,

Observable.create(new ObservableOnSubscribe<String>() {
     @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
          e.onNext("test");
      }
    })

将new ObservableOnSubscribe()过程可以理解为是自定义source的过程。

new ObservableOnSubscribe<String>() {
     @Override
      public void subscribe(ObservableEmitter<String> e) throws Exception {
          e.onNext("test");
      }
    }

执行Observable.create()代码流程
Observable.java

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null"); //校验是否为null
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

其中,RxJavaPlugins.onAssembly()采用了hook技术,如果没有重写RxJavaPlugins.setOnObservableAssembly()方法,这个可以不要考虑。
ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source; // 自定义source

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

    /**
     * Serializes calls to onNext, onError and onComplete.
     *
     * @param <T> the value type
     */
    static final class SerializedEmitter<T>
    extends AtomicInteger
    implements ObservableEmitter<T> {

        private static final long serialVersionUID = 4883307006032401862L;

        final ObservableEmitter<T> emitter;

        final AtomicThrowable error;

        final SpscLinkedArrayQueue<T> queue;

        volatile boolean done;

        SerializedEmitter(ObservableEmitter<T> emitter) {
            this.emitter = emitter;
            this.error = new AtomicThrowable();
            this.queue = new SpscLinkedArrayQueue<T>(16);
        }

        @Override
        public void onNext(T t) {
            if (emitter.isDisposed() || done) {
                return;
            }
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (get() == 0 && compareAndSet(0, 1)) {
                emitter.onNext(t);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<T> q = queue;
                synchronized (q) {
                    q.offer(t);
                }
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (emitter.isDisposed() || done) {
                return false;
            }
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (error.addThrowable(t)) {
                done = true;
                drain();
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (emitter.isDisposed() || done) {
                return;
            }
            done = true;
            drain();
        }

        void drain() {
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        void drainLoop() {
            ObservableEmitter<T> e = emitter;
            SpscLinkedArrayQueue<T> q = queue;
            AtomicThrowable error = this.error;
            int missed = 1;
            for (;;) {

                for (;;) {
                    if (e.isDisposed()) {
                        q.clear();
                        return;
                    }

                    if (error.get() != null) {
                        q.clear();
                        e.onError(error.terminate());
                        return;
                    }

                    boolean d = done;
                    T v = q.poll();

                    boolean empty = v == null;

                    if (d && empty) {
                        e.onComplete();
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    e.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        @Override
        public void setDisposable(Disposable s) {
            emitter.setDisposable(s);
        }

        @Override
        public void setCancellable(Cancellable c) {
            emitter.setCancellable(c);
        }

        @Override
        public boolean isDisposed() {
            return emitter.isDisposed();
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return this;
        }
    }

}


这里将ObservableCreate的源码全部放在这,作为一个埋点

其实,Observable.create()方法主要功能就是创建了一个ObservableCreate对象,并将自定义的source传给ObservableCreate。该方法最终返回的是ObserverableCreate对象。

三、subscribe订阅过程

分析执行subscribe()订阅流程,并将自定义观察者作为参数传入。
Observable.java

@Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校验,判定observer是否为null
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer); 
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

首先会执行一些功能校验,最后执行到subscribeActual()方法中。
Observable.java

 protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual()是一个抽象类,从而最终调用的是ObservableCreate的subscribeActual()方法中。

ObservableCreate.java

@Override
    protected void subscribeActual(Observer<? super T> observer) {  // observer为自定义观察者
        // 自定义一个CreateEmitter发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer); 
        // 执行该方法就会执行自定义观察者的onSubscribe()方法中
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual()方法里面会执行如下三个操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer); --> 首先会创建一个CreateEmitter发射器,并将自定义观察者传入该发射器中
2)observer.onSubscribe(parent);–> 执行自定义观察者的onSubscribe()方法,所以该方法也是最先执行调用,并且一定在主线程中
3)source.subscribe(parent); -->执行自定义source的subscribe()订阅操作,从而跳转到示例代码1中ObservableOnSubscribe的subscribe()方法,并将CreateEmitter发射器作为参数传入进去

new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        }

执行e.onNext("test")就会跳转到CreateEmitter发射器中的onNext()方法
ObservableCreate.java

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);  //执行该流程,observer为自定义观察者
            }
        }
       ...
    }

该observer为上面流程中自定义的CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);传入进来的自定义观察者对象,执行observer.onNext(t)该语句就调到示例代码1中的

@Override
public void onNext(String s) {

}

Observable与Observer订阅的过程时序图如下:
[图片上传失败...(image-e9a070-1677294388046)]

在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所以的“观察者”才能观察到
  在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点(被观察者) 和 终点(观察者) 在“订阅”一次后,才发出改变通知,终点(观察者)才能观察到

图1:RxJava简单订阅过程:
[图片上传失败...(image-f0f708-1677294388046)]

四、map操作符

加入map操作符之后的简单示例代码2:

private Disposable mDisposable;

// 创建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定义source
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        })
        // ObservableCreate.map
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s;
            }
        })
        // ObservableMap.subscribe
        .subscribe(new Observer<String>() { //自定义观察者
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

这个示例代码2写法采用装饰模型

图2:加入map操作符之后的流程:
[图片上传失败...(image-40fc9-1677294388046)]

从①~⑥流程简称为封包裹,⑦ ~⑨流程简称为拆包裹

其实图1与图2的区别不大,主要就是多了一个ObservableMap封包裹的流程,其他流程都类似。针对这个区别进行代码流程阐述下:
  从示例代码2中执行map()操作进行分析:
Observable.java

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

进行创建ObservableMap对象

ObservableMap.java

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source); //source指ObservableCreate
        this.function = function; // 自定义的Function方法
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function)); //这里面的t为下一层包裹即图2中的自定义观察者,source指上一层ObservableCreate
    }
...
}

这里需要注意,在ObservableMap()构造函数中,参数source指从上一层传过来的ObservableCreate对象,参数function指示例代码2中的new Function()方法。

 .map(new Function<String, String>() 

执行示例代码2中的.subscribe()其实就是执行到了ObservableMap类的subscribeActual()方法,在这个方法中会对MapObserver进行封装一层包裹,并将下一层的包裹即自定义观察者也就是参数t传入。

MapObserver为ObservableMap的内部类。

ObservableMap.java

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual); // actual为自定义观察者
            this.mapper = mapper;
        }
        ...
}

在执行图2的第⑧步流程时,就会调用执行包裹1的onNext()方法,即MapObserver类的onNext();
ObservableMap.java

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        actual.onNext(null);
        return;
    }

    U v;

    try {
       // 代码1
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    // 代码2
    actual.onNext(v);
}


1:代码1
  执行mapper.apply(t)流程的时候,其实就是调用了示例代码2中的apply()方法。
Function.java

public interface Function<T, R> {
    R apply(@NonNull T t) throws Exception;
}

@Override
public String apply(String s) throws Exception {
    return s;
}

2:代码2
   actual.onNext(v);中的actual是在ObservableMap构造函数传过来的,actual对应图2中的自定义观察者对象,也就是对应图2中的第9步流程。

五、线程切换原理

subscribeOn:给上面代码分配线程
observeOn:给下面代码分配线程

Scheduler分类:

调度器类型 效果
Schedulers.computation() 用于计算任务,如事件循环或回调处理,不要用于IO操作(IO操作使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate() 在当前线程立即开始执行任务
Schedulers.io() 用于IO密集型任务
Schedulers.newThread() 为每个任务创建一个新任务
Schedulers.trampoline() 当其他排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() 用于Android的UI更新操作

1. 异步线程流程

示例代码3:

private Disposable mDisposable;

//创建ObserverableCreate对象
Observable.create(
            // 自定义source
            new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("test");
                    }
                }
        )
        // TODO 第二步
        //ObservableCreate.subscribeOn()
        .subscribeOn(
            // TODO 第一步
            Schedulers.io()  // 给上面的代码分配异步线程
            ) 
        // TODO 第三步
        // ObservableSubscribeOn.subscribe()
        .subscribe(
            // 自定义观察者
            new Observer<String>() { 
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d("abc", "onNext:" + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

示例代码3只是简单的在示例代码1上添加了一行异步线程的操作 .subscribeOn(Schedulers.io()),从第一步该语句进行分析:
Schedules.java

static final Scheduler IO;
...
static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
 ...
 @NonNull
 public static Scheduler io() {
     return RxJavaPlugins.onIoScheduler(IO);
 }

RxJavaPlugins.initIoScheduler(...);这条语句也采用了hook机制,继续分析new IOTask()流程
Schedules.java

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

DEFAULT赋值如下:
Schedules.java

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

IoScheduler.java

public IoScheduler() {
   this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
   this.threadFactory = threadFactory;
   this.pool = new AtomicReference<CachedWorkerPool>(NONE);
   start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

IoScheduler.java

 private final ScheduledExecutorService evictorService;  //线程池
 
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); // 创建线程池
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

第一步总结执行Schedulers.io() 最终返回的是Scheduler,也就是IOScheduler对象。通过new IOScheduler 创建了一个线程池,然后通过subscribeOn()来触发。
Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

此时new 了一个ObservableSubscribeOn对象,并将IoScheduler对象传进去
ObservableSubscribeOn.java

final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
     super(source);
     this.scheduler = scheduler;
}

第二步总结:就是创建了一个ObservableSubscribeOn对象,并将IoScheduler传入到该类中。
  执行第三步的.subscribe()订阅流程也就执行到了ObservableSubscribeOn.subscribeActual()这个方法中。
ObservableSubscribeOn.java

// s为自定义观察者
public void subscribeActual(final Observer<? super T> s) {
        // 代码1
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //定义一个包裹SubscribeOnObserver
        // 代码2
        s.onSubscribe(parent);
        // 代码3
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

1:代码1
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);定义一个包裹SubscribeOnObserver,并将自定义观察者s作为参数传入

2:代码2
  执行s.onSubscribe(parent);对应的执行到了示例代码3中的语句块

@Override
public void onSubscribe(Disposable d) {
    mDisposable = d;
}

3:代码3
  首先先分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));语句中的new SubscribeTask(parent)代码流程
ObservableSubscribeOn.java

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

     SubscribeTask(SubscribeOnObserver<T> parent) {
         this.parent = parent;
     }

     @Override
     public void run() {
         source.subscribe(parent);
     }
 }

SubscribeTask 就是一个线程任务

source.subscribe(parent);这个语句块中的source就是指上一层的对象,在示例代码3中指ObservableCreate,parent指包裹SubscribeOnObserver。

之后继续分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));语句中的scheduler.scheduleDirect(...)代码流程

Scheduler.java

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

Scheduler.java

// run是指SubscribeTask任务
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
   // 代码1
    final Worker w = createWorker();

    // 代码2 
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // hook机制
    
    // 代码3
    DisposeTask task = new DisposeTask(decoratedRun, w);
    
    // 代码4
    w.schedule(task, delay, unit);

    return task;
}

1:代码1
  执行final Worker w = createWorker();,createWorker()是一个抽象方法,其实调用到了IoScheduler类的createWorker()
IoScheduler.java

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

IoScheduler.java

static final class EventLoopWorker extends Scheduler.Worker {
     private final CompositeDisposable tasks;
      private final CachedWorkerPool pool;
      private final ThreadWorker threadWorker;

      final AtomicBoolean once = new AtomicBoolean();

      EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }
      ...
}

该语句块最后返回的是EventLoopWorker对象。

2:代码2
  其实代码2语句就是将Runable进行封装了下,最后还是Runnable

3:代码3
  将Runnable又包装了一层 为DisposeTask
Scheduler.java

static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;
    
    DisposeTask(Runnable decoratedRun, Worker w) {
       this.decoratedRun = decoratedRun;
       this.w = w;
    }
    ...
}

4:代码4
  执行 w.schedule(task, delay, unit);就会执行到EventLoopWorker类的schedule()方法
IoScheduler.java

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
   if (tasks.isDisposed()) {
         // don't schedule, we are unsubscribed
         return EmptyDisposable.INSTANCE;
     }

     return threadWorker.scheduleActual(action, delayTime, unit, tasks);
 }

NewThreadWorker.java

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
   Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

   ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

   if (parent != null) {
       if (!parent.add(sr)) {
           return sr;
       }
   }

   Future<?> f;
   try {
       if (delayTime <= 0) {
           f = executor.submit((Callable<Object>)sr); //线程池的执行,executor是ScheduleExecutorService线程池
       } else {
           f = executor.schedule((Callable<Object>)sr, delayTime, unit);
       }
       sr.setFuture(f);
   } catch (RejectedExecutionException ex) {
       if (parent != null) {
           parent.remove(sr);
       }
       RxJavaPlugins.onError(ex);
   }

   return sr;
}

scheduler.scheduleDirect(new SubscribeTask(parent)) 这句代码的最终目的就是将 SubscribeTask 任务 交给线程池去执行。

执行executor.submit()该语句就会触发SubscribeTask任务的Run()方法执行,该SubscribeTask任务就处于异步线程中。

ObservableSubscribeOn.java

public void run() {
    source.subscribe(parent); //处于异步线程中执行
}

source.subscribe(parent);这个语句块中的source就是指上一层的对象,在示例代码3中指ObservableCreate,parent指包裹SubscribeOnObserver。

图3:subscribeOn异步流程:
[图片上传失败...(image-584a9b-1677294388046)]

说明:步骤④是将SubscribeTask任务加入到线程池中执行,则后续步骤⑤~⑩都是在异步线程中执行

subscribeOn()切换线程时序图:
[图片上传失败...(image-32486d-1677294388046)]

2. 主线程流程

示例代码4:

private Disposable mDisposable;

//创建ObserverableCreate对象
Observable.create(
            // 自定义source
            new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("test");
                    }
                }
        )
        // TODO 第二步
        //ObservableCreate.observeOn()
        .observeOn(
            // TODO 第一步
            AndroidSchedulers.mainThread()  // 给上面的代码分配主线程
            ) 
        // TODO 第三步
        // ObservableObserveOn.subscribe()
        .subscribe(
            // 自定义观察者
            new Observer<String>() { 
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d("abc", "onNext:" + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
        });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            if (!mDisposable.isDisposed()) {
                mDisposable.dispose();
            }
        }
    }

先来分析下第一步AndroidSchedulers.mainThread()流程:
AndroidSchedulers.java

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}


private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
      new Callable<Scheduler>() {
           @Override public Scheduler call() throws Exception {
               return MainHolder.DEFAULT;
           }
       });


private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

HandlerScheduler.java

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    ... 
}

AndroidSchedulers.mainThread()流程就是创建了一个HandlerScheduler对象。

执行第二步.observeOn(...)
Observable.java

public final Observable<T> observeOn(Scheduler scheduler) {
   return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

创建一个ObservableObserveOn对象,其中scheduler就是HandlerScheduler对象。
ObservableObserveOn.java

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

.observeOn(...)流程就是创建一个ObservableObserveOn对象,并将HandlerScheduler对象传入。

之后执行第三步.subscribe(),对应执行的是ObservableObserveOn.subscribeActual()
ObservableObserveOn.java

@Override
public void subscribeActual(final Observer<? super T> s) {
    // 代码1
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
   // 代码2
    s.onSubscribe(parent);
    // 代码3
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

其实,参数s为自定义的观察者,这个地方跟异步线程的流程是一样的,
代码1:封装了一层SubscribeOnObserver包裹
代码2:执行自定义观察者中的onSubscribe()方法流程
代码3:scheduler.scheduleDirect(new SubscribeTask(parent)) 这行代码的功能就是将SubscribeTask任务交给主线程执行。

HandlerScheduler.java

 @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //将run运行在主线程中

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
        ...
}

图4:observeOn主线程流程:
[图片上传失败...(image-5ac86f-1677294388046)]

observeOn()时序图:
[图片上传失败...(image-e9a7a-1677294388046)]

本文转自 https://blog.csdn.net/xuyin1204/article/details/129147940?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167729434516800188551819%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=167729434516800188551819&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-1-129147940-null-null.blog_rank_default&utm_term=Android%20RxJava%E6%A1%86%E6%9E%B6%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90&spm=1018.2226.3001.4450,如有侵权,请联系删除。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • 前言 上一篇文章谈了单机的定时任务解决方案,只能在单个JVM进程中使用;而我们的现在基本上是分布式场景,需要一套在...
    逸飞追梦人阅读 7,634评论 1 2
  • 描述清点击 Android Studio 的 build 按钮后发生了什么 build[https://jueji...
    CHSmile阅读 552评论 0 1
  • 前言 在前一篇文章中介绍了如何构建源码阅读环境,既然构建好了源码环境,本地也可以正常运行,那就开始阅读源码吧! 在...
    Java李太白阅读 175评论 0 1
  • 前言 以前有学过,感觉理解不了,然后又用不到,就不了了之了 ,现在因为实习公司的项目有用到,如果不学的话感觉根本看...
    道别1999阅读 405评论 0 2
  • 一、发展历史 20世纪90年代,硬件领域出现了单片式计算机系统,这种价格低廉的系统一出现就立即引起了自动控制领域人...
    橙子v阅读 395评论 0 0