RxJava 是什么?解决什么问题回想一下,在android 中通常做异步耗时的操作
,然后去通知UI线程进行更新.无非涉及到两个点,回调 or Handler. 例如:
Handler handler =new Handler(){ public void handMessge(Message msg){ //更新UI } }
new Thread(){
void run(){
//耗时请求
handler.sendMessage();
}
}.start();
简单来说RxJava是一个可以运行在Java VM上的库,是一个可观测的异步操作,基于事件的程序,响应式编程 可观测: 大话来讲,就是一目了然. 解决的问题 让复杂的程序逻辑,变得更加客观清晰
首先理解两个元素 Observable 为可观察者或者被观察者 Observer 为观察者/ Subscriber订阅者 在android开发中 Observable 可以是一个网络请求,Subscriber来显示请求结果 Observable 可以是一个数据查询,Subscriber来显示查询结果 Observable 可以是一个按钮的点击事件,Subscriber来响应点击事件 Observable 可以是一个图片文件的加载解析,Subscriber可以用来展示处理之后的图片
首先理解两个元素Observable 为可观察者或者被观察者, Observer 为观察者/ Subscriber订阅者,创建一个被观察者,创建一个观察者,在被观察中调用三个方法,来通知观察者
首先onNext 可以被多次调用,onCompleted调用成功之后,如果中间异常报错之后,会自动回调onError, 成功和失败只能任选其一
public void test(View v){
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//可以做各种复杂的操作,然后进行回调
subscriber.onNext("jack : 我是最后的执行结果");
subscriber.onNext("jack : 我被计算出来了");
subscriber.onCompleted();
}
}).subscribe( //此行为订阅,只有真正的被订阅,整个流程才算生效
new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG,"onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG,e.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
});
}
Observable中有很多操作符,下面进行列举了两个方法
Observable.from() 传入数组,集合
Observable.just() 传入多个对象,泛型,可以传入多个或者一个
还有一些,map 转化对象, flatmap 平铺对象,filter 过滤,distinct 去重复,take 从开始取出固定个数,doOnNext 输出元素之前的额外操作,toList 打包对象为集合最后都会调用onNext方法进行输出,这就是所谓的RxJava变形第一步
public void test1(View v){
Observable.just("我是jack","我是jacktang").subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG,"onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG,e.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
});
List<String> lists =new ArrayList<>();
lists.add("from ==jack");
lists.add("from ==jack1");
lists.add("from ==jack2");
Observable.from(lists).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG,"onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"Throwable");
}
@Override
public void onNext(String s) {
Log.d(TAG,s);
}
});
}
再来了解几个简单的回调,拿just为例,倘若我们只要获取单个结果,选择回调某个方法,如果什么方法都不要的话,subscribe()相当于内部自动创建一个默认的ActionSubscriber
subscribe()
subscribe(final Action1<? super T> onNext)
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
public void test2(View v){
//相当于 调用了 onNext, onError, onCompleted
Observable.just("我是jack","我是jacktang").subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
//相当于 调用了 onNext, onError
Observable.just("我是jack","我是jacktang").subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
//相当于 调用了 onNext, onError,onCompleted
Observable.just("我是jack","我是jacktang").subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
}
});
}
Observer 为观察者/ Subscriber订阅者
Subscriber实现了Observer,增加了,onStart()一般用于在事件处理前调用,unsubscribe(),取消订阅,一般在activity的onstop() 或者onDestory()中调用,常用写法,isUnsubscribed() 判断是否已经取消订阅
public void test3(View view){
subscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
Log.d(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Throwable");
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
};
Observable.just("我是jack","我是jacktang").subscribe();
}
@Override
protected void onStop() {
super.onStop();
if(subscriber!=null && !subscriber.isUnsubscribed())
subscriber.unsubscribe();
}
操作符
操作符在基本概念中也讲到过一些,现在具体的说说操作符之间的变换
简单的变换
json 类型转换为 Student
在开发中我们可以将 网络请求的json 数据转换为Bean操作
public void test(View v) throws JSONException {
JSONObject json =new JSONObject();
json.put("name","jack");
json.put("age","18");
json.put("scro",80);
Observable.just(json).
map(new Func1<JSONObject, Student>() {//参数1 表示入参(变化前) 参数2 表示出参(变化后)
@Override
public Student call(JSONObject json) {
Student student = Student.parseTo(json);
return student;
}
}).subscribe(new Action1<Student>() {
@Override
public void call(Student s) {
Log.d("RxJavaOperRater",s.toString());
}
});
}
map 转化对象, flatmap 平铺对象, filter 过滤, distinct 去重复,take 从开始取出固定个数, doOnNext 输出元素之前的额外操作,toList 打包对象为集合
下面就慢慢的结和操作符 复杂起来,下面操作要求
1.过滤所有name为lucy的学生,2.可以指定去重复条件, 3.取出最后2个
public void test1(View v){
Observable.from(students).
filter(new Func1<Student, Boolean>() {//filter过滤条件为 name =tom
@Override
public Boolean call(Student student) {
return !student.name.equals("lucy");
}
})
.distinct()//选择重复条件 ,distinct() 默认情况为去掉对象重复的,
.takeLast(2).//取出两个
subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.d("RxJavaOperRater",student.toString());
}
});
}
线程
倘若我们要进行一些数据的耗时操作,而且线程切换自由
在RxJava 中,Scheduler ——调度器,相当于线程控制器,
RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。 有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
public void action(View v) {
Observable.just(array).map(new Func1<JSONArray, List<Student>>() {
@Override
public List<Student> call(JSONArray array) {
Log.d("我在子线程解析(map) 当前线程",Thread.currentThread().getName());
List list =new ArrayList();
for (int i = 0; i <array.length() ; i++) {
Student st = Student.parseTo(array.optJSONObject(i));
list.add(st);
}
return list;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Student>>() {
@Override
public void call(List<Student> students) {
Log.d("我是主线程更新UI 当前线程",Thread.currentThread().getName());
}
});
}