关于入门基础看这里Android RxJava之葵花宝典(上)(看我就够了)----入门基础
讲使用场景之前先普及一下一些关于Rx的另外框架
- RxBinding : 控件绑定库
使用如下
Platform bindings:
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
'support-v4' library bindings:
compile 'com.jakewharton.rxbinding:rxbinding-support-v4:0.4.0'
'appcompat-v7' library bindings:
compile 'com.jakewharton.rxbinding:rxbinding-appcompat-v7:0.4.0'
'design' library bindings:
compile 'com.jakewharton.rxbinding:rxbinding-design:0.4.0'
'recyclerview-v7' library bindings:
compile 'com.jakewharton.rxbinding:rxbinding-recyclerview-v7:0.4.0'
'leanback-v17' library bindings:
compile 'com.jakewharton.rxbinding:rxbinding-leanback-v17:0.4.0'
- RxPermissions
RxPermissions也是国外的大牛开发的基于RxJava的Android权限管理库,他让6.0以上的权限管理更加的简单,如果有适配6.0以上的手机的需求,这个库是个不错的选择。下面我们来看看基本的用法。
// 请求相机权限
RxPermissions.getInstance(this)
.request(Manifest.permission.CAMERA)
.subscribe(granted -> {
if (granted) { // 用户同意了(在6.0之前的手机始终都为true)
//可以拍照了
} else {
//可以在这里提示用户,或者再次请求
}
});
当然,如果我想一次请求多个权限呢,每次都去写上面的代码肯定是个不好的做法,RxPermissions的作者也考虑到了这一点,在Api里提供了一个多参数的重载
//取得相机权限和读取手机状态
RxPermissions.getInstance(this)
.request(Manifest.permission.CAMERA,
Manifest.permission.READ_PHONE_STATE)
.subscribe(granted -> {
if (granted) {
} else {
}
});
- RxBus
有了RxJava,EventBus、Otto什么的都可以靠边了,因为RxJava本身就自带了这个功能,我们只需做一下简单的封装就可以使用了,也顺便减少了我们项目的体积。
public class RxBus {
private final Subject<Object, Object> _bus;
private static class RxBusHolder {
private static final RxBus instance = new RxBus();
}
private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
}
public static synchronized RxBus getInstance() {
return RxBusHolder.instance;
}
public void post(Object o) {
_bus.onNext(o);
}
public <T> Observable<T> toObserverable(Class<T> eventType) {
return _bus.ofType(eventType);
}
怎么去使用?
在需要发送消息的地方
RxBus.getInstance().post("SomeChange");
在需要接收消息的地方
Subscription mSubscription = RxBus.getInstance().toObserverable(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
handleRxMsg(s);
}
});
不要忘了在适当的地方去取消这个订阅(以免发生内存泄漏)
mSubscription.unsubscribe();
一、RxJava的使用场景
场景一:缓存检测
在请求取数据的处理过程中,我们的操作一般是这样一个原理:
- 首先检查内存是否有缓存
- 然后检查文件缓存中是否有
-
最后才从网络中取
任何一步一旦发现数据后面的操作都不执行
在rxjava中为我们提供了两个解决这个问题的操作符,分别是: concat和first
concat
不交错的发射两个或多个Observable
concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。直到前面一个Observable终止,Concat才会订阅额外的一个Observable
请注意上面所说的“就好像它们是一个Observable”,其实并不是一个Observable,是前面一个停止之后才会订阅下一个,所以说他们并不是一个,请君注意咯。其实就是将两个Observable连接起来了。
还有一个实例方法concatWith,它是和concat等价的:Observable.concat(a,b)==a.concatWith(b)
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable a = Observable.just(1, 2, 3, 4, 5);
Observable b = Observable.just(6, 7, 8, 9, 10);
Observable.concat(a, b)
.subscribe(subscriber);
------------------------------------------
//打印结果
onNext: 1
onNext: 2
onNext: 3
...
onNext: 10
onCompleted: onCompleted
发现只打印了一次onCompleted: onCompleted,只有当onNext没有接收到数据时,才会调用onCompleted。
最后对这个操作符,再补充一点:如果当第一个Observable a抛异常,那么将不会继续执行后面的Observable b了。
测试代码
//讲a写成如下
Observable a = Observable.just(1, 2, 3, 4, new RuntimeException());
first
只发射第一项(或者满足某个条件的第一项)数据
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
};
Observable a = Observable.just(1, 2, 3, 4, 5);
a.first().subscribe(subscriber);
-----------------------------------
//打印结果
onNext: 1
onCompleted: onCompleted
这个只要第一项满足条件,后面的将不会再进行发射,所以只是得到了1这个数字。
在这儿必须为大家区别一个操作符:single(),这个操作符也是只打印一个数据的,但是single()和first()最大的区别在于:前者只会发射一个数据,不能发射多个,否则会报错;而first确实满足条件的那一个。
Observable a = Observable.just(1);
a.single().subscribe(subscriber);
最后总结这个场景的Demo
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//依次检查memory、disk、network
Observable
.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
场景二:输入合法场景
在某些时候,我们需要所以的输入都合法后,我们的某些按钮才亮起来,或者才能点击
在这个场景中,我们得掌握两个操作符:skip和combineLatest
skip
抑制Observable发射的前N项数据
Observable.just(1,2,3,4).skip(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
});
-----------------------------------------
//打印结果
onNext: 3
onNext: 4
onCompleted: onCompleted
combineLatest
当多个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
CombineLatest在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
不懂?看代码:
private void combineLatestEvent() {
Observable<CharSequence> usernameObservable = RxTextView.textChanges(mUsername).skip(1);
Observable<CharSequence> emailObservable = RxTextView.textChanges(mEmail).skip(1);
Observable<CharSequence> passwordObservable = RxTextView.textChanges(mPassword).skip(1);
Subscription subscription = Observable.combineLatest(usernameObservable, emailObservable,
passwordObservable,
new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence userName, CharSequence email, CharSequence
password) {
boolean isUserNameValid = !TextUtils.isEmpty(userName) && (userName
.toString().length() > 2 && userName.toString().length() < 9);
if (!isUserNameValid) {
mUsername.setError("用户名无效");
}
boolean isEmailValid = !TextUtils.isEmpty(email) && Patterns
.EMAIL_ADDRESS.matcher(email).matches();
if (!isEmailValid) {
mEmail.setError("邮箱无效");
}
boolean isPasswordValid = !TextUtils.isEmpty(password) && (password
.toString().length() >5 && password.toString().length() < 11);
if (!isPasswordValid) {
mPassword.setError("密码无效");
}
return isUserNameValid && isEmailValid && isPasswordValid;
}
})
.subscribe(getObserver());
}
private Observer<Boolean> getObserver() {
return new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
//更改注册按钮是否可用的状态
register.setEnabled(aBoolean);
}
};
}
这个场景,有3个edittext,分别是mUsername,mEmail,mPassword,通过输入合法的内容进行判定注册按钮是否亮起来。
当我点击其中的任何一个进行编写的时候,就会发射数据,发射的是什么?是我们编辑的内容吗?其实不是的,发射的是结合Func3这个方法的返回值,在这里这个返回值是Boolean型的。返回了boolean型之后,就可以在观察者里面设置注册按钮是否亮起来。现在再看上面那句高深莫测的话,是不是简单多了!
场景三:数据过期场景
其实这个场景可以和上面的数据缓存检测场景进行合并:在缓存检测场景中,我们知道,如果memory中没有数据,就从disk上面寻找,然后再是网络请求,那么,问题来了,如果我们的memory中一直有数据,但是网络数据已经变更了,又由于缓存检测原则的只要有一个有数据就不会进行网络请求了,这就会造成我们显示的数据一直是一个旧数据。
解决方法有如下两个:
-
采用定时进行清除本地缓存数据
-
采用过滤操作符
我们先来看看第一种,如果是进行定时做本地数据清空的话,那么就会用到,我们一个轮询的操作符Interval
创建一个按固定时间间隔发射整数序列的Observable
Interval通俗的讲,就是每隔一段时间过后做什么事情!(不了解看这里Android RxJava之葵花宝典(上)(看我就够了)----入门基础)
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
...
@Override
public void onNext(Long aLong) {
//清除缓存操作
}
});
采用过滤操作符
其实这个操作符就是first,在众多的数据,有一个符合条件就发射数据,后面的都将不执行。
Observable source = Observable
.concat(memory, disk, network)
.first(new Func1() {
@Override public Boolean call(Data data) {
return data.isUpToDate();
}
})
场景四:合并两个数据源场景
使用merge合并两个数据源
Observable.merge(getInfoFromFile(), getInfoFromNet())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(String data) {
Log.d(TAG, "onNext: only one ! ");
});
场景五:Retrofit结合RxJava场景
这个就太通用了,简单帖贴
subscription = Network.getApi()
.CheckVersion(RequestMap)//检查版本是否有更新
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
场景六:控制输入的频繁度
如下图:我们需要动态搜索内容,输入框一改变就需要网络请求。但是过于频繁的网络请求就不是太友好了。
rxjava为我们提供了一个很好的解决方案:
-
使用debounce作为textSearch
debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
debounce()使用TimeUnit对象指定时间间隔。
RxTextView.textChanges(editText)
.debounce(5000,TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: onError");
}
@Override
public void onNext(CharSequence charSequence) {
Log.d(TAG, "onNext: "+charSequence.toString());
}
});
在这5s内,我输入了2,3,4,5(出最后一个5,其他输入之后就删除哈),但是最后得到的结果却是:
onNext: 5
场景七:防止按钮重复点击
Button button = (Button) findViewById(R.id.button);
RxView.clicks(button).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Log.i("test", "clicked");
}
});
----------------------------
//将上面改成如下
//过滤掉了在300ms内的重复点击
RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Log.i("test", "clicked");
}
});
场景七:轮询,定时操作
在做App的时候,有些地方我们可能会时不时的去请求服务器,以至于客户端的数据是最新的,在RxJava中可以这样做
//每隔两秒执行一次
Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO WHAT YOU WANT
}
});
在两秒后去执行一些操作(比如启动页跳转到主页面)
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO WHAT YOU WANT
}
});
###########场景八:能监听网络连接的广播自动重试,对网络无连接的情况不进行重试,并且重试有超时机制与重试间隔