RxJava2.x常用操作符总结(二)

四、功能操作符

1、delay

Observable.just(1, 2, 3)

.delay(2, TimeUnit.SECONDS)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "=======================onSubscribe");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "=======================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "=======================onComplete");

    }

});

打印:

=======================onSubscribe

=======================onNext 1

=======================onNext 2

=======================onNext 3

=======================onComplete

说明:从打印结果可以看出 onSubscribe 回调2秒之后 onNext 才会回调。

2、doOnEach

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        //      e.onError(new NumberFormatException());

        e.onComplete();

    }

})

.doOnEach(new Consumer < Notification < Integer >> () {

    @Override

    public void accept(Notification < Integer > integerNotification) throws Exception {

        Log.d(TAG, "==================doOnEach " + integerNotification.getValue());

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================doOnEach 1

==================onNext 1

==================doOnEach 2

==================onNext 2

==================doOnEach 3

==================onNext 3

==================doOnEach null

==================onComplete

说明:

Observable 每发送一件事件之前都会先回调这个方法。从结果就可以看出每发送一个事件之前都会回调 doOnEach 方法,并且可以取出 onNext() 发送的值。

3、doOnNext

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doOnNext(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "==================doOnNext " + integer);

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================doOnNext 1

==================onNext 1

==================doOnNext 2

==================onNext 2

==================doOnNext 3

==================onNext 3

==================onComplete

说明:Observable 每发送 onNext() 之前都会先回调这个方法。

4、doAfterNext

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doAfterNext(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "==================doAfterNext " + integer);

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================doAfterNext 1

==================onNext 2

==================doAfterNext 2

==================onNext 3

==================doAfterNext 3

==================onComplete

说明:Observable 每发送 onNext() 之后都会回调这个方法。

5、doOnComplete

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doOnComplete(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doOnComplete ");

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnComplete

==================onComplete

说明:Observable 每发送 onComplete() 之前都会回调这个方法。

6、doOnError()

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new NullPointerException());

    }

})

.doOnError(new Consumer < Throwable > () {

    @Override

    public void accept(Throwable throwable) throws Exception {

        Log.d(TAG, "==================doOnError " + throwable);

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnError java.lang.NullPointerException

==================onError

说明:Observable 每发送 onError() 之前都会回调这个方法。

7、doOnSubscribe

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doOnSubscribe(new Consumer < Disposable > () {

    @Override

    public void accept(Disposable disposable) throws Exception {

        Log.d(TAG, "==================doOnSubscribe ");

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================doOnSubscribe

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

说明:Observable 每发送 onSubscribe() 之前都会回调这个方法。

8、doOnDispose

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doOnDispose(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doOnDispose ");

    }

})

.subscribe(new Observer < Integer > () {

    private Disposable d;


    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

        this.d = d;

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

        d.dispose();

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================doOnDispose

说明:当调用 Disposable 的 dispose() 之后回调该方法。

9、doOnLifecycle

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doOnLifecycle(new Consumer<Disposable>() {

    @Override

    public void accept(Disposable disposable) throws Exception {

        Log.d(TAG, "==================doOnLifecycle accept");

    }

}, new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doOnLifecycle Action");

    }

})

.doOnDispose(

    new Action() {

        @Override

        public void run() throws Exception {

            Log.d(TAG, "==================doOnDispose Action");

        }

})

.subscribe(new Observer<Integer>() {

    private Disposable d;

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

        this.d = d;

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

        d.dispose();

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }


});

打印:

==================doOnLifecycle accept

==================onSubscribe

==================onNext 1

==================doOnDispose Action

==================doOnLifecycle Action

说明:

在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。

doOnLifecycle() 第二个参数的回调方法的作用与 doOnDispose() 是一样的。

可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。

如果使用 doOnLifecycle 进行取消订阅

打印:

==================doOnLifecycle accept

==================onSubscrib

说明:

可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。

10、doOnTerminate() & doAfterTerminate()

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

//      e.onError(new NullPointerException());

        e.onComplete();

    }

})

