Android响应式编程(一)RxJava[入门基础]

1.RxJava概述

ReactiveX与RxJava

在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现。
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

为何要用RxJava

想到异步的操作我们会想到android的AsyncTask 和Handler,但是随着请求的数量越来越多,代码逻辑将会变得越来越复杂而RxJava却仍旧能保持清晰的逻辑。RxJava的原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样把你想要处理的数据一步一步地加工成你想要的成品然后发射给Subscriber。

RxJava与观察者模式

RxJava的异步操作是通过扩展的观察者模式来实现的,不了解观察者模式的可以先看下 设计模式(五)观察者模式这篇文章Rxjava有四个基本的要素:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、event(事件)。Observable (被观察者) 和 Observer (观察者)通过 subscribe() 方法实现订阅关系,Observable就可以在需要的时候来通知Observer。

2.RxJava基本用法

在使用RxJava前请现在Android Studio 配置gradle:

dependencies {
    ...
    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'
}

其中RxAndroid是RxJava的一部分,在普通的RxJava基础上添加了几个有用的类,比如特殊的调度器,后文会提到。

RxJava的基本用法分为三个步骤,他们分别是:

创建Observer(观察者)

决定事件触发的时候将有怎样的行为

           Subscriber subscriber=new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.i("wangshu","onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu","onError");
            }

            @Override
            public void onNext(String s) {
                Log.i("wangshu","onNext"+s);
            }

            @Override
            public void onStart() {
                Log.i("wangshu","onStart");
            }
        };

其中onCompleted、onError和onNext是必须要实现的方法,他们的含义分别是:

  • onCompleted:事件队列完结,RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。当不会再有新的 onNext发出时,需要触发 onCompleted() 方法作为完成标志。
  • onError:事件队列异常,在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • onNext:普通的事件,将要处理的事件添加到事件队列中。
  • onStart:它会在事件还未发送之前被调用,可以用于做一些准备工作。例如数据的清零或重置,这是一个可选方法,默认情况下它的实现为空。

当然如果要实现简单的功能也可以用到Observer来创建观察者,Observer是一个接口,而上面用到Subscriber是在Observer基础上进行了扩展,在后文的Subscribe订阅过程中Observer也会先被转换为Subscriber来使用。

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("wangshu", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu", "onError");
            }

            @Override
            public void onNext(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };

创建 Observable(被观察者)

它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨影枫");
                subscriber.onNext("月眉儿");
                subscriber.onCompleted();
            }
        });

通过调用subscriber的方法,不断的将事件添加到任务队列中,也可用just来实现:

  Observable observable = Observable.just("杨影枫", "月眉儿");

上述的代码会依次调用onNext("杨影枫")、onNext("月眉儿")、onCompleted()。

Subscribe (订阅)

订阅比较简单:

 observable.subscribe(subscriber);

或者也可以调用

 observable.subscribe(observer);

运行代码查看log:

com.example.liuwangshu.moonrxjava I/wangshu: onStart
com.example.liuwangshu.moonrxjava I/wangshu: onNext杨影枫
com.example.liuwangshu.moonrxjava I/wangshu: onNext月眉儿
com.example.liuwangshu.moonrxjava I/wangshu: onCompleted

3.不完整定义回调

上文介绍了回调的接收主要是依赖subscribe(Observer) 和 subscribe(Subscriber),除此之外RxJava还提供了另一种回调方式,也就是不完整回调。再讲到不完整回调之前我们首先要了解Action,查看RxJava源码我们发现提供了一堆Action:


这里写图片描述

我们打开Action0来看看:

public interface Action0 extends Action {
    void call();
}

再打开Action1:

public interface Action1<T> extends Action {
    void call(T t);
}

最后看看Action9:

public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
    void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}

很明显Action后的数字代表回调的参数类型数量,上文订阅也就可以改写为下面的代码:

        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {

            }
        };
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                Log.d("wangshu", "onCompleted");
            }
        };
        observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

我们定义了onNextAction来处理onNext的回调,同理我们还定义了onErrorAction和onCompletedAction,最后我们把他传给subscribe方法。很显然这样写的灵活度很大一些,同时我们也可以只传一个或者两个Action:

  observable.subscribe(onNextAction);
  observable.subscribe(onNextAction,onErrorAction);

