RxJava源码分析-线程切换

RxJava源码分析-线程切换

接着上篇分析,本篇我们来揭开RxJava线程切换的神秘面试,先上一段代码

Observable.just("hello,world!")
        .map { res->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            res+"1234"
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ res ->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "length:" + res.length)
        }, { e ->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            e.printStackTrace()
        }, {
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "onComplete")
        },{
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            Log.d("Observable", "onSubscribe")
        })

这段代码执行玩打印的log如下

05-21 10:45:06.109 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
    onSubscribe
05-21 10:45:06.115 17068-17086/com.example.pandaguo.rxdemo D/Observable: thread:RxCachedThreadScheduler-1
05-21 10:45:06.165 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
    length:16
    thread:main
    onComplete

可以看到其中在map操作符中执行的代码是在RxCachedThreadScheduler-1线程中执行,而其余的均是在UI线程,为什么呢?

  • 本文的重点不在数据流向分析,因此前面几个函数不在仔细分析
Observable.just("hello,world!")
        .map { res->
            Log.d("Observable", "thread:" + Thread.currentThread().name)
            res+"1234"
        }

代码执行到这里,我们可以拿到经过封装的数据源ObservableMap,其实就是个Observable,那么接下来调用subscribeOn(Schedulers.io())来进行线程切换的操作了,我们来一点点的分析,首先看下Schedulers.io()
是怎么创建一个Scheduler对象IO的

Schedulers.io()

public final class Schedulers {
    ...
    static final Scheduler IO;
    ...
    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ...
    }
    ...
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    ...
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    ...
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    ...
}
  • 一部分代码需要结合着RxJavaPlugins来一起看
public final class RxJavaPlugins {
    ...
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        //初始化时候 onInitIoHandler = null
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
    ...
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
    ...
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        //初始化时候onIoHandler = null
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }
    ...
}
  • 上面代码首先Schedulers.io()会调用RxJavaPlugins.onIoScheduler(IO),这里传入的IO实际上是一早就初始化的RxJavaPlugins.initIoScheduler(new IOTask()), IOTaskSchedulers的一个静态内部类,实现了Callable接口,并且在call()方法中返回了一个IoHolder.DEFAULT,这个IoHolder其实是一个Schedulers的静态内部类,然后默认会持有一个IoScheduler对象DEFAULT
  • RxJavaPlugins.initIoScheduler(new IOTask())会调用到callRequireNonNull(),我们来看下这个方法回去调用s.call(),s的类型是IOTask
  • 说了那么多Schedulers.io()最终就是创建了一个类型为IoScheduler的对象,我们先不去看IoScheduler的实现,先来分析Observable.subscribeOn()的实现

Observable.subscribeOn()

public abstract class Observable<T> implements ObservableSource<T> {
    ...
     public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    ...
}
  • 还是要结合RxJavaPlugins来一起看
public final class RxJavaPlugins {
    ...
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    ...
}
  • 这里其实RxJavaPlugins.onAssembly()实际上就还是返回了传入的参数,也就是创建拿到了一个ObservableSubscribeOn对象,那么线程切换的核心逻辑也就在这个类中实现,接下来我们来分析这个类

ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        
        //这里是线程切换的关键
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //订阅实际发生的位置
            source.subscribe(parent);
        }
    }
}
  • 我们来看下subscribeActual的实现,s.onSubscribe(parent);执行的时候我们并没有看到线程切换的业务,所以我们可以肯定Observ.onSubscribe()一定是在UI线程回调的,那么为什么map操作符中的逻辑是在另一个线程呢?parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));parent.setDisposable()从实现上来看是没有做线程切换的逻辑,new SubscribeTask(parent)从实现上来看仅仅是让订阅的操作发生在SubscribeTask执行的线程,等等我有一个大胆的想法 ,既然有Runnable对象了,那么scheduler.scheduleDirect()会不会就是实际上去切换线程的操作呢?我们来追下代码

Scheduler.scheduleDirect()

public abstract class Scheduler {
    ...
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    ...
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
    ...
    public abstract Worker createWorker();
    ...
}
  • Scheduler是一个抽象类!
  • 上面代码可以看到,实际上执行的逻辑是在另一个重载方法scheduleDirect中,这里调用createWorker()创建了Worker对象w,然后调用了w.schedule(task, delay, unit);去实现了线程切换的逻辑
  • 啥?你说这样就实现了线程切换,我们不信!那我就证明给你看撒,还记得前面说得么Schedulers.io()最终创建了一个IoScheduler对象,我们来看下它的定义

