如何使用RxJava
Hello World
下面是一段java代码,代码创建了一个可观察者(Observable)用于发送一个字符串数组。然后去订阅(subscribe)它,并把结果打印出来。有关可观察者和订阅的概念请看观察者模式来了解。
//example
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
//input
hello("Ben", "George");
//output
Hello Ben!
Hello George!
如何使用RxJava
要使用使用Rxjava,首先要创建可观察者(就是发送数据的那个Observable),可以通过不同的Observable操作符(just()、from()、create()等),获取不同返回值的Observable,来获取你想要的精确的数据(比如Array、Iterable之类的),然后创建一个观察者(Observer)来订阅Observable,观察它的变化并对此做出反应。
创建Observable
可以通过内置的方法方便的创建Observables,比如create(),或者其他将现有数据转化成
Observables的方法
基于已存在的数据结构常见Observable
可以使用just()和from()来把对象、对象数组转化成Observable。
Observable<String> o = Observable.fromArray("a", "b", "c");
Observable<Integer> o2 = Observable.fromArray(5,6,7,8);
Observable<String> o3 = Observable.just("one object");
转换出来的Observable将会同步调用所有订阅这个Observable的Subscriber的onNext(),获取到Observable发送的所有数据,最后同步调用所有Subscriber的onCompleted()
通过create方法创建Observable
你可以通过create()来实现异步I/O(输入/输出)、计算操作或者生成无限大的数据流等目的。
Ps:
通常我们在系统级别说线程的blocked,是说线程操作io,被暂停了,这种线程由linux内核来唤醒(io设备报告数据来了,内核把block的线程放进可运行的进程队列,依次得到处理器时间)
同步的Observable例子
/**
* This example shows a custom Observable that blocks
* when subscribed to (does not spawn an extra thread).
*/
def customObservableBlocking() {
return Observable.create { aSubscriber ->
50.times { i -> //=>for(i=0;i<50;i++)
if (!aSubscriber.unsubscribed) {
aSubscriber.onNext("value_${i}")
}
}
// after sending all values we complete the sequence
if (!aSubscriber.unsubscribed) {
aSubscriber.onCompleted()
}
}
}
// To see output:
customObservableBlocking().subscribe { println(it) }
//
//如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,第二次的customObservableBlocking的Observable的创建会处于block状态。
异步的Observable例子
以下代码使用Groovy来创建一个Observable对象来发送75个字符串。它的编写非常简单,使用了静态类型和函数匿名内部类的实现,以使示例更加清晰。
/**
* This example shows a custom Observable that does not block
* when subscribed to as it spawns a separate thread.
*/
def customObservableNonBlocking() {
return Observable.create({ subscriber ->
Thread.start {
for (i in 0..<75) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext("value_${i}")
}
// after sending all values we complete the sequence
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
} as Observable.OnSubscribe)
}
// To see output:
customObservableNonBlocking().subscribe { println(it) }
//如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,两次的Observable在不同的线程,就不会发生block。
下面是在Clojure下,使用Funture来完成的相同功能的代码。
(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.
returns Observable<String>"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> subscriber .onCompleted))
))
))
; To see output
(.subscribe (customObservableNonBlocking) #(println %))
下面是一个从维基百科获取文章的例子,并在获取到每一篇文章后调用onNext。
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.
return Observable<String> of HTML"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> subscriber .onCompleted))
))))
//run
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
回到Groovy,相同的代码长这样:
/*
* Fetch a list of Wikipedia articles asynchronously.
*/
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
return Observable.create { subscriber ->
Thread.start {
for (articleName in wikipediaArticleNames) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
}
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
return subscriber
}
}
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
.subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
//Result
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...
注意,上面所有的例子都忽略了错误处理,为了简洁。更完整的信息可以在Observable和创建Observable页面找到。
通过操作符创建Observable
RxJava允许你通过操作符转换和生成Observable。
以下Groovy示例使用了之前定义的异步Observable发送了75个数据,但是调用skip(10)跳过前十个数据,然后调用take(5)只发送前5个数据,并且用map...在打印前把它们转化为特定字符串。
Ps:map方法的作用就是在onNext()之前截取数据,并对数据进行一定的处理,比如说下面示例map()获取到的数据第一个数据为"value_10",调用map后就变成"value_10_xform"。
/**
* Asynchronously calls 'customObservableNonBlocking' and defines
* a chain of operators to apply to the callback sequence.
*/
def simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform"})
.subscribe({ println "onNext => " + it})
}
//result
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform
//上面的示例翻译成java就是:
public class Hello {
public static void main(String... args) {
customObservableNonBlocking().skip(10).take(5).map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Exception {
return s + "_xform";
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});
}
private static Observable<String> customObservableNonBlocking() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
new Thread() {
@Override
public void run() {
super.run();
for (int i = 0; i < 75; i++) {
emitter.onNext("value_"+i);
}
emitter.onComplete();
}
}.start();
}
});
}
}
以下是ReactiveX特有的大理石图表(marble diagram),清晰地表现了以上流程:
以下Clojure示例消耗了三个异步Observable,包括从一个到另一个的依赖性,通过zip操作符来将三个Observables的每一个项目组合起来,发出一个单一的返回对象,然后通过map转换获取到的数据的类型。
(defn getVideoForUser [userId videoId]
"Get video metadata for a given userId
- video metadata
- video bookmark position
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 observables using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
:language (:language (:user-map data))
:bookmark (:viewed-position (:bookmark-map data))
})))))
//response
{:video-id 78965,
:video-metadata {:video-id 78965, :title House of Cards: Episode 1,
:director David Fincher, :duration 3365},
:user-id 12345, :language es-us, :bookmark 0}
图例如下:
如果用java代码借助上文中的customObservableNonBlocking()来演示Rxjava的zip操作符,就是如下显示:
private static void testForJava() {
Observable.zip(customObservableNonBlocking(), customObservableNonBlocking(), new BiFunction<String, String, Object>() {
@Override
public Object apply(String s, String s2) throws Exception {
return "s:" + s + ";s2:" + s2;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});
}
//input
testForJava();
//output
s:value_0;s2:value_0
s:value_1;s2:value_1
s:value_2;s2:value_2
s:value_3;s2:value_3
...
//结论
zip可以用于将多个ObServable的返回结果结合成一个。比如
以下Groovy示例取自本。克里森的关于Netflix API进化的演讲。。它通过merge操作符把两个Observable相结合,然后通过reduce操作符在生成的序列中构造单个项目,然后在数据发送前使用map改变数据的类型。
public Observable getVideoSummary(APIVideo video) {
def seed = [id:video.id, title:video.getTitle()];
def bookmarkObservable = getBookmark(video);
def artworkObservable = getArtworkImageUrl(video);
return( Observable.merge(bookmarkObservable, artworkObservable)
.reduce(seed, { aggregate, current -> aggregate << current })
.map({ [(video.id.toString() : it] }))
}
图例如下:
以下示例是用Java演示的有关RxJava的merge操作符和reduce操作符用法:
private static void test4(){
String[] args1=new String[]{"张的欣1","张的欣2","张的欣3","张的欣4"};
String[] args2=new String[]{"春晓1","春晓2","春晓3","春晓4"};
//相同的数组可以进行合并
Observable<String> mergeObservable=Observable.merge(Observable.fromArray(args1),Observable.fromArray(args2));
//merge后输出:
//张的欣1
//张的欣2
//张的欣3
//张的欣4
//春晓1
//春晓2
//春晓3
//春晓4
Maybe<String> maybe=mergeObservable.reduce(new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s.length()>s2.length()?s:s2;
}
});
maybe.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
//reduce后输出:
//张的欣4
}
//结论:
1.merge的作用相当于Observable之间的Append。
2.reduce相当于筛选。
错误处理
以下上文中抓取维基百科上文章的一个例子,现在加上错误处理。
/*
* Fetch a list of Wikipedia articles asynchronously, with error handling.
*/
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
return Observable.create({ subscriber ->
Thread.start {
try {
for (articleName in wikipediaArticleNames) {
if (true == subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
}
if (false == subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch(Throwable t) {
if (false == subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
return (subscriber);
}
});
}
如果出现了错误,以下代码将通过订阅第二个方法来调用onError()。
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
.subscribe(
{ println "--- Article ---\n" + it.substring(0, 125) },
{ println "--- Error ---\n" + it.getMessage() })
更多有关RxJava的错误处理操作符的信息请参见Error-Handling-Operators页面,其中包括像onErrorResumeNext()和onErrorReturn()这样可以让Observable在遭遇错误后
回调的方法。
以下示例说明了如何使用这样的方法来抛出异常。 假设你有一个或者一组的Observable — myObservable。然后你想用自己自定义的Throwable类替代默认Throwable拦截所有会回调Subscriber的onError()的异常。你可以通过重写myObservable的onErrorResumeNext() 来调用OnError()。然后把自定义错误作为参数调用一个名为error()的方法。
myModifiedObservable = myObservable.onErrorResumeNext({ t ->
Throwable myThrowable = myCustomizedThrowableCreator(t);
return (Observable.error(myThrowable));
});
参考原文:
1.How-To-Use-RxJava
2.Why should we use RxJava on Android