先来张女神的剧照放松一下...
话说gakki又拍新剧了,继爱干家务且投怀送抱不送不行之后,又完美诠释了"假面"超人,微笑pasta,其实如果没人给予阳光,那就做个快乐的小太阳吧.
好了,我们进入正题..
最近一直在重构老项目的代码,每天一顿删删删,开心到飞起...相信RxJava到现在没用过的人估计也很少了,我在重构的过程当中,把之前一些复杂的逻辑基本上都用操作符进行了处理,在这边我们就来总结一下,好像有点炒冷饭的意思...那RxJava(2.0+)的操作符大致有这么几类:创建,变换,过滤,组合等等,我们就一类一类说...
创建
首先,我们要说一下被观察者,也就是上游的创建方式,那我们发射一个带这样集合的火箭:
private List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
create
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (Integer number : mList) {
emitter.onNext(number);
}
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d)
...
@Override
public void onNext(Integer number)
...
@Override
public void onError(Throwable e)
...
@Override
public void onComplete()
...
});
最原始的版本就是用create来创建,但是遍历集合一个一个发,太臃肿了,那我们可以换一种方式,并且有两处地方我们改动一下:
1:我现在只想要onNext()返回,这就要看看是否有Observer的实现类
2:同时Observable是不支持Backpressure的,如果我们发1万条,只会增大内存,不会抛MissingBackpressureException,我们可以用2.0+版本中的Flowable
rang
Flowable.range(1,6).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer,"+integer);
}
});
// Consumer,1
// Consumer,2
// Consumer,3
// Consumer,4
// Consumer,5
range操作符是从1开始(包括1)依次发射后面6个数,很明显可以达到我们的要求,然后我们来看看接收的consumer,点进去发现它是一个接口:
package io.reactivex.functions;
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
那这个consumer是2.0的,其对应的也是1.0+版本中的Action,这时候又有个疑问??那有异常我怎么处理?真正在开发之中是要自定义Observer的去统一处理数据异常的,我们写demo就不用那么麻烦,如果用consumer的话,想处理异常还可以:
Flowable.range(1, 5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer," + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, throwable.toString());
}
});
我们可以看到同一个接口用泛型来区分实现类,那之后我们为了代码更简洁,就用new consumer的方式来接收了.
fromIterable
除了rang之外,fromIterable也是可以的,而且这个操作符更贴切,因为其原理就是遍历集合,一个一个发射:
Flowable.fromIterable(mList)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer," + integer);
}
});
fromArray
同理:数组
private Integer[] mArr = {1,2,3};
...
Flowable.fromArray(mArr)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer," + integer);
}
});
just
这里要不得不说一下just,这个操作符比较强大,它可以一次发送一个集合,数组,也可以一个一个的发送不同类型的数据等等,我们来举个例子:
(为了让它一行就可以,我这边把consumer这个匿名内部类替换成lamada表达式,更直观)
//发射集合
Flowable.just(mList).subscribe(numberList -> Log.d(TAG, numberList.toString()));
// [1, 2, 3, 4, 5]
//发射数组
Flowable.just(mArr).subscribe(numberArr -> Log.d(TAG, Arrays.toString(numberArr)));
// [1, 2, 3]
//发射字符串
Flowable.just(1, "A", 10).subscribe(str -> Log.d(TAG, str));
// 1
// A
// 10
defer
这个操作符很有意思,我们之前说的just,from等等,都是在创建上游被观察者的时候确定了数据,也就是说发射的什么数据我是知道的,那如果发射的数据本身就会变呢?比如这样:
StringBuilder deferMsg = new StringBuilder();
deferMsg.append("张三发来贺电");
Flowable<String> justDefer = Flowable.just(deferMsg.toString());
deferMsg.append(",李四发来贺电");
justDefer.subscribe(s -> Log.d(TAG, s));
//张三发来贺电
如果用defer的话
StringBuilder deferMsg = new StringBuilder();
deferMsg.append("张三发来贺电");
Flowable<String> defer = Flowable.defer((Callable<Publisher<String>>) () ->
Flowable.just(deferMsg.toString()));
deferMsg.append(",李四发来贺电");
defer.subscribe(s -> Log.d(TAG, s));
//张三发来贺电,李四发来贺电
我们可以看到是订阅的时候才会走Flowable的回调方法并且会创建一个新的Flowable/Observable,而不是创建的时候走,有点类似于eventBus的sticky,有人说这个叫延迟订阅,所以defer用于处理动态的数据,保证上游的数据是最新的
repeat
repeat就是我们可以在发射的时候指定重复次数:
Flowable.just(mList).repeat(2)
.subscribe(numbers -> Log.d(TAG, numbers.toString()));
//[1, 2, 3, 4, 5] [1, 2, 3, 4, 5]
timer
Flowable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
+ Thread.currentThread().getName());
}
});
//Timer:0=====当前线程为:RxComputationThreadPool-1
我们可以用Timer操作符去延迟执行一个任务,比如上面延迟1秒,上游会向下游发送一个数据0,这边有两点需要注意
1.timer的是有第三个参数的,第三个参数可以指定运行线程
2.如果不指定,默认线程为computation,如果想更改ui,记得要做线程转换
Interval
上面说的timer是延迟发送,而interval是以一个固定的时间间隔不断发发发,比如一个很常见的需求,我需要每隔5分钟获取服务器信息,就可以:
Flowable.interval(5, TimeUnit.MINUTES).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
+ Thread.currentThread().getName());
}
});
// Timer:0=====当前线程为:RxComputationThreadPool-1
// Timer:1=====当前线程为:RxComputationThreadPool-1
// Timer:2=====当前线程为:RxComputationThreadPool-1
// Timer:3=====当前线程为:RxComputationThreadPool-1
// Timer:4=====当前线程为:RxComputationThreadPool-1
我们发现也是默认在子线程,当然,我们也可以写一个发送短信的倒计时:
Flowable.interval(0, 1, TimeUnit.SECONDS)
.take(60)
.map(aLong -> 60 - aLong)
.doOnSubscribe(subscription -> tv.setClickable(false))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(aLong -> {
tv.setText(aLong + "s"); });
简单说一下
- interval第一个参数是延迟发送,我们这里用的0秒,也就是不延迟
- take:由于interval从0开始无限发,take就是制定发多少次,所以在这边我们让它只发60次.
- 由于数据是从0,1,2,3开始发,所以这里我们用map做了一个转换把数据变成了60,59,58...
- doOnSubscribe这个方法用于订阅之前执行,它跟doOnNext不太一样,doOnNext跟随onNext,onNext执行多少次,它就执行多少次,而doOnSubscribe只执行一次,所以,一般做初始化操作,比如我点击了按钮发送验证码,在倒计时的时候,你就不能再点击了,所以禁用点击功能.
- 正常情况下应该在onComplete中恢复按钮点击功能,并且修改ui为点击/获取.
- 以后就在也不用这写一块handler,那边写一块timer/timerTask了..
变换
先说用的最多的:
map
我们在工作中经常遇到数据处理的问题,可不是你想要什么数据形式,后台小哥哥就给的到你的,于是在我们拿到数据后,可以用map进行转换,比如:
1.转换成另外一个集合
Flowable.just(mArr).map(new Function<List<Integer>, List<String>>() {
@Override
public List<String> apply(List<Integer> mList) throws Exception {
List<String> newList = new ArrayList<>();
for (Integer number : mList) {
newList.add(number + "string");
}
return newList;
}
}).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> newList) throws Exception {
Log.d(TAG, newList.toString());
}
});
// [1string, 2string, 3string, 4string, 5string]
2.转换成一个对象
// 将集合最后一个数字取出并放到对象里
Flowable.just(mArr)
.map((Function<List<Integer>, NumberTest>) mList -> {
NumberTest numberTest = new NumberTest();
numberTest.setNumber(mList.get(mList.size() - 1));
return numberTest;
}).subscribe((Consumer<NumberTest>) numberTest -> Log.d(TAG, numberTest.toString()));
// NumberTest{number=5}
通过map操作符,我们可以在apply这个小方法里面,各种定义数据转换类型,你可以把一个对象换成另外一个对象,或者把一个对象放进一个数组,再或者把数组转换成一个集合,都在整个链式调用中间完成,代码逻辑简单易懂,最重要的是非常美观!!
flatMap
这个操作符跟Map相比有一点不同,我们先用flatmap写一下上面的例子:
Flowable.just(mArr).flatMap(new Function<List<Integer>, Publisher<List<String>>>() {
@Override
public Publisher<List<String>> apply(List<Integer> mList) throws Exception {
List<String> newList = new ArrayList<>();
for (Integer number : mList) {
newList.add(number + "string");
}
return Flowable.just(newList);
}
}).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> newList) throws Exception {
Log.d(TAG, newList.toString());
}
});
//[1string, 2string, 3string, 4string, 5string]
跟map相比,我们可以非常明显的看到在数据转换的类型上是不一样的.
1.map操作符:我们只需要将转换的类型作为apply方法中的返回值即可.
2.flatMap操作符:其原理是将上游发送的数据打包,形成一个全新的Flowable/Observable,所以我们在做数据转换的时候,需要将转换完成的数据类型重新封装成Flowable.
WTF,麻烦不是一点点啊!
通过上面的例子,不难发现我们全程只发送了一个集合,当然转换跟接收也就只有这一次,如果是这种情况的话,像数据转换这种小问题交给map处理就可以了,而flatMap通常可以解决两种问题:
1.for循环嵌套,比如我要获取一个城市所有的门店中的每一名销售人员的业绩,显然有俩集合,一个门店的集合,一个销售人员的集合,那用flatMap我们就可以这样子:
Flowable.fromIterable(cityStores)
.flatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
return Flowable.fromIterable(cityStore.getSalesman()); })
.subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId()
+ "店的"
+ salesman.getSalesManId() + "号的业绩为:"
+ salesman.getSalesPerformance() + "元"));
// 1店的0号的业绩为:58元
// 1店的1号的业绩为:38元
// 1店的2号的业绩为:29元
// 2店的0号的业绩为:49元
// 2店的1号的业绩为:51元
// 2店的2号的业绩为:84元
// 3店的0号的业绩为:65元
// 3店的1号的业绩为:14元
// 3店的2号的业绩为:44元
2.网络请求嵌套,最常见的注册完以后自动登录,我们就完全可以这么写:
ApiService.goToRegister()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext((Consumer<RegisterBean>) registerBean -> {
//注册完成
})
.observeOn(Schedulers.io())
.flatMap((Function<RegisterBean, Publisher<LoginBean>>)
registerBean -> ApiService.goToLogin(account, pwd))
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Consumer<LoginBean>) loginBean -> {
//登录成功
});
或者比如首页,在不用zip的情况下,可以先获取轮播图,再获取列表数据,再获取其他信息,不同的接口只要一条链式调用就ok,当然在实际项目中封装一下线程转换,跟统一数据格式处理,那就更简单了,清爽到没朋友.
concatMap
concatMap跟flatMap相比,它是有序的,上文获取销售业绩的例子,比如我们给第一个店加一个500毫秒的延迟,那么顺序就变了:
// 2店的0号的业绩为:7元
...
// 3店的2号的业绩为:20元
...
// 1店的2号的业绩为:80元
我们只需要将flatMap替换成concatmap就可以保证顺序
Flowable.fromIterable(cityStores)
.concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
int delay = 0;
if (1 == cityStore.getCityCtoreId()) {
delay = 500;
}
return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
})
.subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
+ salesman.getSalesManId() + "号的业绩为:"
+ salesman.getSalesPerformance() + "元"));
// 1店的0号的业绩为:21元
// 1店的1号的业绩为:2元
// 1店的2号的业绩为:88元
// 2店的0号的业绩为:93元
// 2店的1号的业绩为:33元
// 2店的2号的业绩为:44元
// 3店的0号的业绩为:100元
// 3店的1号的业绩为:43元
// 3店的2号的业绩为:39元
switchMap
这个操作符我感觉用的比较少,不过既然也属于map范畴,我们就简单说一下,我们先将上面的concatMap换成switchMap,并且在变换的时候打个log,并且加上线程:
Flowable.fromIterable(cityStores)
.switchMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
int delay = 0;
if (1 == cityStore.getCityCtoreId()) {
delay = 500;
}
Log.d(TAG,"=====switchMap:"+cityStore.getCityCtoreId());
return Flowable.fromIterable(cityStore.getSalesman()).delay(delay,TimeUnit.MILLISECONDS);
})
.subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
+ salesman.getSalesManId() + "号的业绩为:"
+ salesman.getSalesPerformance() + "元"));
// =====switchMap:1|main
// =====switchMap:2|main
// 2店的0号的业绩为:57元|RxComputationThreadPool-2
// =====switchMap:3|main
// 3店的0号的业绩为:30元|RxComputationThreadPool-3
// 3店的1号的业绩为:86元|RxComputationThreadPool-3
// 3店的2号的业绩为:41元|RxComputationThreadPool-3
switchMap:不难发现它的原理是当上一个订阅流程未结束时,检测到有新的发射数据了,那么上一个订阅还未走的流程会被舍弃掉,拿上面的例子举例说明:
原数据是一共发射三个对象,当1号店睡500毫秒的时候,二号店发射了,1号店舍弃,这时候注意,所有经过变换再发射的数据统统扔到了子线程里,当2号店0号员工出了业绩,3号店发射了,那么2号店的1,2号员工被舍弃,流程直到3号店所有人员走完
groupBy
没错,同SQ中的groupBy差不多,就是分组的意思,也就是说把发射出去的数据按需求,分成不同的组,比如:现在我想把上面的数据分成两个组,1号店单独放在一个集合里,2,3号店放在一个集合里,那就酱:
(不好理解的就不用lamada了)
Flowable.fromIterable(cityStores).groupBy(new Function<CityStore, Boolean>() {
@Override
public Boolean apply(CityStore cityStore) throws Exception {
return cityStore.getCityCtoreId()==1;
}
}).subscribe(new Consumer<GroupedFlowable<Boolean, CityStore>>() {
@Override
public void accept(GroupedFlowable<Boolean, CityStore> objectCityStoreGroupedFlowable) throws Exception {
Boolean key = objectCityStoreGroupedFlowable.getKey();
Log.d(TAG,key+"");
objectCityStoreGroupedFlowable.toList().subscribe(new Consumer<List<CityStore>>() {
@Override
public void accept(List<CityStore> cityStores) throws Exception {
Log.d(TAG,cityStores.toString());
}
});
}
});
// [CityStore {
// cityCtoreId = 1, salesman = [Salesman {
// cityStoreId = 1, salesPerformance = 35
// }, Salesman {
// cityStoreId = 1, salesPerformance = 31
// }, Salesman {
// cityStoreId = 1, salesPerformance = 10
// }]
// }]
// [CityStore {
// cityCtoreId = 2,
// salesman = [Salesman {
// cityStoreId = 2, salesPerformance = 37
// }, Salesman {
// cityStoreId = 2, salesPerformance = 96
// }, Salesman {
// cityStoreId = 2, salesPerformance = 12
// }]
// }, CityStore {
// cityCtoreId = 3,
// salesman = [Salesman {
// cityStoreId = 3, salesPerformance = 19
// }, Salesman {
// cityStoreId = 3, salesPerformance = 94
// }, Salesman {
// cityStoreId = 3, salesPerformance = 12
// }]
// }]
分析:
- 在apply这个回调方法中,我们需要定义自己的分组标准,上面定义了1号店返回true,其余返回false
- 我们定义的这个标准其实就是个key,那么groupBy这个操作符会根据这个key将原来的数据,也就是3个对象,拆分成2(true/false)个Flowable,然后在分别发射其中的数据,也就需要订阅两次.
buffer
官方的解释的非常到位:定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值,其本质是将一个Observable变换为另一个原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合.
我们详细说一下buffer的两个方法:
- buffer(count)
Flowable.fromIterable(mList).buffer(3)
.subscribe(integers -> Log.d(TAG,integers.toString()));
// [1, 2, 3]
// [4, 5]
这个方法类似于分组,等于是将之前的集合分成了若干集合发送,每一个集合会小于等于count,并且没有重叠部分,如果count正好等于或者大于集合的长度,就一次性都发射了,这个比较好理解
- buffer(count, skip)
Flowable.fromIterable(mList).buffer(3, 2)
.subscribe(integers -> Log.d(TAG, integers.toString()));
// [1, 2, 3]
// [3, 4, 5]
// [5]
这个重载的方法就有一点不好理解了,我们分成几点说一下
- 首先看第一个参数count,count指定了我们每一次的缓存集合长度,第一次[1,2,3]没有任何问题
- skip这个值得意思是,每当我缓存的时候,跳过第几个数据,我们设置的是2,第一次缓存[1,2,3]的时候,我们可以想象有一个指针跳过了第二个数据(2)指向了-->3
- 那么当第二次缓存的时候,指针指着3,从3开始,继续缓存count(3)个数据[3,4,5],并且skip第二个数据(4),现在的指针指向了-->5.
然后依次类推,这样会有重叠的部分,也可能有丢掉的部分,当然buffer这个操作符还有一些其他的功能,可以戳这里,我们就不详谈了.
window
window这个操作符是把数据分成了每个窗口,逐次发射,实现起来跟buffer类似,但是他不是一次性发完了所有数据,其原理是把原始数据分好组以后封装成一个新的Flowable/Observable,并且由新的逐个发送数据,比如:
Flowable.fromIterable(mList).window(3)
.subscribe(new Consumer<Flowable<Integer>>() {
@Override
public void accept(Flowable<Integer> integerFlowable) throws Exception {
integerFlowable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "onError");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG,"onComplete");
}
});
}
});
// 1
// 2
// 3
// onComplete
// 4
// 5
// onComplete
scan
scan这个操作符的意思是对每一个数据进行同一个函数操作,并且以生成的值作为计算标准,计算下一个数据,比如:
Flowable.fromIterable(mList).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "integer:" + integer);
Log.d(TAG, "integer2:" + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer.toString());
}
});
// 1
// integer:1
// integer2:2
// 3
// integer:3
// integer2:3
// 6
// integer:6
// integer2:4
// 10
// integer:10
// integer2:5
// 15
由此可见 apply方法中的第一个integer对应了我们每一次计算出来的和,而第二个integer2就是我们每一次发送的原数据,累加发送,依次执行..
变换就写完了,我认为变换还是有一点不好理解的..本想一篇写完,不过觉得确实是太长了,下一篇我们会写一下剩下的操作符!