IOScheduler

public final class IoScheduler extends Scheduler {
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;

    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;
    
    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_IO_PRIORITY = "rx2.io-priority";

    static final CachedWorkerPool NONE;
    static {
        SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
        SHUTDOWN_THREAD_WORKER.dispose();

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

        EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }
    
    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            evictExpiredWorkers();
        }

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }

        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }

        long now() {
            return System.nanoTime();
        }

        void shutdown() {
            allWorkers.dispose();
            if (evictorTask != null) {
                evictorTask.cancel(true);
            }
            if (evictorService != null) {
                evictorService.shutdownNow();
            }
        }
    }
    
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    @Override
    public void shutdown() {
        for (;;) {
            CachedWorkerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ...
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    ...
}
  • 代码比较长,我们对着Scheduler.scheduleDirect()的流程来看下IOScheduler,首先我们来看createWorker()
  • 可以看到这里是通过pool.get()获取了一个CachedWorkerPool,这个pool是IOScheduler的成员变量是在构造方法中进行初始化的,它是一个AtomicReference能够保证针对持有对象的原子操作,换句话说能够保证线程的安全
public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}
  • 默认其持有的是一个NONE,是在静态代码块中初始化完成的
static {
    ...
    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
}
  • 紧接着的start方法中会替换为新的值,并且这个CachedWorkerPool处于非shutDown状态
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}
  • CachedWorkerPool是什么我们后面再去分析,我们回到Scheduler.scheduleDirect()接着往下看,后续会将创建的Worker对象w与传入的Runnable接口对象run封装成一个DisposeTask对象task,之后调用Worker对象w的schedule方法也就是EventLoopWorker的schedule方法
  • 这里说下DisposeTask其实就是代理了传入的Runnable对象run,在其run()方法中会调用到传入的Runnable对象的run方法
static final class DisposeTask implements Disposable, 
    Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        ...
        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            ...
        }
        ...
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }
        ...
}
  • EventLoopWorker.schedule这里会调用threadWorker.scheduleActual(action, delayTime, unit, tasks)
public Disposable schedule(@NonNull Runnable action, long delayTime, 
    @NonNull TimeUnit unit) {
        ...
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
  • threadWorker是在EventLoopWorker的构造方法中进行初始化的
EventLoopWorker(CachedWorkerPool pool) {
    ...
    this.threadWorker = pool.get();
}
  • 可以看到就是从CachedWorkerPool中获取的,我们来看下CachedWorkerPool的get()方法
static final class CachedWorkerPool implements Runnable {
    ...
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        
        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
    ...
}
  • 这段代码其实就是先从缓存中看看能不能拿到一个已经缓存下来的ThreadWorker,如果没有就创建一个新的ThreadWorker对象并缓存起来,接下来我们看下ThreadWorker的实现

ThreadWorker

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
  • 可以看到并没有实现特殊的方法,所以其大部门实现都是在NewThreadWorker中的,回到前面分析的,我们说过EventLoopWorker.schedule会调用threadWorker.scheduleActual(action, delayTime, unit, tasks),在NewThreadWorker中最终会调用到scheduleActual这个方法,我们来看下具体实现
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //包装一层传入的参数run
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //通过线程池去执行run
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
  • 这里会将传入的Runnable再做一层包装,之后通过线程池去submit或者schedule执行对应的任务,这个Runnable对象就是前面我们分析的SubscribeTask,还记得SubscribeTask的run方法执行了什么么?
    source.subscribe(parent);
  • 所以map操作符中所有的流程就是执行在线程池之中
  • 对于ObservableOn()原理也是一样的,只不过开源库中通过Handler将获取MainThread.Looper然后将其切回UI线程,看客大佬们可以跟下源码分析一波
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如 被观察者(Obser...
    fengzhizi715阅读 4,927评论 0 8
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 9月26日杭州“明月入怀-中国团扇文化印象特展”开幕,因为有故宫博物院馆藏的宋徽宗《枇杷山鸟图》、马远《寒山子...
    东有乔木阅读 388评论 1 2
  • 好像打我记事以来我已经是一个在校生了,无论当时是有多小,是一年级还是幼儿园!虽然到了现在记忆还是有点模糊,不...
    邹戒戒阅读 538评论 0 9