Rxjava本质上是一个异步操作库。
是一个能让你用非常简单的逻辑 去处理那些繁琐复杂任务的 异步的操作库。
一、观察者模式
在正式分析Rxjava之前先简单看一下Rxjava中核心的一个观察者模式
- 被观察者
Observerable
public interface Observerable {
public void registerObserver(Observer o);
public void removeObserver(Observer o);
public void notifyObservers();
}
ConcreteObserverable
public class ConcreteObserverable implements Observerable {
private List<Observer> mObservers;
private String msg;
public ConcreteObserverable() {
mObservers = new ArrayList<>();
}
@Override
public void registerObserver(Observer o) {
mObservers.add(o);
}
@Override
public void removeObserver(Observer o) {
int i = mObservers.indexOf(o);
if(i >= 0)
mObservers.remove(i);
}
@Override
public void notifyObservers() {
for(int i = 0; i < mObservers.size(); i++){
Observer observer = mObservers.get(i);
observer.update(msg);
}
}
public void setMsg(String msg){
this.msg = msg;
//信息更新完毕,通知所有观察者
notifyObservers();
}
}
- 观察者
public interface Observer {
public void update(String msg);
}
二、Rxjava使用
1、创建被观察者
create
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onCompleted();
}
});
just
Observable observable = Observable.just("1", "2");
from
String[] parameters = {"1", "2"};
Observable observableFrom = Observable.from(parameters);
2、创建观察者
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object s) {
}
};
3、订阅
observable.subscribe(observer);
三、Observable被观察者
public class Observable<T> {
// 创建时从外面传入,可以把他理解为观察者模式中的notify()方法
final OnSubscribe<T> onSubscribe;
// 一个抽象的代理类,默认情况下不会对OnSubscribe做任何的处理
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//使用create完成被观察者的创建,把传入的OnSubscribe赋值给成员变量
public static <T> Observable<T> create(OnSubscribe<T> f) {
// 默认情况下onCreate不会做任何处理直接将f返回
return new Observable<T>(hook.onCreate(f));
}
}
四、OnSubscribe
五、Observer/Subscriber观察者
观察者有Observer、Subscriber两个接口 ,Subscriber其实是对Observer的一个扩展 ,使用方式基本相同,使用时都会将Observer转换成Subscriber
// 实现Observer和Subscription接口,Subscription只有两个方法void unsubscribe()和boolean isUnsubscribed()
public abstract class Subscriber<T> implements Observer<T>, Subscription {
// 保存所有这个观察者订阅的事件,Subscription中的解除订阅也是删除这个集合中的被观察者
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
}
六、subscribe订阅
- 订阅Observer:会把Observer转成Subscriber
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}
- 订阅Subscriber
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
// new Subscriber so onStart it
subscriber.onStart();
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
// 调用call方法
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
七、操作符
变换
:将事件序列中的对象或整个序列进行加工处理,转换成不同的对象或事件序列
1、map
把一个事件转换成为另一个事件
Observable.just("xxxx/map.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) {
return getBitmap(filePath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
showBitmap(bitmap);
}
});
map()函数接收一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值
先看一下just
是怎样创建被观察者对象的
Observable:
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
ScalarSynchronousObservable:
public final class ScalarSynchronousObservable<T> extends Observable<T> {
public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
protected ScalarSynchronousObservable(final T t) {
//会创建一个OnSubscribe,OnSubscribe来通知观察者不同行为
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, t));
}
});
this.t = t;
}
}
下面重点看一下map
是怎样转换的
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// 传入一个OperatorMap对象,lift主要做事件序列的处理和再发送
return lift(new OperatorMap<T, R>(func));
}
OperatorMap
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
//transformer是map中传入的Func1
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
OperatorMap中会创建一个新的观察者并在onNext中做类型的转换,但是创建了新的观察者并没有看到订阅操作,那么定位操作在哪做的呢?
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
// 创建一个新的被观察者
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// 调用operator创建的观察者的onStart
st.onStart();
//调用原始被观察者onSubscribe的call,但传入的Subscriber是operator中call方法创建的,里面会调用func1做类型转换
onSubscribe.call(st);
} catch (Throwable e) {
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
}
}
});
}
1、flatMap
private Subscription processNetAddress() {
return Observable.just(
"http://www.baidu.com/",
"https://www.sina.com/",
"https://www.sohu.com/")
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return createObservable(s);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
}
private Observable<String> createObservable(String s) {
return null;
}
将传入的事件对象转换成一个Observable对象;
他是不会直接发送这个Observable,而是将这个Observable激活让他自己开始发送事件;
每一个创建出来的Observable发送事件,都会被传入同一个Observable
八、线程控制
在默认不指定线程的情况下,Rxjava遵循线程不变原则:在哪个线程调用的subscribe就在哪个线程生产事件、消费事件
Rxjava通过Schedulers通过线程调度:
- Schedulers.immediate():当前线程执行,不切换线程
- Schedulers.newThread():总是启用新线程
- Schedulers.io():io线程,内部是一个没有数量上限的线程池
- Schedulers.computation(): 计算线程
- AndroidSchedulers.mainThread():主线程
1、subscribeOn()
OnSubscribe中call方法执行所处的线程
// 创建一个新的被观察者,通过新的观察者来做操作
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//根据不同的scheduler做不同的线程处理
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
// 在新的线程中调用原始OnSubscribe的call方法
source.unsafeSubscribe(s);
}
});
}
}
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
//调用原始OnSubscribe的call方法
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
- 生成一个新的Observable、OnSubscribe
- 新的OnSubscribe会在目标Subscriber订阅时使用传入的Scheduler的worker作为线程调度执行者
2、observeOn()
事件消费 的线程
通过operator操作符的形式去完成线程的切换
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
//新线程中处理
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
}
}