如何使用RxJava

如何使用RxJava

Hello World

下面是一段java代码,代码创建了一个可观察者(Observable)用于发送一个字符串数组。然后去订阅(subscribe)它,并把结果打印出来。有关可观察者和订阅的概念请看观察者模式来了解。

//example
public static void hello(String... names) {
    Observable.from(names).subscribe(new Action1<String>() {

        @Override
        public void call(String s) {
            System.out.println("Hello " + s + "!");
        }

    });
}
//input
hello("Ben", "George");
//output
Hello Ben!
Hello George!

如何使用RxJava

要使用使用Rxjava,首先要创建可观察者(就是发送数据的那个Observable),可以通过不同的Observable操作符(just()、from()、create()等),获取不同返回值的Observable,来获取你想要的精确的数据(比如Array、Iterable之类的),然后创建一个观察者(Observer)来订阅Observable,观察它的变化并对此做出反应。

创建Observable

可以通过内置的方法方便的创建Observables,比如create(),或者其他将现有数据转化成
Observables的方法

基于已存在的数据结构常见Observable

可以使用just()from()来把对象、对象数组转化成Observable。

Observable<String> o = Observable.fromArray("a", "b", "c");

Observable<Integer> o2 = Observable.fromArray(5,6,7,8);

Observable<String> o3 = Observable.just("one object");

转换出来的Observable将会同步调用所有订阅这个Observable的Subscriber的onNext(),获取到Observable发送的所有数据,最后同步调用所有Subscriber的onCompleted()

通过create方法创建Observable

你可以通过create()来实现异步I/O(输入/输出)、计算操作或者生成无限大的数据流等目的。

Ps:
通常我们在系统级别说线程的blocked,是说线程操作io,被暂停了,这种线程由linux内核来唤醒(io设备报告数据来了,内核把block的线程放进可运行的进程队列,依次得到处理器时间)

同步的Observable例子

/**
 * This example shows a custom Observable that blocks 
 * when subscribed to (does not spawn an extra thread).
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i -> //=>for(i=0;i<50;i++)
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

// To see output:
customObservableBlocking().subscribe { println(it) }
//
//如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,第二次的customObservableBlocking的Observable的创建会处于block状态。

异步的Observable例子

以下代码使用Groovy来创建一个Observable对象来发送75个字符串。它的编写非常简单,使用了静态类型和函数匿名内部类的实现,以使示例更加清晰。

/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..<75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

// To see output:
customObservableNonBlocking().subscribe { println(it) }
//如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,两次的Observable在不同的线程,就不会发生block。

下面是在Clojure下,使用Funture来完成的相同功能的代码。

(defn customObservableNonBlocking []
  "This example shows a custom Observable that does not block 
   when subscribed to as it spawns a separate thread.
   
  returns Observable<String>"
  (Observable/create 
    (fn [subscriber]
      (let [f (future 
                (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                ; after sending all values we complete the sequence
                (-> subscriber .onCompleted))
        ))
      ))

; To see output
(.subscribe (customObservableNonBlocking) #(println %))

下面是一个从维基百科获取文章的例子,并在获取到每一篇文章后调用onNext。

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.
  
   return Observable<String> of HTML"
  (Observable/create 
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))
        ))))

//run
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))

回到Groovy,相同的代码长这样:

/*
 * Fetch a list of Wikipedia articles asynchronously.
 */
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
    return Observable.create { subscriber ->
        Thread.start {
            for (articleName in wikipediaArticleNames) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
            }
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
        return subscriber
    }
}

fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
    .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }

//Result
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...

注意,上面所有的例子都忽略了错误处理,为了简洁。更完整的信息可以在Observable创建Observable页面找到。

通过操作符创建Observable

RxJava允许你通过操作符转换和生成Observable。

以下Groovy示例使用了之前定义的异步Observable发送了75个数据,但是调用skip(10)跳过前十个数据,然后调用take(5)只发送前5个数据,并且用map...在打印前把它们转化为特定字符串。

Ps:map方法的作用就是在onNext()之前截取数据,并对数据进行一定的处理,比如说下面示例map()获取到的数据第一个数据为"value_10",调用map后就变成"value_10_xform"。

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
    customObservableNonBlocking().skip(10).take(5)
        .map({ stringValue -> return stringValue + "_xform"})
        .subscribe({ println "onNext => " + it})
}

//result
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform


//上面的示例翻译成java就是:
public class Hello {
    public static void main(String... args) {
         customObservableNonBlocking().skip(10).take(5).map(new Function<String, Object>() {
            @Override
            public Object apply(String s) throws Exception {
                return s + "_xform";
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });
    }

    private static Observable<String> customObservableNonBlocking() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                new Thread() {
                    @Override
                    public void run() {
                        super.run();
                        for (int i = 0; i < 75; i++) {
                            emitter.onNext("value_"+i);
                        }
                        emitter.onComplete();
                    }
                }.start();
            }
        });
    }
}

以下是ReactiveX特有的大理石图表(marble diagram),清晰地表现了以上流程:


Composition.1.png