.doOnTerminate(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doOnTerminate ");

    }

})

.subscribe(new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }


});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnTerminate

==================onComplete

说明:doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。doAfterTerminate 也是差不多,这里就不再赘述。

11、doFinally

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.doFinally(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doFinally ");

    }

})

.doOnDispose(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doOnDispose ");

    }

})

.doAfterTerminate(new Action() {

    @Override

    public void run() throws Exception {

        Log.d(TAG, "==================doAfterTerminate ");

    }

})

.subscribe(new Observer<Integer>() {

    private Disposable d;

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

        this.d = d;

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

        d.dispose();

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================doOnDispose

==================doFinally

说明:在所有事件发送完毕之后回调该方法。doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。可以看到如果调用了 dispose() 方法,doAfterTerminate() 不会被回调。

现在试试把 dispose() 注释掉看看

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

==================doAfterTerminate

==================doFinally

说明:doAfterTerminate() 已经成功回调,doFinally() 还是会在事件序列的最后。

12、onErrorReturn

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new NullPointerException());

    }

})

.onErrorReturn(new Function<Throwable, Integer>() {

    @Override

    public Integer apply(Throwable throwable) throws Exception {

        Log.d(TAG, "==================onErrorReturn " + throwable);

        return 404;

    }

})

.subscribe(new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onErrorReturn java.lang.NullPointerException

==================onNext 404

==================onComplete

说明:当接收到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。

13、onErrorResumeNext

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new NullPointerException());

    }

})

.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {

    @Override

    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {

        Log.d(TAG, "==================onErrorResumeNext " + throwable);

        return Observable.just(4, 5, 6);

    }

})

.subscribe(new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onErrorResumeNext java.lang.NullPointerException

==================onNext 4

==================onNext 5

==================onNext 6

==================onComplete

说明:当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。

14、onExceptionResumeNext

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new Exception("404"));

    }

})

.onExceptionResumeNext(new Observable<Integer>() {

    @Override

    protected void subscribeActual(Observer<? super Integer> observer) {

        observer.onNext(333);

        observer.onComplete();

    }

})

.subscribe(new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 333

==================onComplete

说明:与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。

15、retry

Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override

    public void subscribe(ObservableEmitter<Integer> e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new Exception("404"));

    }

})

.retry(2)

.subscribe(new Observer<Integer>() {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 1

==================onNext 2

==================onNext 3

==================onError

说明:如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。

16、retryUntil

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onError(new Exception("404"));

    }

})

.retryUntil(new BooleanSupplier() {

    @Override

    public boolean getAsBoolean() throws Exception {

        if (i == 6) {

            return true;

        }

        return false;

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onError

说明:出现错误事件之后,可以通过此方法判断是否继续发送事件。

17、retryWhen

Observable.create(new ObservableOnSubscribe < String > () {

    @Override

    public void subscribe(ObservableEmitter < String > e) throws Exception {

        e.onNext("chan");

        e.onNext("ze");

        e.onNext("de");

        e.onError(new Exception("404"));

        e.onNext("haha");

    }

})

.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {

    @Override

    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {

        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {

            @Override

            public ObservableSource <? > apply(Throwable throwable) throws Exception {

                if(!throwable.toString().equals("java.lang.Exception: 404")) {

                    return Observable.just("可以忽略的异常");

                } else {

                    return Observable.error(new Throwable("终止啦"));

                }

            }

        });

    }

})

.subscribe(new Observer < String > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(String s) {

        Log.d(TAG, "==================onNext " + s);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError " + e.toString());

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext chan

==================onNext ze

==================onNext de

==================onError java.lang.Throwable: 终止啦

说明:当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。

将 onError(new Exception("404")) 改为 onError(new Exception("303"))

打印:

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

......

说明:不断的重复发送消息

18、repeat

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.repeat(2)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "===================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "===================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "===================onComplete ");

    }

});

打印:

===================onSubscribe

===================onNext 1

===================onNext 2

===================onNext 3

===================onNext 1

