前言
去年无意间知道了RxJava
这个东东,但一直没时间去看看。最近,终于有了不少时间,经过两周的学习,对RxJava
有了初步的认识。下面我会记录一些个人认为比较重要的知识,以备以后查看(并没有深层的原理解析,仅仅在应用层面上)。
个人对RxJava的理解
RxJava
是一个对数据流和事件流操作的库,它是对数据的一种链式操作,在操作过程中,可以方便的进行数据加工,线程切换,从而避免了复杂的嵌套,缩进,提高代码的可读性,可维护性。
RxJava的使用
- 简单的例子
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(20);
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer.toString();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
Observable.create()
可以用来创建一个被观察者,Observable.OnSubscribe
中有被订阅时执行的操作。
这时可以使用操作符对得到的Observable
进行一些变换。比如map()
,对单一的数据流进行变换操作,如将Integer
转为String
类型。每进行一次变换,就得到了一个新的Observable
。
最后,使用subscribe()
方法,对其进行订阅操作。其中Subscriber
为订阅者。
- 常用的创建操作符
-
create()
从头创建一个Observable -
just()
将对象或者对象集合转换为一个会发射这些对象的Observable
Observable.just(1, 2, 3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
-
from()
将其它的对象或数据结构转换为Observable,比如数组
Integer[] numbers = new Integer[]{1, 2, 3};
Observable.from(numbers)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
- 线程控制
在不指定线程时,RxJava
遵循线程不变原则,在哪个线程中执行subscribe()
,就在哪个线程产生事件,消费事件。如需切换线程,需要用到Schedules
(调度器)。
- 常用的调度器
-
Schedulers.immediate()
直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。 -
Schedulers.newThread()
总是启用新线程,并在新线程执行操作。 -
Schedulers.io()
I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。 -
Schedulers.computation()
计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()
中,否则 I/O操作的等待时间会浪费CPU。 -
AndroidSchedulers.mainThread()
Android专用的Schedule,指定操作在Android主线程(UI线程)中运行。
-
- 线程控制操作符
-
subscribeOn()
指定subscribe()
所发生的线程,即Observable.OnSubscribe
被激活时所处的线程,或者叫做事件产生的线程。subscribeOn()
的位置放在哪里都可以,但它是只能调用一次。不过,可以在Observable.doOnSubscribe()
后使用subscribeOn()
来指定subscribe()
时,执行doOnSubscribe()
所在线程。
-
Integer[] numbers = new Integer[]{1, 2, 3};
Observable.from(numbers)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println("toast " + Thread.currentThread().getName());
Toast.makeText(MainActivity.this, "hello", Toast.LENGTH_SHORT).show();
}
})
.subscribeOn(AndroidSchedulers.mainThread()) //①
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer + " " + Thread.currentThread().getName());
}
});
如没有①处的subscribeOn(AndroidSchedulers.mainThread())
,doOnSubscribe
将在当前执行subscribe()
的线程中运行。
- observeOn()
指定Subscriber
所运行在的线程,或者叫做事件消费的线程。这里的Subscriber
可以是变换以后的,换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()
即可。
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
说明
这篇笔记是在看完 扔物线 大大的 给 Android 开发者的 RxJava 详解 一文后的笔记,并引用了文章里的部分代码和图片,仅供个人学习,还请大大不要责怪(●'◡'●)。这也是我写的第一篇文章,好开心,以后会坚持下去的。在水平提高后,也会努力写一些原创文章,与大家共同进步~