以下Clojure示例消耗了三个异步Observable,包括从一个到另一个的依赖性,通过zip操作符来将三个Observables的每一个项目组合起来,发出一个单一的返回对象,然后通过map转换获取到的数据的类型。

(defn getVideoForUser [userId videoId]
  "Get video metadata for a given userId
   - video metadata
   - video bookmark position
   - user data
  return Observable<Map>"
    (let [user-observable (-> (getUser userId)
              (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
          bookmark-observable (-> (getVideoBookmark userId videoId)
              (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
          ; getVideoMetadata requires :language from user-observable so nest inside map function
          video-metadata-observable (-> user-observable 
              (.mapMany
                ; fetch metadata after a response from user-observable is received
                (fn [user-map] 
                  (getVideoMetadata videoId (:language user-map)))))]
          ; now combine 3 observables using zip
          (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                (fn [bookmark-map metadata-map user-map]
                  {:bookmark-map bookmark-map 
                  :metadata-map metadata-map
                  :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                  {:video-id videoId
                   :video-metadata (:metadata-map data)
                   :user-id userId
                   :language (:language (:user-map data))
                   :bookmark (:viewed-position (:bookmark-map data))
                  })))))

//response 
{:video-id 78965, 
 :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                  :director David Fincher, :duration 3365}, 
 :user-id 12345, :language es-us, :bookmark 0}

图例如下:


Composition.2.png

如果用java代码借助上文中的customObservableNonBlocking()来演示Rxjava的zip操作符,就是如下显示:

private static void testForJava() {
        Observable.zip(customObservableNonBlocking(), customObservableNonBlocking(), new BiFunction<String, String, Object>() {

            @Override
            public Object apply(String s, String s2) throws Exception {

                return "s:" + s + ";s2:" + s2;
            }
        }).subscribe(new Consumer<Object>() {

            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });
    }

//input
testForJava();
//output
s:value_0;s2:value_0
s:value_1;s2:value_1
s:value_2;s2:value_2
s:value_3;s2:value_3
...
//结论
zip可以用于将多个ObServable的返回结果结合成一个。比如

以下Groovy示例取自本。克里森的关于Netflix API进化的演讲。。它通过merge操作符把两个Observable相结合,然后通过reduce操作符在生成的序列中构造单个项目,然后在数据发送前使用map改变数据的类型。

public Observable getVideoSummary(APIVideo video) {
   def seed = [id:video.id, title:video.getTitle()];
   def bookmarkObservable = getBookmark(video);
   def artworkObservable = getArtworkImageUrl(video);
   return( Observable.merge(bookmarkObservable, artworkObservable)
      .reduce(seed, { aggregate, current -> aggregate << current })
      .map({ [(video.id.toString() : it] }))
}

图例如下:


Composition.3.png

以下示例是用Java演示的有关RxJava的merge操作符和reduce操作符用法:

private static void test4(){
        String[] args1=new String[]{"张的欣1","张的欣2","张的欣3","张的欣4"};
        String[] args2=new String[]{"春晓1","春晓2","春晓3","春晓4"};
        //相同的数组可以进行合并
        Observable<String> mergeObservable=Observable.merge(Observable.fromArray(args1),Observable.fromArray(args2));
        
        //merge后输出:
        //张的欣1
        //张的欣2
        //张的欣3
        //张的欣4
        //春晓1
        //春晓2
        //春晓3
        //春晓4

        Maybe<String> maybe=mergeObservable.reduce(new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s.length()>s2.length()?s:s2;
            }
        });
        maybe.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

        //reduce后输出:
        //张的欣4
    }

//结论:
1.merge的作用相当于Observable之间的Append。
2.reduce相当于筛选。

错误处理

以下上文中抓取维基百科上文章的一个例子,现在加上错误处理。

/*
 * Fetch a list of Wikipedia articles asynchronously, with error handling.
 */
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
    return Observable.create({ subscriber ->
        Thread.start {
            try {
                for (articleName in wikipediaArticleNames) {
                    if (true == subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                }
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch(Throwable t) {
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }
            return (subscriber);
        }
    });
}

如果出现了错误,以下代码将通过订阅第二个方法来调用onError()。

fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
    .subscribe(
        { println "--- Article ---\n" + it.substring(0, 125) }, 
        { println "--- Error ---\n" + it.getMessage() })

更多有关RxJava的错误处理操作符的信息请参见Error-Handling-Operators页面,其中包括像onErrorResumeNext()onErrorReturn()这样可以让Observable在遭遇错误后
回调的方法。

以下示例说明了如何使用这样的方法来抛出异常。 假设你有一个或者一组的Observable — myObservable。然后你想用自己自定义的Throwable类替代默认Throwable拦截所有会回调Subscriber的onError()的异常。你可以通过重写myObservable的onErrorResumeNext() 来调用OnError()。然后把自定义错误作为参数调用一个名为error()的方法。

myModifiedObservable = myObservable.onErrorResumeNext({ t ->
   Throwable myThrowable = myCustomizedThrowableCreator(t);
   return (Observable.error(myThrowable));
});

参考原文:
1.How-To-Use-RxJava
2.Why should we use RxJava on Android

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342