RxJava初学

Rxjava简介

Reactive Extensions for the JVM
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
这里说了Rxjava就是JVM响应式扩展Reactive Extensions
用于使用Java VM的可观察序列编写异步与基于事件的程序库(sdk)。

使用观察者模式,采用链式编程,基于事件的实现异步的库

Rxjava的简单使用

首先创建被观察者(Observable)

Observable navel = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("hello,rxjava");
                e.onNext("认识一下 Rxjava");
                e.onComplete();
            }
        });

我们这里看下Observable.create的源码:

/**
     * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
     * <p>
     * Example:
     * <pre><code>
     * Observable.&lt;Event&gt;create(emitter -&gt; {
     *     Callback listener = new Callback() {
     *         &#64;Override
     *         public void onEvent(Event e) {
     *             emitter.onNext(e);
     *             if (e.isLast()) {
     *                 emitter.onComplete();
     *             }
     *         }
     *
     *         &#64;Override
     *         public void onFailure(Exception e) {
     *             emitter.onError(e);
     *         }
     *     };
     *
     *     AutoCloseable c = api.someMethod(listener);
     *
     *     emitter.setCancellable(c::close);
     *
     * });
     * </code></pre>
     * <p>
     * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="">
     * <p>
     * You should call the ObservableEmitter's onNext, onError and onComplete methods in a serialized fashion. The
     * rest of its methods are thread-safe.
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the element type
     * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
     * @return the new Observable instance
     * @see ObservableOnSubscribe
     * @see ObservableEmitter
     * @see Cancellable
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

ObservableOnSubscribe<T>,可以理解为一个计划表,泛型T是要操作对象的类型

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe只需要实现一个接口subscribe,参数是ObservableEmitter<T>;看源码可以知道ObservableEmitter继承了Emitter
那我们看下Emitter的接口

/**
 * Base interface for emitting signals in a push-fashion in various generator-like source
 * operators (create, generate).
 *
 * @param <T> the value type emitted
 */
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

Emitter发射器可以看到只有三个接口void onNext(T value)、void onError(Throwable error)、void onComplete()。
onNext方法可以无限调用,Observer(观察者)所有的都能接收到,onError和onComplete是互斥的,Observer(观察者)只能接收到一个,OnComplete可以重复调用,但是Observer(观察者)只会接收一次,而onError不可以重复调用,第二次调用就会报异常。

第二步:创建观察者

Observer reader = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //gengwei 这里说明一下这个Disposable;这个对象就是观察者和被观察者的关系,
                //如果观察者Observer不想继续订阅被观察者了,就可以主动取消掉
                Log.e("gengwei","onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.e("gengwei","onNext"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("gengwei","onError");
            }

            @Override
            public void onComplete() {
                Log.e("gengwei","onComplete");
            }
        };

观察者通过new创建,可以看到有四个方法需要实现。onNext、onError、onComplete都是跟被观察者发射的方法一一对应的,这里就相当于接收了,这三个很好理解。关于onSubscribe这个方法需要关注下,我代码里备注了。

第三步:创建观察者关系

navel.subscribe(reader);

只需要一行代码搞定。
打log看下输出:

2020-03-19 15:41:32.229 10250-10250/com.example.myapplication E/gengwei: onSubscribe
2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onNexthello,rxjava
2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onNext认识一下 Rxjava
2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onComplete

这就是Rxjava最简单的使用方法

Rxjava的异步和链式编程

前面说过Rxjava是支持异步链式编程;我们先看下链式编程。
我们可以看下Observable<T>这个被观察者的API
这里列举部分,有兴趣可以去看下源码接口:

/**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>You specify which {@link Scheduler} this operator will use</dd>
     * </dl>
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
     * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
     * @see #observeOn
     */
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }

 /**
     * Returns an Observable that emits a single item and then completes.
     * <p>
     * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/just.png" alt="">
     * <p>
     * To convert any object into an ObservableSource that emits that object, pass that object into the {@code just}
     * method.
     * <p>
     * This is similar to the {@link #fromArray(java.lang.Object[])} method, except that {@code from} will convert
     * an {@link Iterable} object into an ObservableSource that emits each of the items in the Iterable, one at a
     * time, while the {@code just} method converts an Iterable into an ObservableSource that emits the entire
     * Iterable as a single item.
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code just} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param item
     *            the item to emit
     * @param <T>
     *            the type of that item
     * @return an Observable that emits {@code value} as a single item and then completes
     * @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX operators documentation: Just</a>
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
...

这里只是列举几个Rxjava的几个API,但是可以清晰的看到每一个方法都会再返回一个Observable<T>;这样的话就能形成一套简洁的链式代码。

下边再看一下Rxjava的异步编程是如何实现的,这里就要用到Scheduler(调度器),它是RxJava用来控制线程。

我们把刚刚的demo使用链式异步再写一下:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("hello,rxjava");
                //gengwei,这里增加休眠3s
                Thread.sleep(3000);
                e.onNext("认识一下 Rxjava");
                e.onComplete();
            }
        })
                .observeOn(AndroidSchedulers.mainThread()) //回调在mainThread
                .subscribeOn(Schedulers.io())//执行在io线程
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        //gengwei 这里说明一下这个Disposable;这个对象就是观察者和被观察者的关系,
                        //如果观察者Observer不想继续订阅被观察者了,就可以主动取消掉
                        Log.e("gengwei","onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.e("gengwei","onNext"+value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("gengwei","onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("gengwei","onComplete");
                    }
                });

这里就是RxJava最常用的写法,异步+链式编程,还要再说一下,subscribe的方法重载,subscribe()方法里什么参数也不放是空实现,也就是说连载小说无论出什么连载,读者都不关心,推送过来了也不读,如果读者只关心onNext方法里的内容,可以直接重载subscribe(Consumer<? spuer T> onNext)这个方法,会减少代码,当然如果是初学者还是建议创建Observer对象。
看下输出


image.png

注意看下箭头的打印时间,可以看出这里sleep了3s。

好了RxJava的初学就到这里,下一篇继续RxJava的进阶。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,007评论 1 9
  • 一、观察者模式 观察者模式(Observer)完美的将观察者和被观察的对象分离开。举个例子,用户界面可以作为一个观...
    SuguriSora阅读 460评论 0 1
  • 前言 Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大Android开发者的欢迎。 ...
    CuiTao阅读 388评论 1 3
  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 672评论 0 2
  • 文/珺儿 今天朋友圈看到一组图片,蛮触目惊心的,文胸下, 深深的勒痕,一堆罩杯外的副乳,陷入肩下的吊带,那一...
    珺儿Lyj阅读 326评论 0 0