Rxjava

Rxjava 源于ReactiveX(Reactive Extensions),Rx是一个变成模型,目标是提供一致的编程接口,帮助开发者方便的处理异步数据流,许多流行的变成语言都有Rx库。Rx扩展了观察者模式,用于支持数据和事件序列,添加了一些操作符,他人你可以声明式的组合这些序列,而无需关注底层的实现(线程、同步、线程安全、并发)。

Rxjava优势就是使得代码更简洁,而且随着程序逻辑变得越来越复杂,它依然能够保持简洁。

扩展的观察者

Observable(被观察者)、Observer(观察者)、subscribe(订阅),Observable和Observer通过subscribe方法实现订阅关系。Observable在需要的时候发出事件来通知Observer。

与传统的观察者模式不同,除了定义了普通的回调事件onNext(),还定义的两个特殊的事件onCompleted()和onError()。

  • onCompleted:事件队列完结,Rxjava把所有事件看做一个队列,并规定,当不会再有新的onNext事件触发时,需要触发onCompleted
  • onError:事件队列异常,在时间处理过程中出现异常时触发onError,同时队列终止,不允许再有事件发出
  • onCompleted和onError有且只有一个,并且是事件序列的最后一个
基本实现

创建Observer,定义事件触发时的具体行为

Observer<String> observer = new Observer<String>() {
   @Override
   public void onNext(String s) {
       Log.d(tag, "Item: " + s);
   }

   @Override
   public void onCompleted() {
       Log.d(tag, "Completed!");
   }

   @Override
   public void onError(Throwable e) {
       Log.d(tag, "Error!");
   }
};

RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。
与Observer 的主要区别:

  1. onStart:这是Subscriber 新增的方法,会在subscribe里,事件还未发送前调用,它总是在subscribe所发生的线程被调用
    2、unsubscribe:这是Subscriber 新增的方法,用于取消订阅,这个方法被调用后,Subscriber将不再接收事件。Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
           .throttleFirst(1, TimeUnit.SECONDS)
           .doOnUnsubscribe(new Action0() {
               @Override
               public void call() {
                   Log.e(TAG, "clicks->doOnUnsubscribe");
               }
           })
           .subscribe(new Action1<Void>() {
               @Override
               public void call(Void aVoid) {
                   methd6();
               }
           });
   //维护相关的资源引用
   subscriptions.add(clickSubscribe);

@Override
protected void onDestroy() {
   for (Subscription s : subscriptions) {
       if (!s.isUnsubscribed()) {
           s.unsubscribe();
           Log.e(TAG, "onDestroy: 取消订阅!");
       }
   }
   super.onDestroy();
}

创建Observable,定义被观察者,决定什么时候出发事件以及触发怎么样的事件

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

创建Observable的操作符

  • create(OnSubscribe)
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
  • just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • empty 创建一个什么都不做直接通知完成的Observable
  • error 创建一个什么都不做直接通知错误的Observable
  • never 创建一个什么都不做的Observable
Observable observable1=Observable.empty();//直接调用onCompleted。
Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
Observable observable3=Observable.never();//啥都不做
  • timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>
Observable.timer(1000,TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.d("JG",aLong.toString()); // 0
                }
            });
  • interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
 Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                     //每隔1秒发送数据项,从0开始计数
                     //0,1,2,3....
                }
            });
  • range 创建一个发射指定范围的整数序列的Observable<Integer>
 Observable.range(2,5).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d("JG",integer.toString());// 2,3,4,5,6 从2开始发射5个数据
        }
    });
  • defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。
 Observable.defer(new Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            return Observable.just("hello");
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d("JG",s);
        }
    });

合并操作符

  • concat: 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)。
Observable<Integer> observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);

    Observable.concat(observable1,observable2)
            .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
  • startWith: 在数据序列的开头增加一项数据。startWith的内部也是调用了concat
Observable.just(1,2,3,4,5)
            .startWith(6,7,8)
    .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
  • merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。
  • zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。
Observable<Integer>  observable1=Observable.just(1,2,3,4);
Observable<Integer>  observable2=Observable.just(4,5,6);

Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer item1, Integer item2) {
            return item1+"and"+item2;
        }
    })
    .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
  • combineLatest 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。

过滤操作

  • filter 过滤数据。内部通过OnSubscribeFilter过滤数据。
Observable.just(3,4,5,6)
            .filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer>4;
                }
            })
    .subscribe(item->Log.d("JG",item.toString())); //5,6 

还有ofType、take、takeLast、throttleFirst、timeout等

条件/布尔操作

  • all 判断所有的数据项是否满足某个条件,内部通过OperatorAll实现。
 Observable.just(2,3,4,5)
            .all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer>3;
                }
            })
    .subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            Log.d("JG",aBoolean.toString()); //false
        }
    })
    ;

还有exists、contains、isEmpty等

聚合操作

  • reduce: 对序列使用reduce()函数并发射最终的结果,内部使用OnSubscribeReduce实现。
 Observable.just(2,3,4,5)
            .reduce(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer sum, Integer item) {
                    return sum+item;
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString()));//14

还有collect、count/countLong

转换操作

  • toList 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
 Observable.just(2,3,4,5)
            .toList()
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {

                }
            });

还有toSortedList、toMap、toMultiMap

变换操作

  • map 对Observable发射的每一项数据都应用一个函数来变换。
Observable.just(6,2,3,4,5)
            .map(integer -> "item:"+integer)
            .subscribe(s -> Log.d("JG",s));//item:6,item:2....

还有cast、flatMap、flatMapIterable、concatMap、switchMap等

错误处理/重试机制

  • onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable。
 Observable.just(1,"2",3)
    .cast(Integer.class)
    .onErrorResumeNext(Observable.just(1,2,3))
    .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
    ;
  • onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。
  • onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据。
Observable.just(1,"2",3)
            .cast(Integer.class)
            .onErrorReturn(new Func1<Throwable, Integer>() {
                @Override
                public Integer call(Throwable throwable) {
                    return 4;
                }
            }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d("JG",integer.toString());1,4
        }
    });
  • retry: 当原始Observable在遇到错误时进行重试。
Observable.just(1,"2",3)
    .cast(Integer.class)
    .retry(3)
    .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
    ;//1,1,1,1,onError
  • retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry。
Observable.just(1,"2",3)
    .cast(Integer.class)
    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
        @Override
        public Observable<Long> call(Observable<? extends Throwable> observable) {
            return Observable.timer(1, TimeUnit.SECONDS);
        }
    })
    .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
    //1,1

还有一些操作符具体还是看RxJava操作符大全这个文章吧

线程控制

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

doOnSubscribe()它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
相关资料

ReactiveX 的理念和特点
给 Android 开发者的 RxJava 详解
RxJava操作符大全

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容