一:创建操作符
1.create:建议你在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()){
e.onNext("1");
e.onNext("2");
e.onNext("3");
}
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
2.Defer :操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。
在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成Observable可以确保Observable包含最新的数据。
public class CourseThreeActivity extends AppCompatActivity {
private final static String TAG = "CourseThreeActivity";
private int num = 10;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_three);
Observable<Integer> observable = Observable.just(num);
num =20;
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,integer+"");
}
};
observable.subscribe(subscriber);
}
}
运行结果如下
20244-20244/com.pse.rxandroid E/CourseThreeActivity: 10
可见当我们调用Just 时该Observerable 的数据已经固定。即使在下面做出更改也不会造成影响。
那我们再来看看defer
public class CourseThreeActivity extends AppCompatActivity {
private final static String TAG = "CourseThreeActivity";
private int num = 10;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_course_three);
Observable<Integer> observable = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(num);
}
});
num =20;
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,integer+"");
}
};
observable.subscribe(subscriber);
}
}
运行结果如下
21658-21658/com.pse.rxandroid E/CourseThreeActivity: 20
现在想必大家能够理解defer 的作用了吧(等待订阅,获取最新值)。
3.From :将其它种类的对象和数据类型转换为Observable
使用方法如下:
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
结果大家可想而知,不过在这里有一个新的操作符Action0,其实看过源码就会知道Action可以有多个参数,方便我们来重写,处理自己的需求,这里我们只是模拟Observer创建了 onNext(),onError(),onComplete().
4.Range:创建一个发射特定整数序列的Observable
示例代码如下
Observable.range(5,10).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,integer+"");
}
});
这里我们只是为了测试,所以只处理onNext()事件。
运行结果如下
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 5
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 6
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 7
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 8
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 9
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 10
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 11
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 12
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 13
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 14
5.Repeat :创建一个发射特定数据重复多次的Observable
Observable<String> observable = Observable.just("hell Word");
observable.repeat(4).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,s);
}
});
运行结果如下,会连续发射4四次Observerable(同一个)
03-07 14:13:11.723 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
6.Timer:创建一个Observerable,并在一定的延时后发射一个特定的值“0”
示例如下
Observable<Long> observable = Observable.timer(2, TimeUnit.SECONDS);
observable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(TAG,aLong+"");
}
});
运行结果如下
26279-26302/com.pse.rxandroid E/CourseThreeActivity: 0
在日常中我们经常做倒计时操作,之前使用的是Handler 现在你可以尝试改用RX实现了。
7.MAP():最重要的变换操作符,对Observable发射的每一项数据应用一个函数,执行变换操作
大家可以试着理解下这个图:
上面这张图我可以举个例子,比如幼儿园举行活动,我们要给每个孩子穿上礼服,那么这个map()就是我们换衣服这个方法,所有的孩子经过map()后都会穿上华丽的礼服。这样的方式我们就会很快理解上图的意义。
示例如下
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e(TAG,1+"");
subscriber.onNext(1);
subscriber.onCompleted();
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer+"||";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,s);
}
});
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1||
从上面看出来,我们发射的数据是整数类型,但经过Map()后就变为String类型,这个操作符默认不在任何特定的调度器上执行。(有序)
8.FlatMap():将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable,听着有点绕口,那么我们看下示例
我们先在页面建立以下代码
Observable.just(1, 2, 3, 4, 5, 6, 7)
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return getObserverable(integer);
}
}).subscribe(
new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG,s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG,throwable.getMessage());
}
}
);
private Observable<String> getObserverable(final int id) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("This is the" + id);
subscriber.onCompleted();
}
});
}
结果如下
om.pse.rxandroid E/CourseThreeActivity: This is the1
om.pse.rxandroid E/CourseThreeActivity: This is the2
om.pse.rxandroid E/CourseThreeActivity: This is the3
om.pse.rxandroid E/CourseThreeActivity: This is the4
om.pse.rxandroid E/CourseThreeActivity: This is the5
om.pse.rxandroid E/CourseThreeActivity: This is the6
om.pse.rxandroid E/CourseThreeActivity: This is the7
9.Merge合并多个Observables的发射物
示例代码如下
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
运行结果如下
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
以上是我们日常开发要用到的一些操作符,但是RxJava 可不止这些,如有需要可以去这里学习。
RxJava中文学习文档