<pre>
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}
</pre>
一上来我们就举个例子可以看出来两个操作符。 有了前面的经验我们来简单的分析一下。这几个创建操作符的不同
<pre>
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}
</pre>
和以前一样Observable的静态方法。
<pre>
public final class ObservableDefer<T> extends Observable<T> {
final Callable<? extends ObservableSource<? extends T>> supplier;
public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {
this.supplier = supplier;
}
@Override
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}
pub.subscribe(s);
}
}
</pre>
这是完整ObservableDefer代码。可以看到subscribeActual这个方法不?看到这个我想大家一定马上就想到另一个方法Observable的静态方法subscribe,中最重要的部分 subscribeActual(observer);。吼吼 看到了吗。 只有当你调用的时候(订阅)才会整理数据。
<pre>
a = 12;
Observable<String> o2 =
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("defer result: " + a);
}
});
a = 20;
o2.subscribe(new Action1<String>() {
@Override
public void call(String t) {
System.out.println(t);
}
});
</pre>
这是一个网上例子。根据咱们的分析 。 但你订阅了subscribe。它才开始去找寻数据a=20;是不是很简单? 好了 往下进行
<pre>
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}
pub.subscribe(s);
}
</pre>
最重要的相信大家直接看到出来了。
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
supplier.call()这个就是调用你的call方法.下面的方法。大家试着分析一下?
pub.subscribe(s);
又回到了
Observable的静态方法subscribe。那么他会调用谁的?subscribeActual,大家肯定猜到了。
ObservableFromArray的subscribeActual方法~ 为什么呢? 看这里看这里
<pre>
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
</pre>
这是just的方法哦,让我们继续
<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
</pre>
看到这里明白了吧!大家思考一下。这两个以后什么不同点?
3
2
1
defer 中文的意思是推迟。 所以他们的区别就是 在创建的时候 defer的数据都是最新的,因为他在订阅的时候才回去执行,其他的just 就不会。
让我们进行下一个
creat 这个原理其实差不多。 我们来简单的走一下流程
<pre>
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
</pre>
ObservableCreate的subscribeActual方法
<pre>
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
</pre>
这里要稍微留意一下
source.subscribe(parent);
这里的方法调用的是
<pre>
Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
</pre>
的subscribe方法,当调用onNext 或者onComplete的时候。他调用的就是ObservableCreate下的
onNext等方法
<pre>
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
<
</pre>
看看是不是特别简单
最后一个喽
from
。。。。。这个貌似很长。我要做一个专门的专题。