简介
这篇文章没有介绍,因为这是我们最后一篇文章的延续,但在开始之前,我想我们会对上一篇文章进行修改。在最后一部分,Rx可以与我们分享Hot Vs Cold这些概念的一个具体的例子。在那之后,我问了一些问题,但是Rx可以观察到,我们应该在Subject API之前了解观察者API。因此,我们将继续我们的对话,从观察者API,开始。
对话
Observable:在我们了解之前Observables之前,这里有一个订阅者的概念,这个订阅者我们在项目中早就已经使用到,我们需要在api之前了解他。
me:好 我们怎么开始?
Observable: 我的新朋友Observer 他会好好教你
Observer: 在 开始了解我之前 我有一个问题。 你了解关于Functional 接口内容?
me:了解
Observer : 正如你知道的订阅者是观察Observable数据流变化的 如果有变了。注意到他变化的订阅者 。 你知道可以呦不同类型的订阅者 没有我。 Observable is什么都不是
Observable:哈哈。百分之分没错。甜心
Observer任何你能看见Observable的地方。我给你百分之百的保证我会在那里。相反,你可以说我是一个介于Observable和开发人员之间的桥梁。如果你是Rx的新用户,你想要使用第三方库,使用Rx。所以,只有当你知道我的时候,你才真的会使用这个库。我认为这就足够了。
me:o((≧▽≦o)!!
Observer:任何时候你都想知道数据发生怎么样的变化,或者Observable 观察的数据或者事件发生变化的时候,你需要通过使用我来订阅这个Observable的,后来,当Observable想要告诉你任何变化时,他会通知我,我会通知你。所以基本上你可以用我的很多方法但是首先我要从我最基本的API开始。
me:我对一句话有点困扰“你可以用到我的很多种方式”
Observer: 等一下并我讲,在我基本的用法中有四种
public interface Observer {
void onSubscribe(Disposable var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
这里T是Java中的泛型。我并不像在这里讨论Java泛型。简而言之,如果你在数据类型是Person的数据,那么T应该是一个Person对象。
现在,这并不是强制使用基本的四种方法观察者API。这完全取决于你的要求。我将给您一些例子,您可以很容易地确定何时使用这个基本API,或者何时可以使用非常简单的API,稍后我将与您分享。
现在我要一次用一种方法。
void onSubscribe(Disposable var1);:
无论何时,当你观察和被观察者连接到一起(订阅)的时候,你就会得到一个Disposable 对象。它有一个非常简单的API,如下所示。附加
public interface Disposable {
void dispose();
boolean isDisposed();
}
因此dispose 就是当你对Observable的变化不感兴趣的时候你调用这个方法。所以你想让我脱离Observable的时候,让我调用这个方法就可以,就类似我和Observable离婚了。在这之后发生任何事情我都不关心
第二个isDisposed方法。这个方法就是在我自己不确定自己是不是离婚的时候调用,我用这个方法去检查如果是false 那么我还没有离婚。那么我就需要调用dispose去离婚
void onNext(T var1);:方法
这个方法的作用。就是当我订阅Observable的时候。它有变化或者新的数据的时候,我就可以使用
我想我可以换一种方式去解释,当Observable 和我结婚的时候,他将onSubscribe给了我,因此我答应了求婚。这也是重要的一点。意味着我可以随时离婚,现在,当我们结婚时,Observable 总是告诉我他的数据或事件流中会发生什么变化。在那时,可以观察到我的onNext(任何数据)方法。因此,用简单的话来说,当他的数据有任何变化时,他总是用我的onNext(T data)方法告诉我,开发人员。
void onError(Throwable var1);:
这个api是很重要的 任何时候他挂的时候通过这个方式通知我。他能告诉我他死前面对的是什么类型,当调用这个方法的时候disposable isDispose()返回的都是true;所以有的时候 我并没有要求离婚。但是当他挂了我可以通过这个方法去检查一下、
void onComplete();:
这个方法对我来说也是很重要。这个是Observable 准备找死或者想和我离婚的时候调用这个俄方法、 ,它总是用onComplete 来通知我。就像我们之前讨论的那样。 希望你理顺了这一切。
me: 还有最后一个小小的问题,onError and onComplete ,都是告诉我再有新的数据或者改变也不用通知我!那么这有什么不同呢
Observer:onError 相当于死于疾病,onComplete相当于正常死亡。他们两个只会调用其中的一个。另一个肯定不会调用,希望你明白
me:wow 好
Observer 现在我给你一个例子告诉你显示中我是怎么被用到的
在这个例子中,我将创建一Observer,每隔一秒就会给我数据。我将使用这些数据,以不同的方式观察,让你对我的API有一个清晰的了解。
private staticObservablegetObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
这一个方法可能会让你感到困惑, 我和Observable结婚之后他总是每一秒之后都给我新的数据,并且你还可以看到他的数据类型。,所以当订阅或者结婚的时候你就会知道数据类型。我将方法的调用
Observerobserver= new Observer() {
@Override
public voidonSubscribe(Disposable disposable) {ObserverLecture.disposable= disposable;
}
@Override
public voidonNext(Object o) {
System.out.println("onNext called");
}
@Override
public voidonError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public voidonComplete() {
System.out.println("onComplete: Die with natural death");
}
};
、
看这就是我。 并不需要过多的解释。当我们订阅会或者想要结婚的时候 通过subscribe方法
Complete code:
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) {
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
ObserverLecture.disposable = disposable;
}
@Override
public void onNext(Object o) {
System.out.println("onNext called");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete: Die with natural death");
}
};
getObservable().subscribe(observer);
while (true);
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
}
If I run this code. I will get below output for infinite time, means this program never exit.
Output:
onNext called
onNext called
onNext called
onNext called
onNext called
下面我就给展示一下Disposable 的一些用法
/**
* Created by waleed on 14/05/2017.
*/
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
ObserverLecture.disposable = disposable;
}
@Override
public void onNext(Object o) {
System.out.println("onNext called");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError called. Die due to reason: "+throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete: Die with natural death");
}
};
getObservable().subscribe(observer);while (true){Thread.sleep(1000);System.out.println("disposable.isDisposed(): "+disposable.isDisposed());}}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> observableEmitter.onNext(new Object()));
});
}
}
This is a same code like above only one change in while loop. In while loop after every one second I am showing the value of Disposable is Observer is divorced or not.
Output:
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
… infinite
So you can easily see false, mean I am not divorced because I never called Disposable dispose() method. Now its time to show you what will happen when I will call dispose().
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called");}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);int count = 0;while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());count++;if (count == 3)disposable.dispose();}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
observableEmitter.onNext(new Object());
});
});
}
}
here again code is same only difference in while loop. This time I added a one count variable. So as I got data from Observable three time’s I will call dispose. Its mean I want divorce from Observable.
Output:
onNext called
disposable.isDisposed(): false
onNext called
disposable.isDisposed(): false
onNext called
disposable.isDisposed(): false
disposable.isDisposed():true
disposable.isDisposed():true
disposable.isDisposed():true
现在你能一眼就看出来输出,在三次之后我获取到的是true,他意味我离婚了。现在的问题是Observable怎了、是挂了还是没挂?因此我想使用Hot vs Cold Observable. 的概念。如果这是Hot Observable,他就不会死,但如果他 Cold Observable,他虽然不会死,而是会停止发送数据。
因此,是时候讨论关于疾病或自然死亡的onError()和onComplete()或死亡
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called");System.out.println("disposable.isDisposed(): " + disposable.isDisposed());}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);
while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());
}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onError(new RuntimeException("Die due to cancer"));});
}
}
这里我们可以明确的看到这个onerror的作用
Output:
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed(): false
onError called. Die due to reason: Die due to cancer
disposable.isDisposed():true
disposable.isDisposed():true
…
现在你可以很容易地看到了。我们Observable死亡。他把我的错误方法称为“我的错误方法”,而且在死后我的is处置()给了我真实的信息。也就是说,我离婚了,或者是寡妇。
现在是检查onComplete()的时候了。
public class ObserverLecture {
private static Disposable disposable;
public static void main(String[] args) throws InterruptedException {
Observer observer = new Observer() {
@Override public void onSubscribe(Disposable disposable) {ObserverLecture.disposable = disposable;}
@Override public void onNext(Object o) {System.out.println("onNext called"); System.out.println("disposable.isDisposed(): " + disposable.isDisposed());}
@Override public void onError(Throwable throwable) {System.out.println("onError called. Die due to reason: " + throwable.getMessage());}
@Override public void onComplete() {System.out.println("onComplete: Die with natural death");}
};
getObservable().subscribe(observer);
while (true) {
Thread.sleep(1000);
System.out.println("disposable.isDisposed(): " + disposable.isDisposed());
}
}
private static Observable getObservable() {
return Observable.create(observableEmitter -> {
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());
observableEmitter.onNext(new Object());observableEmitter.onComplete();});
}
}
Here you can see I have only one change. Observable called onComplete on his own.
Output:
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onNext called
disposable.isDisposed():false
onComplete: Die with natural death
disposable.isDisposed():true
disposable.isDisposed():true
disposable.isDisposed():true
更加直观
当我disposable.isDisposed()我获取到的是false,我并没有离婚我还在一直获取数据,但是当我调取onComplete的时候isDispose 变成了true,这意味了Observable 正常死亡
me:wow 谢谢Observer,这真是一个很棒的解释,你减少了我很多的迷惑。我对你有过一些了解,但现在我很好奇,有些时候,人们只在用户中使用一种方法。该方法是什么?
Observer:首先感谢,我可以项目解释更多api,但是首先我感觉你应该使用一些相同的概念 在android x下面我将给你介绍一个例子
me: 我同意你的观点,但我认为首先我们会了解你的一切,然后我会在Android上给出一个真实的例子,我将使用你上面所有的API。
Observer:正如你希望的那样,有时,场景并不复杂,您可以使用观察者4方法API,但是我有一种感觉,不需要使用四种方法,您可以使用更少的代码来实现该场景。为此,我将自己划分为功能接口,或者可以说,这是观察者的一种语法糖。例如
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String string) {
System.out.println("onNext: "+string);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
Output:
onNext: A
onNext: B
onNext: C
onNext: D
onComplete
现在您可以看到,我只对数据感兴趣,但我需要实现on订阅、onError和onComplete。这是一个样板,在下一个例子中,我们如何用更少的代码实现
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(s -> System.out.println(s));
}
}
在上面的例子中,上面的例子都是相同的,但是您可以看到,这一次我只使用了两行代码,在此之前,它是一个非常长的代码。现在我将与你们分享我所有的功能接口以及如何在你们的应用程序中使用。
public interface Consumer {
void accept(@NonNull T var1) throws Exception;
}
public interface Action {
void run() throws Exception;
}
所以我有两个函数接口。一个是Consumer的,这是非常有用的,第二个是Action。
首先,我们将讨论Consumer接口
你知道的,我只对数据感兴趣,我不关心任何其他的状态,比如我不想是不是使用Disposable离婚。还是我不想知道的是可以被自然死亡或某种疾病致死的。在这种情况下,我可以使用这个消费者API,我也想说,感谢Observable ,他们给了我使用我的功能接口来订阅的选项。
Observable: O(∩_∩)O
Observer:下面看一下使用过的代码
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(new Consumer() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}});
}
这里我只订阅onNext()从Observable的回调,您可以很容易地看到,我将一个匿名类发送到Observable到的订阅。这是一个神奇的地方,正如我已经告诉你们的,我有函数接口,所以它的意思是,我可以将Lambda表达式发送到Observable的订阅,而不是匿名类或接口对象。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(s -> System.out.println(s));
}
}
Wow. You can see in above example, only one line.
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(System.out::println);
}
}
哇。更少的单词。这里我使用的是方法引用,但最终所有的代码块都给了我相同的功能。还有一种技术将保留在一个相同的示例中,如下所示。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer);
}private static Consumer consumer = System.out::print;//private static Consumer consumer2 = s->{};}
在这里,我分别定义了我的消费者功能接口,我正在使用该对象进行订阅。
接下来,我们还想知道是否出现了错误。如何使用相同的功能接口来通知我。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,new Consumer() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("Die due to "+throwable.getMessage());}});
}
private static Consumer consumer = System.out::print;
}
在这里,您可以很容易地看到在订阅方法中可以看到的第二个参数是onError。同样,我也发送了相同的消费者功能接口,T=可投掷。这是很简单的。
接下来,如何通过使用Lambda表达式实现相同的目标。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,throwable -> System.out.println("Die due to "+throwable.getMessage()));
}
private static Consumer consumer = System.out::print;
}
接下来,我如何使用方法引用来实现这一目标。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,System.out::print);
}
private static Consumer consumer = System.out::print;
}
Wow. Only one thing to mention. Here method reference call throwable.toString() not able to show our custom message. Like how we are showing in one above example (System.out.println(“Die due to “+throwable.getMessage()).
Now its time to show you by defining my Error Consumer API and sending that object to subscribe.
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer,error);
}
private static Consumer consumer = System.out::print;private static Consumer error = System.out::print;}
现在我知道您很好奇如何知道是否可以看到onComplete()是否被调用,为此,我有一个Action 接口。我需要把它作为第三个参数来订阅可观察的。因此,作为完整的Observable ,我将在 action interface得到信号。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error,new Action() {@Overridepublic void run() throws Exception {System.out.println("OnComplete");}});
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
}
在这你能看到这个Action这个匿名接口作为第三个参数接下来我要展示给你们看。我们如何使用Lambda表达式作为一个方法引用,最后作为一个单独的定义对象。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error,() -> System.out.println("OnComplete"));
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
}
所以你可以看到第三个参数是Action而不是Consumer。这一点很重要。
最后一件事是Disposable。如果我想离婚,我怎么能得到Disposable。这是第四个参数,即T=可支配的消费者。
public class ObserverLecture {
public static void main(String[] args) {
List strings = Arrays.asList("A", "B", "C", "D");
Observable.fromIterable(strings)
.subscribe(consumer, error, complete,new Consumer() {@Overridepublic void accept(Disposable disposable) throws Exception {}});
}
private static Consumer consumer = System.out::print;
private static Consumer error = System.out::print;
private static Action complete = ()-> System.out.println("onComplete");
}
这里我可以获取Disposable。
我也可以在这里看到。我可以作为一个观察者来实现我自己,或者我可以通过使用功能接口来实现同样的目标。它的意思是
Observer subscription=4个功能接口订阅(onsumer, Consumer, Action, Consumer)