6.- PrototypeZ的Rxjava沉思录4篇
RxJava 存在首先最基本的意义就是 统一了所有异步任务的回调接口 。而这个接口就是 Observable<T>,这和Callback<T> 其实是一个意思。此外,我们可以考虑让这个回调更通用一点 —— 可以被回调多次,对应的,Observable 表示的就是一个事件流,它可以发射一系列的事件(onNext),包括一个终止信号(onComplete)。
1. Rxjava的好处
- 链式编程,避免缩进,使代码简洁易读
- 异步,解决callback hell(还是异步的问题)
- Observable 在空间维度和空间维度上重新组织事件的能力。
2. Rxjava典型的使用
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
3. 空间维度
4. 时间维度
5. 常见操作符:想清楚操作符会干什么!操作符的参数会干什么!元素是一个一个处理的,操作符中写入的是单个元素的处理方式!!!
query("王").flatMap(list -> Observable.fromArray(list))
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.doOnNext(grade->save(grade))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(grade->Log.i(TAG,grade+"");
);
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting ObservableSources and emitting the results of this merger, while limiting the maximum number of concurrent subscriptions to these ObservableSources.
- fromArray():接收一个对象数组,然后逐个发射给观察者。
例子中list数据没变,但是从发送数组变为逐个发送数组元素。 - map():将一种Observable转化为另一种Observable。
- flatmap(Func):接收一个Observable的输出作为输入,同时输出另外一个Observable。通常是接收一个list,然后逐一发送list元素。发送数组变成发送元素。Func中通常会将一个元素转换成一个Observable,如下代码。flatmap会调用merge操作符,将所有的Observable融合成一个Observable,达到平铺的目的。
eg:
return Observable.from(file.listFile());
Func中要返回一个Observable,且对集合进行平铺,只写处理上级Observable的一个元素的逻辑代码
- filter():过滤方法
操作符是一个方法,括号内的参数会传入:输入输出信息,和处理代码。这些处理代码都是由filter()调用。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
- ofType(): 指定一个class,过滤掉元素中的其他class,代码中调用了filter()
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> ofType(final Class<U> clazz) {
ObjectHelper.requireNonNull(clazz, "clazz is null");
return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
- take():限定个数
- doOnNext():每次输出一个元素之前做一个处理。
- FuncX:一种数据转化成另一种数据。
Func有返回值,Action没有返回值
常用在map()和flatmap()
在call中写入转换操作,将Func对象导入map()
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
Function<Integer,String> func1 = new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return null;//操作逻辑
}
}
- Action:自动创建Subscriber,并作为subscriber的参数处理数据
???为什么找不到ActionX了,禁止传递参数了?
mObservable.subscribe(onNextAction,onErrorAction,onCompletedAcion);
/**
* A functional interface similar to Runnable but allows throwing a checked exception.
*/
public interface Action {
/**
* Runs the action and optionally throws a checked exception.
* @throws Exception if the implementation wishes to throw a checked exception
*/
void run() throws Exception;
}
- Scheduler
- subscribe(): Consumer,Action,Observer 三个类什么关系!!还有Subject.etc
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
- AOP:compose
6. 其他
- 热和冷
- 背压backpress