RxJava
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。注意这两个词:
- 异步的
- 基于事件的
这两个词能很好的概括RxJava的特点。为什么要使用它呢?它的最大优点就是:简洁,使用RxJava时,操作是链式的,在程序越来越复杂的时候,依然能够保持简洁,保证了可读性与可维护性。
这篇文章主要是学习RxJava中的常用操作符。操作符是为了解决对Observable对象的变换的问题。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
Map
Map的作用是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化。
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
上面的例子中,事件一开始是基于String的,可是观察者需要的参数类型为Bitmap。使用map变换之后,事件参数变为Bitmap。
通过图片理解:map将圆形事件转化成了矩形事件。
FlatMap
FlatMap非常强大,可是有点难懂。
FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。
先看图解:注意事件的颜色。
flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象。map() 是一对一的转化,flatmap可以是一对多的。
比如说我们有一个嵌套的网络请求, 首先需要去请求注册, 待注册成功回调了再去请求登录的接口。我们可以使用RxJava写下如下代码:
api.register(new RegisterRequest()) //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根据注册的响应结果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});
在注册成功之后,我们使用flatmap把RegisterResponse转化成一个ObservableSource<LoginResponse>,然后继续做登陆请求。
Zip
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
看图:
上面图片中的例子中,zip()把两个Observable组合到了一起。然后一个Observer接收到了组合之后的事件。
在zip工作的过程中,有如下规则:
- 一个事件只能被使用一次。
- 组合的顺序是严格按照事件发送的顺利来进行的。
- 最终Observer收到的事件数量是和Observable中发送事件最少的那的事件数量相同。也就是说如果Observable 1中有5个事件,Observable 2有8个事件,组合之后只有5个事件。Observable 2 中剩下的事件将会被丢弃。
示例代码:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
上面代码中,创建了两个Observable,一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合。
运行结果:
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
那zip会在什么时候用到呢? 其实很多场景都可以用到Zip. 举个例子,比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了。
filter
过滤数据。只保留符合条件的事件。
Observable.just(3,4,5,6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
})
.subscribe(item->Log.d("JG",item.toString())); //5,6
ConcatMap
类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射。flatMap则不保证顺序。
take
指定最多输出的数量。
take(3) //只保留前3个事件
retry
使用tetry()会在onError()后触发重订阅,应用场景是网络的重新请求。
http://www.jianshu.com/p/fca90d0da2b5
emmm...先到这里吧。其实上面的操作符已经可以满足大多数使用需求了。更多操作符请参考:
http://blog.csdn.net/maplejaw_/article/details/52396175
Thanks for wacthing。