===================onNext 2

===================onNext 3

===================onComplete

说明:重复发送被观察者的事件,times 为发送次数。该事件发送了两次。

19、repeatWhen

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {

    @Override

    public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {

        return Observable.empty();

    //  return Observable.error(new Exception("404"));

    //  return Observable.just(4); null;

    }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "===================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "===================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "===================onComplete ");

    }

});

打印:

===================onSubscribe

===================onComplete

说明:这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。这里分三种情况,如果新的被观察者返回 onComplete 或者 onError 事件,则旧的被观察者不会继续发送事件。如果被观察者返回其他事件,则会重复发送事件。

下面直接看看发送 onError 事件和其他事件的打印结果。

===================onSubscribe

===================onError

发送其他事件的打印结果:

===================onSubscribe

===================onNext 1

===================onNext 2

===================onNext 3

===================onComplete

20、subscribeOn

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

//.subscribeOn(Schedulers.newThread())

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "======================onSubscribe");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "======================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "======================onError");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "======================onComplete");

    }

});

打印:

======================onSubscribe

=========================currentThread name: main

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

说明:上面的打印是不调用subscribeOn

打印:

======================onSubscribe

=========================currentThread name: RxNewThreadScheduler-1

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

说明:调动了subscribeOn(Schedulers.newThread())

现在看看多次调用代码如下:

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());

        e.onNext(1);

        e.onNext(2);

        e.onNext(3);

        e.onComplete();

    }

})

.subscribeOn(Schedulers.computation())

.subscribeOn(Schedulers.newThread())

.subscribe(new Observer < Integer > () {@Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "======================onSubscribe");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "======================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "======================onError");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "======================onComplete");

    }

});

打印:

======================onSubscribe

=========================currentThread name: RxComputationThreadPool-1

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

说明:可以看到第二次调动的 subscribeOn(Schedulers.newThread()) 并没有效果。

21、observeOn

Observable.just(1, 2, 3)

.observeOn(Schedulers.newThread())

.flatMap(new Function < Integer, ObservableSource < String >> () {

    @Override

    public ObservableSource < String > apply(Integer integer) throws Exception {

        Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());

        return Observable.just("chan" + integer);

    }

})

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer < String > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "======================onSubscribe");

    }

    @Override

    public void onNext(String s) {

        Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());

        Log.d(TAG, "======================onNext " + s);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "======================onError");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "======================onComplete");

    }

});

打印:

======================onSubscribe

======================flatMap Thread name RxNewThreadScheduler-1

======================flatMap Thread name RxNewThreadScheduler-1

======================flatMap Thread name RxNewThreadScheduler-1

======================onNext Thread name main

======================onNext chan1

======================onNext Thread name main

======================onNext chan2

======================onNext Thread name main

======================onNext chan3

======================onComplete

说明:指定观察者的线程,每指定一次就会生效一次。从打印结果可以知道,observeOn 成功切换了线程。

总结RxJava中的调度器:

Schedulers.computation( )

用于使用计算任务,如事件循环和回调处理

Schedulers.immediate( )

当前线程

Schedulers.io( )

用于 IO 密集型任务,如果异步阻塞 IO 操作。

Schedulers.newThread( )

创建一个新的线程

AndroidSchedulers.mainThread()

Android 的 UI 线程,用于操作 UI。

五、过滤操作符

1、filter

Observable.just(1, 2, 3)

    .filter(new Predicate < Integer > () {

        @Override

        public boolean test(Integer integer) throws Exception {

            return integer < 2;

        }

})

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onComplete

说明:通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。以上代码只有小于2的事件才会发送,

2、ofType

Observable.just(1, 2, 3, "chan", "zhide")

.ofType(Integer.class)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

说明:可以过滤不符合该类型事件

3、skip

Observable.just(1, 2, 3)

.skip(2)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 3

==================onComplete

说明:跳过正序某些事件,count 代表跳过事件的数量(skipLast() 作用也是跳过某些事件,不过它是用来跳过正序的后面的事件,这里就不再讲解了。)

