RxJava2

1.  什么时候使用Flowable,什么时候使用Observable?

下面是官方文档(原文)的直接翻译:

一个小遗憾是,在RxJava 0.x引入背压(backpressure)时,并没有使用一个独立的基础reactive类,而是直接在Observable的基础上进行改进了。

背压的主要问题是,许多hot sources比如UI event,它们不能合理地被背压,然后导致我们不想看到的MissingBackpressureException

注:hot sources直译是热来源,其真实表示的是hot Observable,也就是冷热Observable中的热Observable

对于这种情况,我们在2.x中这样尝试补救:将io.reactivex.Observable 设为非背压(non-backpressured),同时增加一个新的基础reactive类io.reactivex.Flowable,而这个类是可背压的(backpressure-enabled)

好消息是2.x的操作符几乎与之前保持一致。坏消息是大家在自动导包(organize imports)时需要注意,不要无意中就选择了不支持背压的io.reactivex.Observable

  • 注:项目如果使用了2.x之前的版本的RxJava,即使有些场景需要背压,但当时只能使用io.reactivex.Observable;所以当迁移到2.x时,要注意将这部分代码改成使用io.reactivex.Flowable,因为前者在 2.x时不支持背压。

什么时候使用Observable?

  • 当你的数据流最多也不会超过1000个元素(element)时,也就是说一段时间内只有很少的元素发射,所以你的应用不大可能发生内存溢出。
  • 当你处理GUI事件,例如鼠标或者触摸事件时,这些事件很难被合理地背压,并且不会频繁的发生。你可以使用Observable去处理频率小于等于1000赫兹的元素发送,并且尽量考虑使用sampling/debouncing 等操作符。
  • 本来你的数据流是异步的,但你的平台不支持Java流或者你分不清该使用Observable还是Flowable时,使用ObservableFlowable有更小的开销。

什么时候使用Flowable?

  • 当处理超过10k的元素,这些元素生成自某处并且具备某些特性,因此数据链(Chain)可以告诉来源(Source)去限制生成量。
  • 读取或者解析来自硬盘的文件自然而然地会产生阻塞(blocking),并且是基于拉取式(pull-based)的。在你的控制下,同样可以很好地处理好背压。比如:在特定的请求量下,你会读取多少行的数据。
  • 通过JDBC读取数据库同样是基于拉取式并且会产生阻塞,但是你可以通过调用ResultSet.next()得到很好的控制,并且通过它,几乎可以应对每一条下流的请求。
  • 网络(流)IO:网络请求或者是一些支持请求逻辑量的协议。
  • 一些阻塞和/或基于拉取式的数据源,但是未来也许会提供非阻塞响应式的API或者驱动。

2.  Consumer,Function

这里说的两个类指的是io.reactivex.functions包下的:

io.reactivex.functions包

先说Consumer:

2.x的 Consumer   等于   1.x的 Action1

2.x的 BiConsumer   等于   1.x的 Action2

2.x的 Consumer<Object[]>   等于   1.x的 ActionN

2.x的 Action   等于   1.x的 Action0

2.x中   没有   1.x中的Action3~Action9

再说Function:

2.x的 Function   等于   1.x的 Func

2.x的 BiFunction   等于   1.x的 Func2

2.x的 Function3~ Fucntion9   等于   1.x的 Func3~Func9

2.x的 Function<Object[], R>   等于   1.x的 FuncN

很明显1.x的命名不太规范,2.x中采用通用的相对合理的命名。
然而2.x命名规范并不是RxJava自己设计的,而是与Java1.8中相同功能的同名类的命名保持一致(不仅是类名,还有方法名)

Java1.8中新增一个包:java.util.function
来大体浏览下这个包(非所有类)

java.util.function包

可以看出:Java1.8中没有ActionFunction3~Function9

Consumer的作用

Consumer描述了这样的一种操作(operation):接收一个传入的参数(argument),并且不返回结果(result)。
使用Consumer的目的是,根据传入的值,来做相应的事。所以重点是accept方法:

 * @since 1.8
 */
@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);

同理:BiConsumer是接收两个传入的参数,并且不返回结果

 * @since 1.8
 */
@FunctionalInterface
public interface BiConsumer<T, U> {

    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);

上面2个源码是Java1.8的,因为注释比较详细。。。
下面看下RxJava2.x的:

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}
/**
 * A functional interface (callback) that accepts two values (of possibly different types).
 * @param <T1> the first value type
 * @param <T2> the second value type
 */
public interface BiConsumer<T1, T2> {

    /**
     * Performs an operation on the given values.
     * @param t1 the first value
     * @param t2 the second value
     * @throws Exception on error
     */
    void accept(T1 t1, T2 t2) throws Exception;
}
/**
 * A functional interface similar to Runnable but allows throwing a checked exception.
 */
public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

对于Consumer,RxJava2.x与Java1.8最大的区别是:
accept方法默认都会抛出Exception,这也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一个andThen方法。)

Function的作用

Function描述了这样的一种功能(function):接收一个参数(argument),并且产生一个结果(result)。
使用Function的目的是,根据传入的值,来输出一个值。所以重点是apply方法:

 * @since 1.8
 */
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);

同理:BiFunction是接收两个参数,并且产生一个结果

* @see Function
 * @since 1.8
 */
@FunctionalInterface
public interface BiFunction<T, U, R> {

