众所周知Android开发不能在主线程中进行耗时操作,所以一些操作必须放在子线程中进行,这样一来就就会涉及到涉及线程的创建及线程间的通信。当然Android系统也提供了AsyncTask,但是在处理嵌套处理方面做的并不优雅。rxjava采用事件流的方式来解决了这一问题,当然rxjava的作用及优点不止是这个,还有很多的功能在使用起来也是让人爱不释手。本文不是对rxjava的用法及功能进行介绍,而是对rxjava的内部原理进行分析。
rxjava主要是通过发布/订阅模式来实现事件的控制和处理。两个接口和简单的一行代码就能明白rxjava的原理:
//发布者
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
//订阅者
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
//订阅
observableSource.subsrcibe(observer)
简单用法
Observable.create(ObservableOnSubscribe<String> {
//代码1
it.onNext("hello world")
}).flatMap(Function<String, ObservableSource<String>> {
val value = it
ObservableSource {
//代码2
it.onNext("flatmap-->$value")
}
}).subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
//代码3
e("MainActivity", "t--->$t")
}
override fun onError(e: Throwable) {
}
})
上面这段段代码创建了三个主要的对象:ObservableOnSubscribe(A)、ObservableFlatMap(B)和Observer(C),然后通过subscribe()方法将这个链串了起来。首先来看下Observable.create()方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这里创建了一个ObservableCreate对象,ObservableCreate继承自Obsevable,Observable实现了ObservableSource接口。然后调用了flatMap方法,最终会创建ObservableFlatMap对象,这相当于B订阅了A。当调用了subsrcibe方法时,相当于C订阅了B。我们来看下ObservableFlatMap中的subsrcibe()方法,然后又调用了subscribeActual()这个核心方法:
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
这里的source对象就是他订阅的A对象,调用的subscribe(Observer o)方法就是代码方法,这里的o对象就是上面代码中的MergeObserver对象本身,紧接着又调用了MergeObserver.onNext()方法:
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
//省略部分代码
}
这里的mapper对象就是代码2处创建的Function对象,然后返回一个ObservaleSource对象,之后继续调用了下一个订阅者(也就是对象A)的onNext()方法。至此这个事件流就通过订阅链依次到每一个订阅者。
我们通过简单的代码来快速了解下rxjava的原理:
//发布者
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);}
//订阅者
public interface Observer<T> {
void onNext(T t);
}
//中间订阅者
public interface Function<V, K> {
K apply(V v);
}
public abstract class FlatMapObservable<T> implements Observer<T>, ObservableSource<T> {
private T mT;
public <K> ObservableSource<K> flatMap(Function<T, ? extends ObservableSource<K>> function) {
return function.apply(mT);
}
@Override
public void onNext(T t) {
mT = t;
}
}
//dome
public class Test {
public static void main(String[] args) {
new FlatMapObservable<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onNext("hello world");
}
}.flatMap(new Function<String, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(String s) {
return new ObservableSource<Boolean>() {
@Override
public void subscribe(Observer<? super Boolean> observer) {
observer.onNext(false);
}
};
}
}).subscribe(new Observer<Boolean>() {
@Override
public void onNext(Boolean aBoolean) {
System.out.println(aBoolean);
}
});
}
}