4、distinct

Observable.just(1, 2, 3, 3, 2, 1)

.distinct()

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

说明:过滤事件序列中的重复事件。

5、distinctUntilChanged

Observable.just(1, 2, 3, 3, 2, 1)

.distinctUntilChanged()

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 2

==================onNext 1

==================onComplete

说明:过滤掉连续重复的事件,因为事件序列中连续出现两次3,所以第二次3并不会发出。

6、take

Observable.just(1, 2, 3, 4, 5)

.take(3)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "==================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        i += integer;

        Log.d(TAG, "==================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "==================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "==================onComplete ");

    }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

说明:控制观察者接收的事件的数量,takeLast() 的作用就是控制观察者只能接受事件序列的后面几件事情,这里就不再讲解了,大家可以自己试试。

7、debounce

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onNext(1);

        Thread.sleep(900);

        e.onNext(2);

    }

})

.debounce(1, TimeUnit.SECONDS)

.subscribe(new Observer < Integer > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "===================onSubscribe ");

    }

    @Override

    public void onNext(Integer integer) {

        Log.d(TAG, "===================onNext " + integer);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "===================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "===================onComplete ");

    }

});

打印:

===================onSubscribe

===================onNext 2

说明:如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。

可以看到事件1并没有发送出去,现在将间隔时间改为1000,

打印:

===================onSubscribe

===================onNext 1

===================onNext 2

throttleWithTimeout() 与此方法的作用一样,这里就不再赘述了。

8、firstElement() && lastElement()

Observable.just(1, 2, 3, 4)

.firstElement()

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "====================firstElement " + integer);

    }

});

Observable.just(1, 2, 3, 4)

.lastElement()

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "====================lastElement " + integer);

    }

});

打印:

====================firstElement 1

====================lastElement 4

说明:firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。

9、elementAt() & elementAtOrError()

Observable.just(1, 2, 3, 4)

.elementAt(0)

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "====================accept " + integer);

    }

});

打印:

====================accept 1

说明:elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError() 。

将 elementAt() 的值改为5,这时是没有打印结果的,因为没有满足条件的元素。

替换 elementAt() 为 elementAtOrError(),代码如下:

Observable.just(1, 2, 3, 4)

.elementAtOrError(5)

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "====================accept " + integer);

    }

});

打印:

io.reactivex.exceptions.OnErrorNotImplementedException

at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)

at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)

at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)

at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)

at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)

at io.reactivex.Observable.subscribe(Observable.java: 10903)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)

at io.reactivex.Single.subscribe(Single.java: 2707)

at io.reactivex.Single.subscribe(Single.java: 2693)

at io.reactivex.Single.subscribe(Single.java: 2664)

at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)

at android.app.Activity.performCreate(Activity.java: 6942)

at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)

at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)

at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)

at android.app.ActivityThread. - wrap14(ActivityThread.java)

at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)

at android.os.Handler.dispatchMessage(Handler.java: 102)

at android.os.Looper.loop(Looper.java: 154)

at android.app.ActivityThread.main(ActivityThread.java: 6682)

at java.lang.reflect.Method.invoke(Native Method)

at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)

at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)

Caused by: java.util.NoSuchElementException

at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)

at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)

at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)

at io.reactivex.Observable.subscribe(Observable.java: 10903)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)

at io.reactivex.Single.subscribe(Single.java: 2707)

at io.reactivex.Single.subscribe(Single.java: 2693)

at io.reactivex.Single.subscribe(Single.java: 2664)

at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)

at android.app.Activity.performCreate(Activity.java: 6942)

at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)

at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)

at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)

at android.app.ActivityThread. - wrap14(ActivityThread.java)

at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)

at android.os.Handler.dispatchMessage(Handler.java: 102)

at android.os.Looper.loop(Looper.java: 154)

at android.app.ActivityThread.main(ActivityThread.java: 6682)

at java.lang.reflect.Method.invoke(Native Method)

at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)

at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)

说明:这时候会抛出 NoSuchElementException 异常。

