将从Rx一个最简单的流程说起,说到map,说到SubscribeOn,说到observeOn,说到天荒地老
一个完整流程
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
System.out.println("工作空间");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("已连接观察通道");
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("接受工作空间发送的事件");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("出错");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
接下来看图分析整个流程做了什么
文字描述:被观察者调用subscribe
(订阅)观察者,然后被观察者通过调用ObservableEmitter
(发射器)发射各种事件(onNext
、onError
、onComplete
)
转换事件
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
System.out.println("工作空间。我发送的是string类型数据");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String o) throws Exception {
System.out.println("将string转换成int返回");
return o.length();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("已连接观察通道");
}
@Override
public void onNext(@NonNull Integer o) {
System.out.println("接受工作空间发送的事件,接受到的是int类型");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("出错");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
可以看到,一开始的被观察者发射的类型是String
,而观察者接受的类型是Integer
类型。我要的是宝马你却给我电单车?WTF?
这个时候就需要用到map
这个操作符了,可以相当于一个中转站(加工站),将被观察者发送过来的数据转换成符合观察者需要的数据,然后再将它返回给观察者。完美!
But,how it work??
接下来看图分析整个流程做了什么
注:上述图传递的不是ObservableEmitter
,而是为了更直观了解流程,而实际具体流程也差不多,只是实现不太一样。
文字描述:
①:原始被观察者
调用map()
的时候,重新创建了一个被观察者(这里称它为Map被观察者
),然后用Map被观察者
订阅原始观察者
。
②:然后在订阅成功后,原始被观察者
将订阅一个新的观察者(这里称它为Map观察者
)。
③:然后原始被观察者
在发送(String)消息的时候,Map观察者
接受到(String)消息,将(String)消息通过apply()
方法将其转为(Integer)消息,并通过Map被观察者
发送给原始观察者
。
④:apply()
方法是我们自己实现的方法
Map简单代码如下
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return create(new ObservableOnSubscribe<R>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) {
subscribe(new Observer<T>() {
@Override
public void onNext(T var1) {
e.onNext(mapper.call(var1));
}
});
}
});
}
转换线程(SubscribeOn)
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
System.out.println("工作空间。我发送的是string类型数据");
}
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("已连接观察通道");
}
@Override
public void onNext(@NonNull String o) {
System.out.println("接受工作空间发送的事件,接受到的是int类型");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("出错");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
接下来看图分析整个流程做了什么
文字分析:和map类似的操作,但又有点不同
①:原始被观察者
调用subscribeOn()
的时候,重新创建了一个被观察者(这里称它为subscribeOn被观察者
),然后用subscribeOn被观察者
订阅原始观察者
。
②:然后在订阅成功后,进行线程的转换
。
③:在subscribeOn被观察者
中调用原始被观察者
的subscribe(ObservableEmitter<Object> e)
,其中的参数发射器e
用的是subscribeOn被观察者
的发射器
subscribeOn()简单代码如下
public final Observable<T> subscribeOn(Scheduler scheduler){
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
// 线程的转换
scheduler.createWorker().schedule(new SubscribeTask() {
@Override
public void run() {
// source为原始被观察者
source.call(e);
}
});
}
});
}
转换线程(observeOn)
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
System.out.println("工作空间。我发送的是string类型数据");
}
}).observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("已连接观察通道");
}
@Override
public void onNext(@NonNull String o) {
System.out.println("接受工作空间发送的事件,接受到的是int类型");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("出错");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
接下来看图分析整个流程做了什么
文字分析:和map比较类似的操作,但又有点不同
①:原始被观察者
调用observeOn()
的时候,重新创建了一个被观察者(这里称它为observeOn被观察者
),然后用observeOn被观察者
订阅原始观察者
。
②:然后订阅成功后将创建一个观察者(这里称为observeOn观察者
),作为参数调用原始被观察者
的subscribe(@NonNull ObservableEmitter<Object> e)
。
③:之后的原始被观察者
发送的onNext
事件都会先经过observeOn观察者
的onNext
事件先,在里面会进行线程的转换
,再调用observeOn被观察者
的发射器来
发送onNext
事件给原始观察者
。
observeOn()简单代码如下
public final Observable<T> observeOn(Scheduler scheduler) {
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(@NonNull Observer<Object> e) {
// source为原始被观察者
source.subscribe(new Observer<T>() {
@Override
public void onNext(T var1) {
// 线程的转换
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {
// e为observeOn被观察者
e.onNext(var1);
}
});
}
});
}
});
}
结语
其实RxJava里面的各种操作符大部分都是利用了这种思想,用各种Observable和Observer来达到目的。
observeOn和subscribeOn对线程的调度94这样,对于里面什么方法走在什么线程,应该理解上面所说的就应该清楚了。