第一行只定义了onNextAction来处理onNext的回调,而第二行则定义了onNextAction处理onNext的回调,onErrorAction来处理onError的回调。

4.**Scheduler **

内置的Scheduler

方才我们所做的都是运行在主线程的,如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的,如果我们想切换线程就需要使用Scheduler。RxJava 已经内置了5个 Scheduler:

  • Schedulers.immediate():默认的,直接在当前线程运行,相当于不指定线程。
  • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
  • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
  • Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。

另外RxAndroid也提供了一个常用的Scheduler:

  • AndroidSchedulers.mainThread():RxAndroid库提供的Scheduler,它指定的操作在主线程中运行。

控制线程

subscribeOn() 和 observeOn() 两个方法来对线程进行控制。
subscribeOn()方法指定 subscribe() 这个方法所在的线程,即事件产生的线程。observeOn()方法指定 Subscriber 回调所运行在的线程,即事件消费的线程。

Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);

            }
        };
Observable observable = Observable.just("杨影枫", "月眉儿"); 
            observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(onNextAction);

我们仍旧是用log打印出onNext事件所传递过来的字符串,只不过这一次事件的产生的线程是在io线程上,事件回调的线程则是在主线程。

5.**RxJava基础应用 **

好了,讲的不是很多,我们来举一个例子来消化上面的知识。RxJava+Retrofit访问网络是比较搭的,但是此前我的网络系列并没有介绍Retrofit,所以我们先准备用RxJava+OKHttp来访问网络,至于RxJava+OKHttp访问网络会在此系列的以后的章节做介绍。OKHttp的用法请详见Android网络编程(六)OkHttp3用法全解析这篇文章。
此前我们用OkHttp3访问网络是这样做的:

      private void postAsynHttp(int size) {
        mOkHttpClient=new OkHttpClient();
        RequestBody formBody = new FormBody.Builder()
                .add("size", size+"")
                .build();
        Request request = new Request.Builder()
                .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                .post(formBody)
                .build();
        Call call = mOkHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                String str = response.body().string();
                Log.i("wangshu", str);
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
                    }
                });
            }

        });
    }

接下来我们进行改造,首先我们创建Observable(被观察者):

     private Observable<String> getObservable(final int size){
       Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
           @Override
           public void call(final Subscriber<? super String> subscriber) {
               mOkHttpClient=new OkHttpClient();
               RequestBody formBody = new FormBody.Builder()
                       .add("size",size+"")
                       .build();
               Request request = new Request.Builder()
                       .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                       .post(formBody)
                       .build();
               Call call = mOkHttpClient.newCall(request);
               call.enqueue(new Callback() {
                   @Override
                   public void onFailure(Call call, IOException e) {
                       subscriber.onError(new Exception("error"));
                   }

                   @Override
                   public void onResponse(Call call, Response response) throws IOException {
                       String str = response.body().string();
                       subscriber.onNext(str);
                       subscriber.onCompleted();
                   }
               });
           }
       });
    return observable;
   }

我们将根据Okhttp的回调(不在主线程)来定义事件的规则,调用subscriber.onNext来将请求返回的数据添加到事件队列中。接下来我们来实现观察者:

private void postAsynHttp(int size){   
getObservable(size).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
           @Override
           public void onCompleted() {
               Log.i("wangshu", "onCompleted");
           }

           @Override
           public void onError(Throwable e) {
             Log.i("wangshu", e.getMessage());
           }

           @Override
           public void onNext(String s) {
               Log.i("wangshu", s);
               Toast.makeText(getApplicationContext(), "请求成功", Toast.LENGTH_SHORT).show();
           }
       });
   }

我们将事件产生也就是访问网络的操作设置为io线程,访问网络回调设置为主线程,所以Toast是能正常显示的。好了这一篇就讲到这里,关于RxJava的文章后期还会写,敬请期待。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,932评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,554评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,894评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,442评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,347评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,899评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,325评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,980评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,196评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,163评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,085评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,826评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,389评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,501评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,753评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,171评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,616评论 2 339

推荐阅读更多精彩内容