    /**
     * Applies this function to the given arguments.
     *
     * @param t the first function argument
     * @param u the second function argument
     * @return the function result
     */
    R apply(T t, U u);

同样也比较下Rxjava2.x的:

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <R> the result type
 */
public interface BiFunction<T1, T2, R> {

    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <T3> the third value type
 * @param <R> the result type
 */
public interface Function3<T1, T2, T3, R> {
    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @param t3 the third value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}

同样,对于Function,RxJava2.x与Java1.8最大的区别是:
apply方法默认都会抛出Exception,这也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一个andThen方法。)

最后,大家可能有疑问:BiConsumerBiFcuntionBi是什么意思?
答案就是:Binary 。当然这里不是"二进制"的意思,而是"两"个参数的意思。

3.  观察者和被观察者

来看段经典的设计模式书籍---Head First怎么说?

出版者 + 订阅者 = 观察者模式

如果你了解报纸的订阅是怎么回事,其实就知道观察者模式是怎么回事,只是名称不太一样:出版者改称为“主题”(Subject),订阅者改称为“观察者”(Observer)

所以上面出现四个关键字:

Publisher(出版者) -> Subject(主题)

Subscriber(订阅者) -> Observer(观察者)

比较:Observable,ObservableSource,Flowable,Publisher

2.x 和 1.x中Observable的父类都是Object
但前者额外实现了ObservableSource接口,里面只有一个方法:

void subscribe(@NonNull Observer<? super T> observer)

两者在2.x 和 1.x 的包名分别为:io.reactivexrx

与2.x的Observable相似,Flowbale也实现了一个接口,
该接口就是Publisher!
同样只有一个方法:

void subscribe(Subscriber<? super T> s);

需要注意的是,Publisher并没有在io.reactivex包内,而是在org.reactivestreams内,该包内只有4个类,但每个都非常重要:

org.reactivestreams包

比较:Observer,Subscriber,Subscription,Disposable

先看下1.x的Observablesubscribe()方法:

subscribe()方法

如上图所示:每个方法subscribe()的方法都返回Subscription对象
Subscription是一个接口,就2个方法:unsubscribe()isUnsubscribed()

然后看下Observer,同样是一个接口,就那3个最常用的方法:onNext()onComplete()onError()

Subscriber刚好同时实现了ObserverSubscription

public abstract class Subscriber<T> implements Observer<T>, Subscription

虽然Observablesubscribe()时可以传入Observer,但实际处理时,会先把Observer转为Subscriber

 public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}

上面是对于1.x的ObserverSubscriptionSubscriber的简单理解,前两者是接口,最后一个是抽象类,并且都在rx包内。


来看下2.x,先是Observablesubscribe()方法:

Observable 的 subscribe()方法

与1.x的不同点:

返回值是Disposable而不再是Subscription

subscribe的是Observer时,并没有返回值。

并且不能1.x那样subscribe一个Subscriber

先看下Disposable,只有两个方法:dispose()isDisposed()。看到这里应该会有下意识的反应:Disposable与1.x的Subscription如出一辙!很明显,它确实是这个作用。

然后我们来看下Observer,它仍然是一个接口,但比1.x多出一个方法:

void onSubscribe(@NonNull Disposable d);

到此,就可以解释为什么当Observable subscribe的是Observer时没有返回值,因为Observer内部的方法已经提供了Disposable的引用
同时,还由于Disposable与1.x的Subscription的作用相同,而1.x的Subscriber实现了ObserverSubscription所以实际上,2.x的Observer扮演了1.x中Subscriber的角色!

下面再看下Flowable

Flowable 的 subscribe()方法

如上图所示:

Flowable的前5个方法同Observable的一致,返回值都Disposable

但是Flowable只能subscribe的是Subscriber,而非Observer
(FlowableSubscriberSubscriber的子类,也是个接口,两者区别是前者的onSubscribe方法不可以传空对象,后者可以),

所以我们重点看下2.x的Subscriber

2.x的Subscriber是个接口(1.x是抽象类),与2.x的Observer非常相似,除了onNextonCompleteonError外,还有:

void onSubscribe(Subscription s);

这里又出现了Subscription,同样有2个方法,但是不同于1.x的unsubscribe()isUnsubscribed(),以及2.xDisposabledispose()isDisposed(),它的两个方法是:

/**
 * No events will be sent by a {@link Publisher} until demand is signaled via this method.
 *
 * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
 * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
 *
 * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
 * 
 * A {@link Publisher} can send less than is requested if the stream ends but
 * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
 *
 * @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
 */
void request(long n);

/**
 * Request the {@link Publisher} to stop sending data and clean up resources.
 * 
 * Data may still be sent to meet previously signalled demand after calling cancel.
 */
void cancel();

request()的重要性及作用就不多说了,说下cancel()
Disposabledispose()和1.x Subscriptionunsubscribe()仅仅是不再接收上流数据,并不影响上流数据的发送。
而2.x Subscriptioncancel()所做的是,直接让上流来源停止发送数据,并且清空数据。

最后还有小注意点:

2.x的ObservableFlowablesubscribe的都是Consumer的时候,两者的返回值都是Disposable

而2.x的ObserverSubscriber的区别只有onSubscribe方法(都有onNextonCompleteonError三个方法),且两者onSubscribe方法的区别只是:前者的接收的参数是Disposable,而后者是Subscription。也就是说Subscription只有在使用Subscriber时才会用到,而Subscriber只有在使用Flowable时才会用到。
所以Subscription只有在使用Flowable时才会用到

另外:2.xSubscriberSubscription并不在io.reactivex内,
而是在org.reactivestreams中(就是介绍Publisher时贴出的4个类)。

比较:Subject

这个就简单了,1.x 中的Subject 与 2.x 中的Subject所用是一致的:

Represents an Observer and an Observable at the same time

但由于2.x中加入了Flowable,也就意味着2.x中的Subject的覆盖范围没有1.x中那么广。两者分别位于2.x 和 1.x 的包:io.reactivex.subjectsrx.subjects

4. 操作符的决策树

源自:A Decision Tree of Observable Operators

我想创建一个Observable

只需要发射一个item:Just

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容