六、条件操作符

1、all

Observable.just(1, 2, 3, 4)

.all(new Predicate < Integer > () {

    @Override

    public boolean test(Integer integer) throws Exception {

        return integer < 5;

    }

})

.subscribe(new Consumer < Boolean > () {

    @Override

    public void accept(Boolean aBoolean) throws Exception {

        Log.d(TAG, "==================aBoolean " + aBoolean);

    }

});

打印:

==================aBoolean true

说明:判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。

2、takeWhile

Observable.just(1, 2, 3, 4)

.takeWhile(new Predicate < Integer > () {

    @Override

    public boolean test(Integer integer) throws Exception {

        return integer < 3;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "========================integer " + integer);

    }

});

打印:

========================integer 1

========================integer 2

说明:可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送。

3、skipWhile

Observable.just(1, 2, 3, 4)

.skipWhile(new Predicate < Integer > () {

    @Override

    public boolean test(Integer integer) throws Exception {

        return integer < 3;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "========================integer " + integer);

    }

});

打印:

========================integer 3

========================integer 4

说明:可以设置条件,当某个数据满足条件时不发送该数据,反之则发送。

4、takeUntil

Observable.just(1, 2, 3, 4, 5, 6)

.takeUntil(new Predicate < Integer > () {

    @Override

    public boolean test(Integer integer) throws Exception {

        return integer > 3;

    }

})

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "========================integer " + integer);

    }

});

打印:

========================integer 1

========================integer 2

========================integer 3

========================integer 4

说明:可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。

5、skipUntil

Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))

.subscribe(new Observer < Long > () {

    @Override

    public void onSubscribe(Disposable d) {

        Log.d(TAG, "========================onSubscribe ");

    }

    @Override

    public void onNext(Long along) {

        Log.d(TAG, "========================onNext " + along);

    }

    @Override

    public void onError(Throwable e) {

        Log.d(TAG, "========================onError ");

    }

    @Override

    public void onComplete() {

        Log.d(TAG, "========================onComplete ");

    }

});

打印:

========================onSubscribe

========================onNext 4

========================onNext 5

========================onComplete

说明:当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。

6、sequenceEqual

Observable.sequenceEqual(Observable.just(1, 2, 3),

Observable.just(1, 2, 3))

.subscribe(new Consumer < Boolean > () {

    @Override

    public void accept(Boolean aBoolean) throws Exception {

        Log.d(TAG, "========================onNext " + aBoolean);

    }

});

打印:

========================onNext true

说明:判断两个 Observable 发送的事件是否相同。

7、contains

Observable.just(1, 2, 3)

.contains(3)

.subscribe(new Consumer < Boolean > () {

    @Override

    public void accept(Boolean aBoolean) throws Exception {

        Log.d(TAG, "========================onNext " + aBoolean);

    }

});

打印:

========================onNext true

说明:判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。

8、isEmpty

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onComplete();

    }

})

.isEmpty()

.subscribe(new Consumer < Boolean > () {

    @Override

    public void accept(Boolean aBoolean) throws Exception {

        Log.d(TAG, "========================onNext " + aBoolean);

    }

});

打印:

========================onNext true

说明:判断事件序列是否为空。

9、amb

ArrayList < Observable < Long >> list = new ArrayList < > ();

list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));

list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));

Observable.amb(list)

.subscribe(new Consumer < Long > () {

    @Override

    public void accept(Long aLong) throws Exception {

        Log.d(TAG, "========================aLong " + aLong);

    }

});

打印:

========================aLong 6

========================aLong 7

========================aLong 8

========================aLong 9

========================aLong 10

说明:amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。

10、Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override

    public void subscribe(ObservableEmitter < Integer > e) throws Exception {

        e.onComplete();

    }

})

.defaultIfEmpty(666)

.subscribe(new Consumer < Integer > () {

    @Override

    public void accept(Integer integer) throws Exception {

        Log.d(TAG, "========================onNext " + integer);

    }

});

打印:

========================onNext 666

说明:如果被观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容